暂时的依赖关系如下:
代码如下:【仅为测试功能,最简单的WordCount】
PS:注释掉前几行是因为我想把提交运行的代码与计算逻辑的代码分开。
object RemoteWordCount { def main(args: Array[String]) { //Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) //Logger.getLogger("org.apache.spark").setLevel(Level.WARN) //val input = "hdfs://host0.com:8020/user/attop/test_data/movies.dat" //val output = "file:///E:/tmp20180502" //val master = "spark://host1.com:7077" //val conf = new SparkConf().setAppName("WordCountScala").setMaster(master) val conf = new SparkConf().setAppName("WordCountRemote") val sc = new SparkContext(conf) val lines = sc.textFile(args(0)) //lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println) val kvWordRDD = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) kvWordRDD.foreach(word => println(word._1 + "出现了" + word._2 + "次。")) // 排序之后,默认为ascending升序(排序完成再把key-value的位置“恢复”) val afterSort = kvWordRDD.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)) afterSort.foreach(word => println(word._1 + "出现了" + word._2 + "次。")) afterSort.saveAsTextFile(args(1)) sc.stop() } }~打包:多模块打包管理推荐使用“maven-assembly-plugin” 也可参考 注意:pom文件中build的路径,尽量缩小范围,不相干的就别加进去了。避免打包时受其他文件的影响~
以前编译scala并打包都是用的SBT,这次在maven-assembly-plugin中行不通了
【主要是我有点“较真”,我希望scala文件与java文件不混合打包,如果混合的话,还打包,是很方便的,但是在scala的代码中,我只想“专注处理数据”(不既处理数据,又提交jar到集群运行),并且单独打包,然后再使用Java或者Scala调用SparkSubmit提交到集群】
猜测该插件只编译java代码,而不编译scala代码 解决方法:maven-scala-plugin【但是这个库7年不更新了。。。几乎没人用】
<!-- scala打包 --> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin>补充:好慢的!!!比SBT慢太多了!!!
结果如下:
1.代码
public class RemoteSparkJava { public static void main(String[] args) { String fileName = "wordcount"; String[] arg0 = new String[]{ "--master", "spark://172.21.176.51:7078", "--deploy-mode", "client", "--class", "lab.ipl.scalacode.RemoteWordCount", "--executor-memory", "1000m", "--name", "use which name?", // "--jar",tmp + "lib/spark_filter.jar", // 指定jar包 "D:/JavaStudy/LabIpl/ipbdLab/ipl-bigdata/target/ipl-bigdata.jar", "hdfs://host0.com:8020/user/attop/test_data/movies.dat", "file:///E:/tmp20180502_" + fileName }; SparkSubmit.main(arg0); } }2.报错
java.lang.NoClassDefFoundError: org/apache/spark/deploy/SparkSubmit 原因是我在Maven中引入Spark依赖后,就删掉了之前导入的jars,加上即可。
3.报错:
不能连接到7077
start-slave.sh spark://host_name:7077首先我尝试手动重启spark:./start-master.sh -h 172.21.176.51发觉还是连接不上,于是查看日志,注意到master尝试web UI 连接8080失败:【按道理应该是 8080 for master, 8081 for worker】
WARN Utils: Service ‘MasterUI’ could not bind on port 8080. Attempting port 8081到8081页面后发觉master启动在7078
修改后再次运行,警告:
TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 初始化job时没有获取到任何资源;提示检查集群,确保workers可以被注册并有足够的内存资源首先,我确保scala代码是运行了的: (这里的第一个警告: Skip remote jar,没找到合理的解释,TODO)
任务确实是能成功提交的:
jps找不见worker,查阅livy提交作业估计企业中也不咋用,这篇文章,留着实习了,再更新吧