向spark传递函数
传递的函数及其引用的数据需要时可序列化的(实现了java的Serializable接口)。如果在Scala 中出现了NotSerializableException,通常问题就在于我们传递了一个不可序列化的类中的函数或字段。把需要的字段放到一个局部变量中,来避免传递包含该字段的整个对象
class SearchFunctions(val query: String) { def isMatch(s: String): Boolean = { s.contains(query) } def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = { // 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this" rdd.map(isMatch) } def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = { // 问题:"query"表示"this.query",因此我们要传递整个"this" rdd.map(x => x.split(query)) } def getMatchesNoReference(rdd: RDD[String]): RDD[String] = { // 安全:只把我们需要的字段拿出来放入局部变量中 val query_ = this.query rdd.map(x => x.split(query_)) } }持久化(缓存)为了避免多次计算同一个RDD即多次执行rdd的action算子,可以让Spark 对数据进行持久化。在scala默认情况下persist()会把数据以序列化的形式缓存在jvm的堆空间中。
collect()获取整个RDD中的数据;但是,只有当整个数据集能在单台机器的内存中放得下时,才能使用collect(),因此collect()不能用在大规模数据集上distinct() 转化操作来生成一个只包含不同元素的新RDD。不过需要注意,distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。分区