Hama学习笔记(3)-编写BSP程序

xiaoxiao2021-03-01  12

Hama中提供了BSP框架的编程接口,就像MapReduce一样方便使用。

[引用请注明出处:http://blog.csdn.net/bhq2010/article/details/8531243]

BSP框架

首先明确一下BSP的概念:

BSP是一个计算框架,按照这个框架编写的BSP程序会在集群的各个节点上做本地的I/O和计算,这一点和MapReduce相似(其实BSP的提出比MapReduce还要早差不多10年,应该算前辈才是~),但不同的是BSP框架中,各个节点之间可以进行比较有效的通信。

一个BSP程序(或者叫BSP Job)的执行过程中包含了若干个超步(Supersteps),每个超步的执行过程又有以下三个步骤:

各个节点本地的计算->节点间通信->节点同步

第一和第二个步骤之前其实没有明确的界限。在一个超步中,各个结点在进入同步状态之前可以随时进行I/O和通信。

当某个结点认为自己的计算任务已经完成时,可以进入同步状态并挂起。当一个超步中所有的结点都进入同步状态时,一个超步就结束了,各个节点从挂起处开始继续执行,所有结点都退出时,整个BSP程序就结束了。

继承BSP类

Hama中编写BSP程序和Hadoop MapReduce差不多,首先写一个类,继承Hama API中的BSP抽象类,例如:

public static class MyEstimator extends BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> 这个类不一定是static的,以上只是hama.example里计算PI的一个例程。

然后要实现BSP类中的抽象方法bsp,例如:

public void bsp(BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer) throws IOException, SyncException, InterruptedException {.....} 此外,和Hadoop MapReduce类似,BSP类中还有两个方法可以重载:setup和cleanup

这两个方法分别在一个BSP程序执行前后进行初始化和清理的工作。

一个完整的BSP程序见上一篇日志:http://blog.csdn.net/bhq2010/article/details/8513052

文件I/O

在配置BSP Job时,可以为其指定输入输出格式和路径,和Hadoop很相似,例如:

job.setInputPath(new Path("/tmp/sequence.dat"); job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class); or, SequenceFileInputFormat.addInputPath(job, new Path("/tmp/sequence.dat")); or, SequenceFileInputFormat.addInputPaths(job, "/tmp/seq1.dat,/tmp/seq2.dat,/tmp/seq3.dat"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormat(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, new Path("/tmp/result")); 其中setInputFormat和setOutputFormat是设置输入和输出文件的格式的,默认是文本格式的,这和Hadoop的setInputFormatClass、setOutputFormatClass作用一样。这样在bsp方法中就可以用BSPPeer类型的参数peer来读取输入文件(通常是在HDFS上)并向输出文件中写入了,例如:

public final void bsp( BSPPeer<LongWritable, Text, Text, LongWritable, Text> peer) throws IOException, InterruptedException, SyncException { // this method reads the next key value record from file KeyValuePair<LongWritable, Text> pair = peer.readNext(); // the following lines do the same: LongWritable key = new LongWritable(); Text value = new Text(); peer.readNext(key, value); // write peer.write(value, key); } 需要重新打开输入文件重新读取,可以用peer.reopenInput()方法。

此外,在bsp中也可已随意访问合法的文件,不过这些文件IO就没法在配置BSP Job时指定,而只能硬编码了。

计算结点间通信

Hama为BSP提供的通信API如下:

方法描述send(String peerName, BSPMessage msg)Sends a message to another peer.getCurrentMessage()Returns a received message.getNumCurrentMessages()Returns the number of received messages.sync()Barrier synchronization.getPeerName()Returns a peer's hostname.getAllPeerNames()Returns all peer's hostname.getSuperstepCount()Returns the count of supersteps

这些都是bsp方法的参数peer的方法,像上面调用read、write方法一样调用即可。

同步

调用peer.sync()方法可以使当前节点进入同步状态,当所有的节点都进入同步状态后,同步完成,开始下一个超步或者结束Job。

[引用请注明出处:http://blog.csdn.net/bhq2010/article/details/8531243]

相关资源:5509A的mcbsp程序
转载请注明原文地址: https://www.6miu.com/read-3200326.html

最新回复(0)