Spark: Reading and Storing Data

xiaoxiao, 2021-02-28

Spark can read and store data in several different formats. The examples below are written in Java.

I. Reading and storing text files

1. Reading data (the variable file holds the path of the input file)

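import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> inputData = sc.textFile(file); // one String element per line of the file

sc.stop(); // call only after every job using sc (including the save below) has finished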

2. Storing data

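// Keep only the lines that contain "error"
JavaRDD<String> errorsRDD = inputData.filter(x -> x.contains("error"));

// Creates a directory (not a single file) holding one part file per partition;
// the job fails if the directory already exists
errorsRDD.saveAsTextFile("E:\\errorsspark");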
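The same read, filter, and write steps can be mirrored on a single machine with plain java.nio, which is a cheap way to check the filter logic on a small sample before running it on a cluster. This is a local sketch, not Spark: the class name LocalErrorFilter and the sample lines are hypothetical, and unlike saveAsTextFile, which produces a directory of part files, it writes one ordinary file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class LocalErrorFilter {
    // Same predicate as the Spark lambda x -> x.contains("error")
    static List<String> errorLines(List<String> lines) {
        return lines.stream()
                .filter(x -> x.contains("error"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample input written to a temporary file
        Path input = Files.createTempFile("sample", ".txt");
        Files.write(input, Arrays.asList("ok: started", "error: disk full", "ok: done"),
                StandardCharsets.UTF_8);

        // Read as UTF-8, one list element per line (line terminators removed)
        List<String> errors = errorLines(Files.readAllLines(input, StandardCharsets.UTF_8));

        // Write the matching lines to a single local file
        Path output = Files.createTempFile("errors", ".txt");
        Files.write(output, errors, StandardCharsets.UTF_8);
        System.out.println(errors); // [error: disk full]
    }
}
```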

II. Reading and storing JSON data

// Java class that corresponds to each JSON record

import java.io.Serializable;

// Serializable so Spark can ship instances between nodes; Jackson uses the
// default constructor plus the getters and setters
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

1. Reading JSON data. Because the file is read line by line, each line must contain one complete JSON object.

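import java.util.ArrayList;
import java.util.Iterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.FlatMapFunction;

// Parses a whole partition at a time, so one ObjectMapper serves every line in it
public class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
    @Override
    public Iterator<Person> call(Iterator<String> lines) throws Exception {
        ArrayList<Person> people = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                people.add(mapper.readValue(line, Person.class));
            } catch (Exception e) {
                // Skip lines that cannot be parsed
            }
        }
        return people.iterator();
    }
}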

// textFile decodes every line as UTF-8, so the file itself must be saved in UTF-8
// (for example, re-save a .txt file with UTF-8 encoding first); no manual re-decoding is needed
JavaRDD<String> input = sc.textFile("E:\\file.json");

// Parse each partition, then keep only the people named "gch" (null-safe comparison)
JavaRDD<Person> result = input.mapPartitions(new ParseJson())
        .filter(x -> "gch".equals(x.getName()));
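A common reason to use mapPartitions rather than map here is that the setup work (creating the ObjectMapper) happens once per partition instead of once per line. The sketch below isolates that pattern in plain Java. It assumes a hypothetical parsePartition helper and uses Integer::parseInt as a stand-in for mapper.readValue. Because readValue throws a checked exception, the real ParseJson catches Exception, whereas this sketch only needs to catch RuntimeException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

final class PartitionParsing {
    // Mirrors ParseJson.call: build the (possibly expensive) parser once for the
    // whole partition, apply it to every line, and skip lines that fail to parse
    static <T> List<T> parsePartition(Iterator<String> lines,
                                      Supplier<Function<String, T>> parserFactory) {
        Function<String, T> parser = parserFactory.get(); // once per partition
        List<T> out = new ArrayList<>();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                out.add(parser.apply(line));
            } catch (RuntimeException e) {
                // Malformed record: skip it, as ParseJson does
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] created = {0};
        // Hypothetical stand-in for ObjectMapper: parses one integer per line
        Supplier<Function<String, Integer>> factory = () -> {
            created[0]++;
            return Integer::parseInt;
        };
        List<Integer> parsed = parsePartition(Arrays.asList("1", "oops", "3").iterator(), factory);
        System.out.println(parsed + " parsers created: " + created[0]);
    }
}
```

Running main prints "[1, 3] parsers created: 1". The malformed line is dropped, and the factory runs only once for the whole partition.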

2. Storing JSON data

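import java.util.ArrayList;
import java.util.Iterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.FlatMapFunction;

// Serializes each Person to a JSON string, using one ObjectMapper per partition
public class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
    @Override
    public Iterator<String> call(Iterator<Person> people) throws Exception {
        ArrayList<String> text = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        while (people.hasNext()) {
            Person person = people.next();
            text.add(mapper.writeValueAsString(person));
        }
        return text.iterator();
    }
}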

JavaRDD<String> formatted = result.mapPartitions(new WriteJson()); // result is the RDD obtained above

formatted.saveAsTextFile("E:\\wjson");
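saveAsTextFile writes one element per line, and ParseJson reads one object per line. JSON written this way can therefore be read back with the code from step 1, provided every serialized record stays on a single line. Jackson's default (non-indented) output escapes control characters such as newlines inside strings, which keeps each record on one line. The sketch below shows that escaping idea in plain Java. JsonLine and personToJson are hypothetical stand-ins for mapper.writeValueAsString, and the field order they produce is not necessarily the order Jackson uses.

```java
final class JsonLine {
    // Escapes a string per the JSON rules, so the result never contains a raw line break
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters become a 4-digit hex escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Hypothetical stand-in for mapper.writeValueAsString(person)
    static String personToJson(String name, int age) {
        return "{\"name\":" + quote(name) + ",\"age\":" + age + "}";
    }

    public static void main(String[] args) {
        System.out.println(personToJson("gch", 20));          // {"name":"gch","age":20}
        System.out.println(personToJson("line1\nline2", 30)); // {"name":"line1\nline2","age":30}
    }
}
```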

III. Reading and storing CSV data

1. Reading CSV data: read the file as ordinary text first, then parse its contents. In Java, the parsing is done with the opencsv library.

If no field contains a newline, each line can be parsed on its own:

import au.com.bytecode.opencsv.CSVReader;
import java.io.StringReader;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Parses one line into its fields
public class ParseLine implements Function<String, String[]> {
    public String[] call(String line) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(line));
        return reader.readNext();
    }
}

JavaRDD<String> csvFile = sc.textFile(inputFile);
JavaRDD<String[]> csvData = csvFile.map(new ParseLine()); // map returns a JavaRDD, not a JavaPairRDD
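A CSV parser is used instead of line.split(",") because quoted fields may themselves contain commas and quotes. The following plain-Java sketch shows the kind of quote handling such a parser performs on a single line. CsvLineParser is a hypothetical stand-in, not opencsv's implementation, and it handles only commas, double quotes, and doubled quotes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CsvLineParser {
    // Splits one CSV line into fields. Commas inside double quotes do not split,
    // and a doubled quote ("") inside a quoted field stands for one literal quote.
    static String[] parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        field.append('"'); // escaped quote
                        i++;
                    } else {
                        inQuotes = false;  // closing quote
                    }
                } else {
                    field.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;           // opening quote
            } else if (c == ',') {
                fields.add(field.toString());
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString());
        return fields.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String line = "1,\"Smith, John\",\"said \"\"hi\"\"\"";
        System.out.println(line.split(",").length);           // 4: naive split breaks field 2
        System.out.println(Arrays.toString(parseLine(line))); // [1, Smith, John, said "hi"]
    }
}
```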

If fields may contain newlines, read each file as a whole and then parse it:

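import au.com.bytecode.opencsv.CSVReader;
import java.io.StringReader;
import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;

// Parses a whole file; the Tuple2 holds (file path, file contents).
// In Spark 2.x, FlatMapFunction.call returns an Iterator (Spark 1.x used Iterable).
public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
    public Iterator<String[]> call(Tuple2<String, String> file) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(file._2()));
        return reader.readAll().iterator();
    }
}

JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());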
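wholeTextFiles returns one (path, contents) pair per file, so each file is held in memory as a single string. This suits many small files better than a few very large ones. Once the whole text is available, the key step is to treat a newline as a record boundary only when it falls outside quotes. The sketch below shows just that splitting step in plain Java. CsvRecordSplitter is hypothetical, and each record it returns still needs field-level parsing, for example by CSVReader.

```java
import java.util.ArrayList;
import java.util.List;

final class CsvRecordSplitter {
    // Splits the full text of a CSV file (such as the value of a wholeTextFiles pair)
    // into records; a newline ends a record only when it is outside double quotes
    static List<String> splitRecords(String content) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < content.length(); i++) {
            char c = content.charAt(i);
            if (c == '"') {
                // A doubled quote ("") toggles twice, so the state is unchanged
                inQuotes = !inQuotes;
                current.append(c);
            } else if (c == '\r' && !inQuotes) {
                // Ignore carriage returns outside quotes (handles CRLF line endings)
            } else if (c == '\n' && !inQuotes) {
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            records.add(current.toString()); // last record without a trailing newline
        }
        return records;
    }

    public static void main(String[] args) {
        String content = "id,note\n1,\"first line\nsecond line\"\n2,plain\n";
        System.out.println(content.split("\n").length);   // 4: naive splitting breaks record 1
        System.out.println(splitRecords(content).size()); // 3
    }
}
```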

2. Storing CSV data
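One straightforward approach mirrors the reading side: turn each String[] record into one CSV-formatted line, then save the resulting JavaRDD<String> with saveAsTextFile, as in the text-file example. The sketch below covers only the per-record formatting, in plain Java. CsvLineWriter is a hypothetical helper (opencsv also ships a CSVWriter class for this job). It quotes a field only when the field contains a comma, a double quote, or a line break, and it doubles any embedded quotes.

```java
final class CsvLineWriter {
    // Formats one record as a CSV line; null fields are written as empty strings
    static String toCsvLine(String[] fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            String f = fields[i] == null ? "" : fields[i];
            boolean needsQuotes = f.contains(",") || f.contains("\"")
                    || f.contains("\n") || f.contains("\r");
            if (needsQuotes) {
                // Wrap in quotes and double every embedded quote
                sb.append('"').append(f.replace("\"", "\"\"")).append('"');
            } else {
                sb.append(f);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] record = {"1", "Smith, John", "said \"hi\""};
        System.out.println(toCsvLine(record)); // 1,"Smith, John","said ""hi"""
    }
}
```

In a Spark job, a record RDD such as csvData could be converted with csvData.map(CsvLineWriter::toCsvLine) before calling saveAsTextFile.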

When reposting, please credit the original address: https://www.6miu.com/read-2623179.html
