刚开始用Spark,操作dataframe不是很熟练,遇到的第一个问题是给dataframe添加索引列,查阅了网上的一些教程,大都是用Scala语言编写的代码,下面给出自己用python写的三种方法。
方法一:先创建Pandas版本的dataframe,然后带索引保存为本地文件,再用SparkSession来创建DataFrame。方法二:先创建Pandas版本的dataframe,添加一个索引列后,再直接转换成Spark版本的dataframe。方法三:直接利用Spark的select方法,新增列的过程包含在自定义的udf函数中。1.先创建一个Pandas版本的dataframe,从本地csv文件导入数据。
import pandas as pd from pyspark.sql import * # build a dataframe pandas_df = pd.read_csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data.csv') print(pandas_df)显示结果如下 2.然后添加索引列,从1开始编号,并保存为本地文件。
pandas_df.index = range(1, len(pandas_df) + 1) # 索引值设置为从1开始 print(pandas_df) pandas_df.to_csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data01.csv', index=True, index_label='index')显示结果 3.再用SparkSession来创建DataFrame。
spark = SparkSession.builder.getOrCreate() df = spark.read.csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data01.csv', header=True) df.show()结果如下
1.第一步同方法一。 2.先添加一个索引列,insert方法可以设置列的插入位置。
pandas_df.index = range(1, len(pandas_df) + 1) # set index from 1 pandas_index = pandas_df.index # 将dataframe的索引赋给一个变量 pandas_df.insert(0, 'index', pandas_index) # 第一个参数是列插入的位置 print(pandas_df)结果如下 3. 再直接转换成Spark版本的dataframe
spark = SparkSession.builder.getOrCreate() spark_df = spark.createDataFrame(pandas_df) spark_df.show()结果如下
1.用SparkSession来创建dataframe,从本地直接读取数据。
from pyspark.sql import SparkSession from pyspark.sql.functions import udf, col from pyspark.sql.types import IntegerType spark = SparkSession.builder.getOrCreate() spark_df = spark.read.csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data.csv', header=True) spark_df.show()结果如下 2. 定义自己的udf函数,来创建索引列。
index_list = [x for x in range(1, spark_df.count()+1)] # 构造一个列表存储索引值,用生成器会出错 idx = 0 # 定义一个函数 def set_index(x): global idx # 将idx设置为全局变量 if x is not None: idx += 1 return index_list[idx-1]3.调用select方法来添加索引列。
index = udf(set_index, IntegerType()) # udf的注册,这里需要定义其返回值类型 spark_df.select(col("*"), index("cab_id").alias("index")).show() # udf的注册的使用,alias方法用于修改列名结果如下
