spark job server使用方法

xiaoxiao2021-02-28  77

入门

clone代码 从github上的spark-jobserver工程clone代码到本地编译 需要将工程根目录下的config文件删除将文件夹job-server/config拷贝到工程根目录下将local.conf.template/local.sh.template重命名为locao.conf/local.sh配置环境export JAVA_HOME=/d/java/jdk1.7.0_75`</li> <li>运行编译命令bin/server_package.sh local`(需要sbt环境)打好的包位于:/tmp/job-server/job-server.tar.gz(打包位置,可以在bin/server_package.sh中设置WORK_DIR,来改变)将job-server.tar.gz上传到linux系统中配置 local.confmaster: 指定spark的masterjobserver: server的配置settings.sh port配置server路径log路径spark的home/conf路径等配置server_start.sh此脚本,使用spark-submit提交作业spark的相关配置信息,可以在此脚本里配置运行/停止server ./server_start.sh./server_stop.sh日志 setting.sh中配置的LOG_DIR,是日志所在位置server_start.log本地server日志spark-job-server.logspark的driver日志使用 初始化spark的context 例curl -d "" 'ip:port/contexts/contextName?context-factory=spark.jobserver.context.SQLContextFactory'ip:port:为spark job server启动的机器和端口contextName:context的名字,后面执行操作需要用到,还可以在jobTracker页面,通过此名字搜索运行的applicationcontext-factory:初始化context spark.jobserver.context.SQLContextFactory用来初始化SQLContextspark.jobserver.context.HiveContextFactory用来初始化HiveContextspark.jobserver.context.DefaultSparkContextFactory用来初始化SparkContextspark.jobserver.context.StreamingContextFactory用来初始化StreamingContext上传jar包,初始化appName 例curl --data-binary @/xx/xx/job-server-extras_2.10-0.7.0-SNAPSHOT.jar ip:port/jars/appName@指定本地资源路径jars/appName指定appName,后面执行操作需要用到执行操作(sql等) 例curl -d "sql=\"show databases\"" 'ip:port/jobs?appName=xxx&classPath=spark.jobserver.SqlTestJob&context=contextName&sync=true'post方式将数据”sql=xxx”传递给serverappName:步骤2中初始化的值classPath:用户可以自定义,实现接口spark.jobserver.api.SparkJobBase的方法runJob即可 spark.jobserver.SqlTestJob实现逻辑拿到传入的sqlSQLContext.sql(sql).collect()执行context:指定提交到的contextName,步骤1中初始化的值sync:是否为同步模式 true:会等待一段时间,如果超过这个时间,没有返回,则返回json格式,有报错信息例: {"status": "ERROR", "result": {"message": "Ask timed out on [Actor[akka://JobServer/user/context-supervisor/hive-context-test#91063205]] after[10000 ms]", "errorClass": "akka.pattern.AskTimeoutException", "stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)", ..."]}false:直接返回json,包括信息如下:duration 作业完成时,则为作业总共占用时间,xx secs作业未完成,则为”Job not done yet”classPath:指定的classPathstartTime:作业提交时间context:提交到的context的namestatus ”STARTED”:开始执行jobId:spark运行的jobId获取返回结果 例curl -v 'ip:port/jobs/jobId'查询jobId的结果 duration:作业运行时间result:步骤3中自定义的classPath的返回结果statusFINISHEDERRORRUNNING等例:{ "duration": "24.463 secs", "classPath": "spark.jobserver.HiveTestJob", "startTime":"2016-11-17T11:01:09.249+08:00", "context": "hive-context-test", "result": ["[2,www]",...],"status": "FINISHED", "jobId": "5bc87741-c289-4f13-8f5c-de044256fcc7"}其他使用 context操作获取:curl http://host:port/contexts删除:curl -X DELETE http://host:port/contexts/namekill所有的context,并reload配置中的:curl -X PUT http://host:port/contexts?reset=rebootjob操作获取:curl http://host:port/jobskill:curl -X DELETE http://host:port/jobs/jobId

参考文献

spark job server git

转载请注明原文地址: https://www.6miu.com/read-59788.html

最新回复(0)