importjava.net.URI;
import java.util.Arrays;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
public class mysparktest {
public static void main(String[] args) throws IOException{
String uri = "hdfs://192.168.217.132:9000/unit/xferlog";
String uro = "hdfs://192.168.217.132:9000/unit/xferlogoutput" ;
Configuration conf = new Configuration();
try {
//打开文件系统
FileSystem fs = FileSystem. get(URI.create (uri), conf);
//打开文件输入流
FSDataInputStream in = fs.open( new Path(uri));
//文件读取
byte[] ioBuffer = new byte[1024];
int readLen = in.read(ioBuffer);
while(readLen!=-1)
{
readLen = in.read(ioBuffer);
}
String str = new String(ioBuffer);
int cnt=0;
while(( int)(str.charAt(cnt)) != 0)cnt++;
str=str.substring(0, cnt);
System. out.println(str);
in.close();
//文件的删除
fs.delete( new Path(uri), true);
//写入到新文件中
FSDataOutputStream out = fs.create( new Path(uro));
out.write(( "new1"+str).getBytes( "UTF-8"));
out.flush();
out. sync();
out.close();
fs.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class mysparktest {
private static final Pattern SPACE = Pattern. compile(" ");
public static void main(String[] args) throws Exception {
String uri = "hdfs://Master:9000/unit/xferlog";
String uro = "hdfs://Master:9000/unit/xferlog1";
// if (args.length < 1) {
// System.err.println("Usage: JavaWordCount
");
// System.exit(1);
// }
//创建SparkConf,包含application的相关信息
SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount");
//创建一个JavaSparkContext对象
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//textFile()方法可将本地文件或HDFS文件转换成RDD,读取本地文件需要各节点上都存在,或者通过网络共享该文件
//读取一行
JavaRDD
lines = sc.textFile(uri, 1);
//flatMap与map的区别是,对每个输入,flatMap会生成一个或多个的输出,而map只是生成单一的输出
//用空格分割各个单词,输入一行,输出多个对象,所以用flatMap
JavaRDD
words = lines.flatMap( new FlatMapFunction
() {
@Override
public Iterable
call(String s) { return Arrays.asList(SPACE.split(s)); } }); //对每个单词生成key-value对,PairFunction
//表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数) //重写scala的Tupple2方法 JavaPairRDD
ones = words.mapToPair( new PairFunction
() { @Override //scala.Tuple2
call(T t) //Tuple2为scala中的一个对象,call方法的输入参数为T,即输入一个单词s,新的Tuple2对象的key为这个单词,计数为1 public Tuple2
call(String s) { return new Tuple2
(s, 1); } }); //调用reduceByKey方法,按key值进行reduce //调用Function2对象,Function2
//输入两个参数,T1,T2,返回R //若ones有<"one", 1>, <"one", 1>,会根据"one"将相同的pair单词个数进行统计,输入为Integer,输出也为Integer //输出<"one", 2> JavaPairRDD
counts = ones. reduceByKey(new Function2
() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //将结果保存到HDFS中 counts.saveAsTextFile(uro); //collect返回一个包含RDD内所有元素的Array List
> output = counts.collect(); for (Tuple2
tuple : output) { System.out.println(tuple._1() + ": " + tuple._2()); } sc.stop(); } }importorg.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; public final class mysparktest { private static final Pattern SPACE = Pattern. compile(" "); public static void main(String[] args) throws Exception { String uri = "hdfs://Master:9000/unit/xferlog"; String uro = "hdfs://Master:9000/unit/xferlog1" ; SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); //也可以使用ctx获取环境变量,例如下面的语句 System.out.println("spark home:"+ctx.getSparkHome()); //一次一行,String类型 ,还有 hadoopfile,sequenceFile什么的 ,可以直接用sc.textFile("path") JavaRDD
lines = ctx.textFile(uri, 1); //java.lang.String path, int minSplits lines.cache(); //cache,暂时放在缓存中,一般用于哪些可能需要多次使用的RDD,据说这样会减少运行时间 //collect方法,用于将RDD类型转化为java基本类型,如下 List
line = lines.collect(); for(String val:line) System. out.println(val); //下面这些也是RDD的常用函数 // lines.collect(); List
// lines.union(); javaRDD
// lines.top(1); List
// lines.count(); long // lines.countByValue(); /** * filter test * 定义一个返回 bool类型的函数,spark运行filter的时候会过滤掉那些返回只为false的数据 * String s,中的变量s可以认为就是变量lines(lines可以理解为一系列的String类型数据)的每一条数据 */ JavaRDD
contaninsE = lines.filter( new Function
() { public Boolean call(String s) throws Exception { return (s.contains( "passwd")); } }); System.out.println("--------------next filter's result------------------"); line = contaninsE.collect(); for(String val:line) System. out.println(val); /** * sample test * sample函数使用很简单,用于对数据进行抽样 * 参数为:withReplacement: Boolean, fraction: Double, seed: Int * */ JavaRDD
sampletest = lines.sample( false,0.1,5); System.out.println("-------------next sample-------------------"); line = sampletest.collect(); for(String val:line) System. out.println(val); /** * * new FlatMapFunction
两个string分别代表输入和输出类型 * Override的call方法需要自己实现一个转换的方法,并返回一个 Iterable的结构 * * flatmap属于一类非常常用的spark函数,简单的说作用就是将一条 rdd数据使用你定义的函数给分解成多条 rdd数据 * 例如,当前状态下,lines这个 rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话, * 可以这样写 : */ JavaRDD
words = lines.flatMap( new FlatMapFunction
() { @Override public Iterable
call(String s) { String[] words=s.split( " "); return Arrays. asList(words); } }); /** * map 键值对 ,类似于MR的map方法 * pairFunction
: T:输入类型;K,V:输出键值对 * 需要重写call方法实现转换 */ JavaPairRDD
ones = words.mapToPair( new PairFunction
() { @Override public Tuple2
call(String s) { return new Tuple2
(s, 1); } }); //A two-argument function that takes arguments // of type T1 and T2 and returns an R. /** * reduceByKey方法,类似于MR的reduce * 要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算 */ JavaPairRDD
counts = ones.reduceByKey( new Function2
() { @Override public Integer call(Integer i1, Integer i2) { //reduce阶段,key相同的value怎么处理的问题 return i1 + i2; } }); //备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对, // reduce方法会对输入进来的所有数据进行两两运算 /** * sort,顾名思义,排序 */ JavaPairRDD
sort = counts.sortByKey(); System.out.println("----------next sort----------------------"); /** * collect方法其实之前已经出现了多次,该方法用于将spark的RDD类型转化为我们熟知的java常见类型 */ List
> output = sort.collect(); for (Tuple2
tuple : output) { System. out.println(tuple. _1 + ": " + tuple._2()); } /** * 保存函数,数据输出,spark为结果输出提供了很多接口 */ // sort.saveAsTextFile( uro); // sort.saveAsNewAPIHadoopFile(); // sort.saveAsHadoopFile(); System.exit(0); } }