spark快速大数据分析笔记-2

xiaoxiao2021-02-28  98

Spark快速大数据分析第三章-笔记 本章介绍Spark对数据的核心抽象--弹性分布式数据集RDD,其实就是分布式的元素集合。在Spark中,对数据的所有操作不外呼创建RDD,转化已有RDD以及调用RDD操作进行求值。Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。 RDD基础 Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同点上。RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象。 创建RDD的两种方法:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比方说list和set)。 RDD支持的两种类型的操作:转化操作(transformation)和行动操作(action)。转化操作会由一个RDD生成一个新的RDD;行动操作会对RDD计算出一个结果,并把结果返回到驱动程序中,或者把结果存储到外部存储系统(如HDFS)中。 转化操作和行动操作的区别在于Spark计算RDD的方式不同,虽然你可以在任何时候定义新的RDD,但是Spark只会惰性计算这些RDD。也就是说,当真的需要计算的时候才会计算。 默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist()让Spark把这个RDD缓存下来。在实际操作中,你会经常用persist()来把数据的一部分读取到内存中,并反复查询这部分数控。 总的来说,每个Spark程序或shell会话都按如下的方式工作。 (1)从外部数据创建输入RDD (2)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD. (3)告诉Spark对需要被重用的中间结果RDD执行persist()操作。 (4)使用行动操作(例如count()和first()等)来触发一次并行计算,Spark会对计算进行优化后再执行。 cache()与使用默认存储级别调用persist()是一样的。 创建RDD。 创建RDD最简单的方式就是把程序中一个已有的集合传给SparkContext的parallelize()方法。比方说 Python中的parallelize()方法 line=sc.parallelize(["pandas","i like pandas"]) JAVA中的parallelize()方法 JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas","i like pandas")); 不过,更常用的方式是从外部存储中读取数据来创建RDD。比方说使用方法textFile(). Python版本的textFile()方法 line = sc.textFile("/path/to/README.md") Java版本的textFile()方法 JavaRDD<String> lines = sc.textFile("/path/to/READ.md") RDD操作 转化操作:会返回一个新的RDD作为操作的结果。假设我们想从日志文件log.txt中找出其中的错误消息。 Python实现filter()的转化操作 inputRDD = sc.textFile("log.txt") errorsRDD= inputRDD.filter(lambda x:"error" in x) JAVA实现filter()的转化操作 JavaRDD<String> inputRDD = sc.textFile("log.txt"); JavaRDD<String> errorsRDD = inputRDD.filter(new Function<String,Boolean>(){   public Boolean call(String x){   return x.contains("error");   } }); 注意:filter()操作会返回一个全新的RDD,不会改变已有的inputRDD中的数据。inputRDD在后面的程序中还可以继续使用。 Python版本的union()转化操作 errorsRDD = inputRDD.filter(lambda x: "error" in x) warningsRDD = inputRDD.filter(lambda x:"warning" in x) badLinesRDD = errorsRDD.union(warningsRDD) 转化操作可以操作任意数量的输入RDD。 从已有的RDD中派生出新的RDD,Spark会使用谱系图(lineage graph)来记录这些不同RDD之间的依赖关系。也可以靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。

如图Spark3-1.png所示:

行动操作 行动操作会把最终求得的结果返回到驱动程序,或者写入外部存储系统中。如我们可能想输出关于badLinesRDD的一些信息。为此需要使用两个行动操作来实现。用count()来返回计算结果,用take()来收集RDD中的一些元素。 Python版本的对错误进行计数 print "input had"+badLinesRDD.count()+" concerning lines" print "Here are 10 examples:" for line in badLinesRDD.take(10):   print line Java版本中对错误进行计数 System.out.println("Input had "+badLinesRDD.count()+" concerning lines") System.out.println("Here are 10 examples:") for(String line:badLinesRDD.take(10)){   System.out.println(line); } RDD中还有一个collect()函数,可以用来获取整个RDD中的数据。如果你的程序把RDD筛选到一个很小的规模,并且你想在本地处理这些数据,就可以使用它。但是注意collect()不能在大规模数据集中使用。另外每当我们调用一个新的行动操作时,整个RDD都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化。 惰性求值 惰性求值意味着当我们对RDD调用转化操作时,操作不会立即执行,相反,Spark会在内部激励下所要求执行的操作的相关信息。把数据读取到RDD的操作也同样是惰性的。因此当我们调用sc.textFile()时,数据并没有读取进来,而是在必要时才会读取,和转化操作一样的是,读取数据的操作也有可能会多次执行。 虽然转化操作是惰性求值的,但你还是可以随时通过运行一个行动操作来强制Spark执行RDD的转化操作比如使用count().这是一种对你所写的程序进行部分测试的简单方法。 向Spark传递函数 Python版本 三种方式来把函数传递给Spark.传递比较短的函数时,可以使用lambda表达式传递。 如: word=rdd.filter(lambda s:"error" in s) def containsError(s):     return "error" in s word = rdd.filter(containsError) 除了lambda表达式,我们也可以传递顶层函数或是定义的局部函数。 传递函数的注意事项:Python会在你不经意间把函数所在的对象也序列化传递出去。当你传递的对象是某个对象的成员,或者包含了某个对象中一个字段的引用时,spark就会把整个对象发到工作节点上。如果遇到无法序列化的对象,也会导致你的程序失败。替代方案时,把你需要传递的字段单独放到一个局部变量中,然后传递着个局部变量。 JAVA版本

