Spark can read and save data through many different sources and formats. The examples below are written in Java.
I. Reading and Saving Text Files
1. Reading data (file is the path to the input file)
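import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> inputData = sc.textFile(file); // one RDD element per line of the file
2. Saving data
JavaRDD<String> errorsRDD = inputData.filter(x -> x.contains("error"));
// saveAsTextFile writes a directory of part-NNNNN files; the directory must not already exist
errorsRDD.saveAsTextFile("E:\\errorsspark");
sc.stop(); // stop the context only after every job (including the save) has run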
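saveAsTextFile writes a directory rather than a single file: each partition becomes its own part-NNNNN file, and a _SUCCESS marker file is typically added when the job completes. To inspect the result locally, a small helper can concatenate the part files in name order. This is a plain-Java sketch that assumes the output directory is on the local file system (like the E: paths used here); the class name PartFiles is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: reads back the output of saveAsTextFile from a local directory.
final class PartFiles {
    // Returns all lines of the part-* files in outputDir, reading the files in name order.
    // Files that do not match the glob, such as _SUCCESS or hidden .crc files, are ignored.
    static List<String> readAll(Path outputDir) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "part-*")) {
            for (Path part : stream) {
                parts.add(part);
            }
        }
        Collections.sort(parts); // zero-padded names (part-00000, part-00001, ...) sort in partition order
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            lines.addAll(Files.readAllLines(part, StandardCharsets.UTF_8));
        }
        return lines;
    }
}
```

For example, PartFiles.readAll(Paths.get("E:\\errorsspark")) would return the filtered error lines, partition by partition.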
II. Reading and Saving JSON Data
import java.io.Serializable;
// Java class that each JSON record maps to (Jackson uses the implicit no-arg constructor and the setters)
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;
    private String name;
    private int age;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    @Override
    public String toString() {
        return "Person{" + "name='" + name + '\'' + ", age=" + age + '}';
    }
}
1. Reading JSON data
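import java.util.ArrayList;
import java.util.Iterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.FlatMapFunction;
// Parses one partition of JSON lines; a single ObjectMapper is created per partition, not per record
public class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
    @Override
    public Iterator<Person> call(Iterator<String> lines) throws Exception {
        ArrayList<Person> people = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                people.add(mapper.readValue(line, Person.class));
            } catch (Exception e) {
                // skip records that fail to parse
            }
        }
        return people.iterator();
    }
}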
JavaRDD<String> input = sc.textFile("E:\\file.json"); // textFile always decodes lines as UTF-8, so save the file in UTF-8; each line must hold one complete JSON object
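// keep only people named "gch"; "gch".equals(...) avoids a NullPointerException when a record has no name
JavaRDD<Person> result = input.mapPartitions(new ParseJson())
        .filter(x -> "gch".equals(x.getName()));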
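ParseJson collects every parsed Person of a partition into an ArrayList before returning, so a large partition is held in memory all at once. A lazy iterator that parses records on demand and skips the ones that fail avoids that buffering. The sketch below is plain Java so it can be tested without Spark; SkippingIterator and LineParser are hypothetical names, and LineParser exists only because ObjectMapper.readValue throws checked exceptions, which java.util.function.Function does not allow.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical functional interface: like Function, but allowed to throw checked exceptions.
interface LineParser<T, R> {
    R parse(T input) throws Exception;
}

// Hypothetical iterator: lazily parses records from a source iterator and silently
// skips any record whose parse throws, so the partition is never buffered in a list.
final class SkippingIterator<T, R> implements Iterator<R> {
    private final Iterator<T> source;
    private final LineParser<T, R> parser;
    private R pending;          // next parsed value, valid only while hasPending is true
    private boolean hasPending;

    SkippingIterator(Iterator<T> source, LineParser<T, R> parser) {
        this.source = source;
        this.parser = parser;
    }

    @Override
    public boolean hasNext() {
        // Advance the source until one record parses or the source is exhausted.
        while (!hasPending && source.hasNext()) {
            T raw = source.next();
            try {
                pending = parser.parse(raw);
                hasPending = true;
            } catch (Exception e) {
                // malformed record: skip it, as ParseJson does
            }
        }
        return hasPending;
    }

    @Override
    public R next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        hasPending = false;
        R result = pending;
        pending = null;
        return result;
    }
}
```

Inside Spark, the mapPartitions function would create the ObjectMapper first and then return new SkippingIterator<>(lines, line -> mapper.readValue(line, Person.class)) instead of filling a list; treat that usage line as an untested assumption.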
2. Saving JSON data
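import java.util.ArrayList;
import java.util.Iterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.FlatMapFunction;
// Converts one partition of Person objects into JSON strings, one object per output line
public class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
    @Override
    public Iterator<String> call(Iterator<Person> people) throws Exception {
        ArrayList<String> text = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper(); // one mapper per partition
        while (people.hasNext()) {
            Person person = people.next();
            text.add(mapper.writeValueAsString(person));
        }
        return text.iterator();
    }
}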
JavaRDD<String> formatted = result.mapPartitions(new WriteJson()); // result is the RDD obtained in the reading step
formatted.saveAsTextFile("E:\\wjson");
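III. Reading and Saving CSV Data
1. Reading CSV data: read the file as ordinary text first, then parse the content. In Java, the opencsv library is used for the parsing.
If no field contains a line break: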
import au.com.bytecode.opencsv.CSVReader;
import java.io.StringReader;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
// Parses a single line with opencsv; this only works when no field spans several lines
public class ParseLine implements Function<String, String[]> {
    @Override
    public String[] call(String line) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(line));
        return reader.readNext();
    }
}
JavaRDD<String> csvFile = sc.textFile(inputFile);
JavaRDD<String[]> csvData = csvFile.map(new ParseLine());
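A real CSV parser is needed instead of line.split(",") because of quoting: a field wrapped in double quotes may contain commas and escaped quotes (""). The sketch below is a hand-written, dependency-free parser for a single line that follows those rules; it is not opencsv's implementation, and the class name SimpleCsv is hypothetical. Like the textFile approach above, it assumes no field contains a line break.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal CSV line parser (a stand-in for illustration, not opencsv).
final class SimpleCsv {
    // Splits one CSV line into fields. Quoted fields may contain commas, and a doubled
    // quote ("") inside quotes stands for one literal quote. Line breaks are not handled.
    static String[] parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"' && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    field.append('"'); // escaped quote
                    i++;
                } else if (c == '"') {
                    inQuotes = false;  // closing quote
                } else {
                    field.append(c);   // commas inside quotes are kept
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(field.toString()); // end of the current field
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString()); // the last field has no trailing comma
        return fields.toArray(new String[0]);
    }
}
```

For the line a,"b,c","d ""e""" it returns three fields (a, b,c and d "e"), where split(",") would have cut the quoted field in two.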
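If fields can contain line breaks, read each file as a whole and then parse it:
import au.com.bytecode.opencsv.CSVReader;
import java.io.StringReader;
import java.util.Iterator;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
// wholeTextFiles yields (file path, file content) pairs; parse the content (_2) of each file
public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
    @Override
    public Iterator<String[]> call(Tuple2<String, String> file) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(file._2()));
        return reader.readAll().iterator();
    }
}
JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());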
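When a quoted field contains a line break, textFile splits that record across two RDD elements, which is why the whole file is read instead. The sketch below shows the parsing that then has to happen on the full text: an unquoted line break ends a record, while a line break inside quotes stays part of the field. It is a hand-written stand-in for CSVReader.readAll, not opencsv itself, and the class name WholeFileCsv is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical whole-document CSV parser (illustration only, not opencsv).
final class WholeFileCsv {
    // Parses an entire CSV text into records. Quoted fields may contain commas, doubled
    // quotes and line breaks; an unquoted \n or \r\n ends the current record.
    static List<String[]> parseAll(String text) {
        List<String[]> records = new ArrayList<>();
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inQuotes) {
                if (c == '"' && i + 1 < text.length() && text.charAt(i + 1) == '"') {
                    field.append('"'); // escaped quote
                    i++;
                } else if (c == '"') {
                    inQuotes = false;  // closing quote
                } else {
                    field.append(c);   // line breaks inside quotes are kept
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(field.toString());
                field.setLength(0);
            } else if (c == '\n' || c == '\r') {
                if (c == '\r' && i + 1 < text.length() && text.charAt(i + 1) == '\n') {
                    i++; // treat \r\n as a single line break
                }
                fields.add(field.toString());
                field.setLength(0);
                records.add(fields.toArray(new String[0]));
                fields.clear();
            } else {
                field.append(c);
            }
        }
        // Flush the last record when the text does not end with a line break.
        if (field.length() > 0 || !fields.isEmpty()) {
            fields.add(field.toString());
            records.add(fields.toArray(new String[0]));
        }
        return records;
    }
}
```

With wholeTextFiles it could be plugged in as csvData.flatMap(file -> WholeFileCsv.parseAll(file._2()).iterator()). Because wholeTextFiles turns each file into a single record, this approach suits many small files better than a few very large ones.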
2. Saving CSV data
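Saving CSV works the other way round: convert each String[] record into one correctly quoted line, then call saveAsTextFile on the resulting JavaRDD<String>. The sketch below covers only the formatting step in plain Java; it is a hand-written alternative to opencsv's CSVWriter rather than that class, and the name CsvFormat is hypothetical.

```java
// Hypothetical CSV line formatter (illustration only, not opencsv's CSVWriter).
final class CsvFormat {
    // Formats one record as a CSV line. A field is wrapped in double quotes only when it
    // contains a comma, a double quote or a line break; inner quotes are doubled.
    // A null field is written as an empty field.
    static String formatLine(String[] fields) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                line.append(',');
            }
            String f = fields[i] == null ? "" : fields[i];
            boolean needsQuotes = f.indexOf(',') >= 0 || f.indexOf('"') >= 0
                    || f.indexOf('\n') >= 0 || f.indexOf('\r') >= 0;
            if (needsQuotes) {
                line.append('"').append(f.replace("\"", "\"\"")).append('"');
            } else {
                line.append(f);
            }
        }
        return line.toString();
    }
}
```

For example, csvData.map(CsvFormat::formatLine).saveAsTextFile(outputDir), with outputDir a hypothetical output path. A field that contains a line break still yields a multi-line record, so such output has to be read back with the wholeTextFiles approach.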
