如图Spark3-1.png所示:
行动操作 行动操作会把最终求得的结果返回到驱动程序,或者写入外部存储系统中。如我们可能想输出关于badLinesRDD的一些信息。为此需要使用两个行动操作来实现。用count()来返回计算结果,用take()来收集RDD中的一些元素。 Python版本的对错误进行计数 print "input had"+badLinesRDD.count()+" concerning lines" print "Here are 10 examples:" for line in badLinesRDD.take(10): print line Java版本中对错误进行计数 System.out.println("Input had "+badLinesRDD.count()+" concerning lines") System.out.println("Here are 10 examples:") for(String line:badLinesRDD.take(10)){ System.out.println(line); } RDD中还有一个collect()函数,可以用来获取整个RDD中的数据。如果你的程序把RDD筛选到一个很小的规模,并且你想在本地处理这些数据,就可以使用它。但是注意collect()不能在大规模数据集中使用。另外每当我们调用一个新的行动操作时,整个RDD都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化。 惰性求值 惰性求值意味着当我们对RDD调用转化操作时,操作不会立即执行,相反,Spark会在内部激励下所要求执行的操作的相关信息。把数据读取到RDD的操作也同样是惰性的。因此当我们调用sc.textFile()时,数据并没有读取进来,而是在必要时才会读取,和转化操作一样的是,读取数据的操作也有可能会多次执行。 虽然转化操作是惰性求值的,但你还是可以随时通过运行一个行动操作来强制Spark执行RDD的转化操作比如使用count().这是一种对你所写的程序进行部分测试的简单方法。 向Spark传递函数 Python版本 三种方式来把函数传递给Spark.传递比较短的函数时,可以使用lambda表达式传递。 如: word=rdd.filter(lambda s:"error" in s) def containsError(s): return "error" in s word = rdd.filter(containsError) 除了lambda表达式,我们也可以传递顶层函数或是定义的局部函数。 传递函数的注意事项:Python会在你不经意间把函数所在的对象也序列化传递出去。当你传递的对象是某个对象的成员,或者包含了某个对象中一个字段的引用时,spark就会把整个对象发到工作节点上。如果遇到无法序列化的对象,也会导致你的程序失败。替代方案时,把你需要传递的字段单独放到一个局部变量中,然后传递着个局部变量。 JAVA版本
在java中,函数需要作为实现了Spark的org.apache.spark.api.java.function包中的任一函数接口的对象来传递。根据不同的返回类型,有不同的接口。最基本的接口如图spark3-2.png所示:
可以把我们的函数类内联定义为使用匿名内部类,也可以创建一个具名类: 例如在Java中使用匿名内部类进行函数传递 RDD<String> errors = lines.filter(new Function<String,Boolean>(){ public Boolean call(String x){ return x.contains("error"); } }); 在Java中使用具名类进行函数传递 class ContainsError implements Function<String,Boolean>(){ public Boolean call(String x){ return x.contains("error"); } } RDD<String> errors = line.filter(new ContainsError()); 例如带参数的Java函数类 class Contains implements Function<Sting,Boolean>(){ private String query; public Contains(String query){ this.query = query; } public Boolean call(String x){ return x.contains(query); } } RDD<String> errors = line.filter(new Contains("error")); JAVA8的lambda表达式进行传递函数: RDD<String> errors =line.filter(s->s.contains("error")); 常见的转换操作和行动操作 1,针对各个元素的转化操作 最常用的就是map()和filter().map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值,而filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。 比方说使用map()函数来对RDD中的所有数求平方。 Python版本: nums = sc.parallelize([1,2,3,4]) squared=nums.map(lambda x:x*x).collect(); for num in squared: print "%i " %(num) java版本: SparkConf conf = new SparkConf().setMaster("local").setAppName("squared"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4)); JavaRDD<Integer> result = rdd.map(new Function<Integer,Integer>(){ public Integer call(Integer x){ return x*x; } }); System.out.println(StringUtils.join(result.collect(),",")); 如果你想对每个输入元素生成多个输出元素,实现该功能的操作叫作flatmap().我们提供给flatmap()的函数被分别应用到了输入RDD的每个元素上。不过返回的不是一个元素,而是返回值序列的迭代器。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。flatmap()的一个简单用途是把输入的字符串切分为单词: Python版本 lines = sc.parallelize(["hello world","hi"]) words = lines.flatMap(lambda line:line.split(" ")) words.first() #返回“hello" Java版本 JavaRDD<String> lines = sc.parallelize(Arrays.asList("Hello world","hi")); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){ public Iterable<String> call(String line){ return Arrays.asList(line.split(" ")); } }); words.first();//返回Hello
图Spark3-3阐释了flatMap()和map()的区别
伪集合的操作 我们的RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复元素,如果只要唯一的元素,可以通过使用RDD.distinct()转化操作来生成一个只包含不同元素的新RDD。
图Spark3-4总结了常见的RDD转化操作
行动操作 最常见的行动操作是reduce(),它接收一个函数作为参赛,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。 Python中的reduce() sum = rdd.reduce(lambda x,y:x+y) Java中的reduce() Integer sum = rdd.reduce(new Function2<Integer,Integer,Integer>(){ public Integer call(Integer x,Integer y){ return x+y; } }) fold()和reduce()类似,接收一个与reduce()接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。 我们可以用aggregate()来计算RDD的平均值,代替map()后面接fold()的方式。 Python中的aggregate() sumCount=nums.aggregate((0,0),(lambda acc,value:(acc[0]+value,acc[1]+1),(lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))) return sumCount[0]/float(sumCount[1]) Java中的aggregate() class AvgCount implements Serializable{ public AvgCount(int total,int num){ this.total=total; this.num=num; } public int num; public int total; public double avg(){ return total/(double)num; } } Function2<AvgCount,Integer,AvgCount> addAndCount = new Function2<AvgCount,Integer,AvgCount>(){ public AvgCount call(AvgCount a,Integer x){ a.total+=x; a.num+=1; return a; } }; Function2<AvgCount,AvgCount,AvgCount> combine = new Function2<AvgCount,AvgCount,AvgCount>(){ public AvgCount call(AvgCount a,AvgCount b){ a.total+=b.total; a.num+=b.num; return a; } }; AvgCount initial = new AvgCount(0,0); AvgCount result = rdd.aggregate(initial,addAndCount,combine); System.out.println(result.avg()); collect()把数据返回驱动程序中。 take(n)返回RDD中的n个元素。 top()从RDD中获取前几个元素。
图Spark3-5.png展示了基本的RDD行动操作:
在不同RDD类型间转换 有些函数只能用于特定类型的RDD,比如mean()和variance()只能用在数值RDD上。而join()只能用在键值对RDD上。 在java中,各种RDD的特殊类型间的转换更为明确。Java中有两个专门的类,JavaDoubleRDD和JavaPairRDD,来处理特殊的RDD。要构建出这些特殊类型到的RDD,需要使用特殊版本的类来代替一般使用的Function类,如果要从T类型的RDD创建出一个DoubleRDD,我们应该使用DoubleFunction<T>来代替Function<T,Double>.
Java中针对专门类型的函数接口如图Spark3-6.png
用Java创建DoubleRDD JavaDoubleRDD result = rdd.mapToDouble( new DoubleFunction<Integer>(){ public double call(Integer x){ return (double) x*x; } }); System.out.println(result.mean()); 持久化(缓存) 为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发送故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。在java中,默认情况下persist()会把数据以序列化的形式缓存在jvm的堆空间中。RDD中还有一个方法unpersist(),调用该方法可以手动把持久化的RDD从缓存中移除。