在java中,函数需要作为实现了Spark的org.apache.spark.api.java.function包中的任一函数接口的对象来传递。根据不同的返回类型,有不同的接口。最基本的接口如图spark3-2.png所示:

可以把我们的函数类内联定义为使用匿名内部类,也可以创建一个具名类: 例如在Java中使用匿名内部类进行函数传递 RDD<String> errors = lines.filter(new Function<String,Boolean>(){   public Boolean call(String x){   return x.contains("error");   } }); 在Java中使用具名类进行函数传递 class ContainsError implements Function<String,Boolean>(){   public Boolean call(String x){   return x.contains("error");   } } RDD<String> errors = line.filter(new ContainsError()); 例如带参数的Java函数类 class Contains implements Function<Sting,Boolean>(){   private String query;   public Contains(String query){   this.query = query;   }   public Boolean call(String x){   return x.contains(query);   } } RDD<String> errors = line.filter(new Contains("error")); JAVA8的lambda表达式进行传递函数: RDD<String> errors =line.filter(s->s.contains("error")); 常见的转换操作和行动操作 1,针对各个元素的转化操作 最常用的就是map()和filter().map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值,而filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。 比方说使用map()函数来对RDD中的所有数求平方。 Python版本: nums = sc.parallelize([1,2,3,4]) squared=nums.map(lambda x:x*x).collect(); for num in squared:     print "%i " %(num) java版本: SparkConf conf = new SparkConf().setMaster("local").setAppName("squared");  JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4)); JavaRDD<Integer> result = rdd.map(new Function<Integer,Integer>(){   public Integer call(Integer x){   return x*x;   } }); System.out.println(StringUtils.join(result.collect(),",")); 如果你想对每个输入元素生成多个输出元素,实现该功能的操作叫作flatmap().我们提供给flatmap()的函数被分别应用到了输入RDD的每个元素上。不过返回的不是一个元素,而是返回值序列的迭代器。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。flatmap()的一个简单用途是把输入的字符串切分为单词: Python版本 lines = sc.parallelize(["hello world","hi"]) words = lines.flatMap(lambda line:line.split(" ")) words.first() #返回“hello" Java版本 JavaRDD<String> lines = sc.parallelize(Arrays.asList("Hello world","hi")); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){   public Iterable<String> call(String line){   return Arrays.asList(line.split(" "));   } }); words.first();//返回Hello

图Spark3-3阐释了flatMap()和map()的区别

伪集合的操作 我们的RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复元素,如果只要唯一的元素,可以通过使用RDD.distinct()转化操作来生成一个只包含不同元素的新RDD。

图Spark3-4总结了常见的RDD转化操作

行动操作 最常见的行动操作是reduce(),它接收一个函数作为参赛,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。 Python中的reduce() sum = rdd.reduce(lambda x,y:x+y) Java中的reduce() Integer sum = rdd.reduce(new Function2<Integer,Integer,Integer>(){   public Integer call(Integer x,Integer y){   return x+y;   } }) fold()和reduce()类似,接收一个与reduce()接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。 我们可以用aggregate()来计算RDD的平均值,代替map()后面接fold()的方式。 Python中的aggregate() sumCount=nums.aggregate((0,0),(lambda acc,value:(acc[0]+value,acc[1]+1),(lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))) return sumCount[0]/float(sumCount[1]) Java中的aggregate() class AvgCount implements Serializable{   public AvgCount(int total,int num){   this.total=total;   this.num=num;   }   public int num;   public int total;   public double avg(){   return total/(double)num;   } } Function2<AvgCount,Integer,AvgCount> addAndCount = new Function2<AvgCount,Integer,AvgCount>(){   public AvgCount call(AvgCount a,Integer x){   a.total+=x;   a.num+=1;   return a;   } }; Function2<AvgCount,AvgCount,AvgCount> combine = new Function2<AvgCount,AvgCount,AvgCount>(){   public AvgCount call(AvgCount a,AvgCount b){   a.total+=b.total;   a.num+=b.num;   return a;   } }; AvgCount initial = new AvgCount(0,0); AvgCount result = rdd.aggregate(initial,addAndCount,combine); System.out.println(result.avg()); collect()把数据返回驱动程序中。 take(n)返回RDD中的n个元素。 top()从RDD中获取前几个元素。

图Spark3-5.png展示了基本的RDD行动操作:

在不同RDD类型间转换 有些函数只能用于特定类型的RDD,比如mean()和variance()只能用在数值RDD上。而join()只能用在键值对RDD上。 在java中,各种RDD的特殊类型间的转换更为明确。Java中有两个专门的类,JavaDoubleRDD和JavaPairRDD,来处理特殊的RDD。要构建出这些特殊类型到的RDD,需要使用特殊版本的类来代替一般使用的Function类,如果要从T类型的RDD创建出一个DoubleRDD,我们应该使用DoubleFunction<T>来代替Function<T,Double>.

Java中针对专门类型的函数接口如图Spark3-6.png

用Java创建DoubleRDD JavaDoubleRDD result = rdd.mapToDouble( new DoubleFunction<Integer>(){   public double call(Integer x){   return (double) x*x;   } }); System.out.println(result.mean()); 持久化(缓存) 为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发送故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。在java中,默认情况下persist()会把数据以序列化的形式缓存在jvm的堆空间中。RDD中还有一个方法unpersist(),调用该方法可以手动把持久化的RDD从缓存中移除。
转载请注明原文地址: https://www.6miu.com/read-2250321.html

最新回复(0)