pySpark 关于DS.foreachRDD与rdd.foreachPartition 绑定自有参数问题

xiaoxiao2021-02-28  22

刚开始研究spark,打算使用python作为spark的快速开发语言

将函数基础类都归并到同一文件内

由于python 序列化pickle无法序列化嵌套字(链接类)对象(事实上是有坑,很深的坑),

所以需要再partition内的函数建立对应链接进行数据库累加操作

需要将外部输入的数据参数动态配置到函数内

使用到了python的偏函数概念

functions.partial

将函数封装后丢入到DS.foreachRDD内

代码1

statistics_data_func = functools.partial( function_dict["statistics_data"], mysql_dict=self.parameterDict["MYSQL_DICT"] ) statistics_data_func.__dict__["__code__"] = function_dict["statistics_data"].__code__ binding_receive_error_func = functools.partial( function_dict["binding_receive_error"], save_path_prefix=self.parameterDict["SAVE_PATH_PREFIX"] ) binding_receive_error_func.__dict__["__code__"] = function_dict["binding_receive_error"].__code__ ds.filter(lambda x: x['error_data'] == '0').foreachRDD( statistics_data_func ) ds.filter(lambda x: x['error_data'] != '0'). \ map(lambda x: x['error_data']). \ foreachRDD( binding_receive_error_func )

但是会抛异常,为

AttributeError: 'functools.partial' object has no attribute '__code__'

原因是functools.partial源码中__dict__参数为None

且DS.foreachRDD会调用函数

def foreachRDD(self, func): """ Apply a function to each RDD in this DStream. """ if func.__code__.co_argcount == 1: old_func = func func = lambda t, rdd: old_func(rdd) jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) api = self._ssc._jvm.PythonDStream api.callForeachRDD(self._jdstream, jfunc)

当函数参数为1的时候函数增加默认参数t

大于1的时候直接传入数据,这就导致参数对应不上

所以在代码一种将导出的偏函数的__dict__增加属性__code__,属性值为原函数的__code__

且默认DS.foreachRDD会传入两个参数,所以如果不增加一个无用的占位参数接收符,就会出现

TypeError: xxx() got multiple values for keyword argument 'yyy' 异常

所以在自定义函数中需要增加一个无用的占位参数

def statistics_data(_, normal_rdd, mysql_dict):

这样就可以完成数据的自定义传输了

接下来是

rdd.foreachPartition

不会默认给函数传输其他的变量

且是此函数不会调用func.__code__值

所以在配置次方法的函数是的时候注意参数顺序就可以了,贴如下代码

def dml_by_receive_count(partition, mysql_dict): mysql_tool = MysqlTool(mysql_dict) mysql_tool.get_connect() .foreachPartition(functools.partial(dml_by_receive_count, mysql_dict=mysql_dict) )

就可以完成调用了

转载请注明原文地址: https://www.6miu.com/read-2350237.html

最新回复(0)