spark job server原理

xiaoxiao2021-02-28  91

配置相关

settings.sh 功能:配置环境变量APP_USER/APP_GROUP:作业提交用户和组JMX_PORT:java jmx端口,通常在aws或者其他容器里打开INSTALL_DIR:sjs所做目录LOG_DIR:日志路径PIDFILE:启动sjs,产生pid存放的文件名JOBSERVER_MEMORY:启动spark作业的driverMemSPARK_VERSION:指定spark版本SCALA_VERSION:scala版本SPARK_HOME、SPARK_LOG_DIR、SPARK_CONF_DIR:spark相关配置YARN_CONF_DIR、HADOOP_CONF_DIR:yarn配置local.conf spark.master:指定spark提交的类型,yarn-client、local[4]等spark.jobserver port:指定jobServer的启动端口,使用此端口进行作业提交和监控等context-per-jvm:是否每个context都启动一个独立的进程jobdao:指定处理jobs、jars等逻辑的类datadao:通过POST/data上传到sjs的文件存放路径sqldao:当jobdao指定为JobSqlDAO时使用 slick-driverjdbc-driverrootdir:H2 driver存放数据目录jdbc:连接dbcp:连接池result-chunk-size 作业返回值使用分块传输,每块大小spark.contexts:启动sjs自动加载的context配置 名字context-settings:启动context,即app,相关配置 num-cpu-cores:core个数memory-per-node:executor的mem,eg 512m、1Gdependent-jar-uris:依赖的jar包,list形式,或者字符串,使用逗号隔开 [“file:///xxx.jar”,”file:///xxx2.jar”],或者”file:///xxx.jar,file:///xxx2.jar”其他的spark配置,去掉前缀spark即可 如:spark.speculation可配置为speculationserver_start.sh 启动spark job servermanager_start.sh context-per-jvm设置为true时,才会使用此脚本,用于启动context

使用

启动 运行脚本server_start.sh即可初始化context curl -d "" 'ip:port/contexts/roncen_test_context?context-factory=spark.jobserver.context.HiveContextFactory'上传jar包 curl -H "Content-Type: application/java-archive" --data-binary @/home/vipshop/platform/sjs_2.0/jars/job-server-extras_2.11-0.7.0-SNAPSHOT.jar ip:8091/binaries/sql提交作业 curl -d "sql_file=\"hdfs://bipcluster/spark/sql/test_cassandra.sql\"" 'ip:8091/jobs?appName=sql&classPath=spark.jobserver.vip.VipHiveJob&context=roncen_test_context&sync=false'通过jobId获取job运行状态 curl -v 'ip:8091/jobs/xxx删除context curl -X DELETE "ip:8091/contexts/roncen_test_context"

问题记录

server返回失败问题[delete context时,context上的job并未结束]时不时返回The server was not able to produce a timely response to your request

问题1:The server was not able to produce a timely response to your request

探测方法 curl -v 'ip:port/jobs/b2ee01d2-a495-43a3-a0e5-f2ba82330211'探测对应的jobId状态正常情况下,返回:”RUNNING”|”ERROR”|”FINISHED”获取job状态逻辑 spark.jobserver.WebApi中接收http的GET请求GET /jobs/<jobId>通过akka从jobInfoActor中获取job状态GetJobStatus(jobId) 从jobDao中获取对应jobId的信息 JobSqlDao.getJobInfo()中,从数据库中查询对应job的信息,返回返回格式application/json给客户端 jobId不存在:返回No such job ID xxxx存在: 构造返回格式:jobId: , startTime: , classPath: , context: , duration: , status:通过akka从JobInfoActor中获取job结果GetJobResult 通过AkkaClusterSupervisorActor的GetResultActor(context)得到对应的resultActor 通过contextName得到对应的resultActor - 通过resultActor的GetJobResult(jobId)得到最后的结果返回客户端结果

初始化context步骤

命令示例curl -d "" 'ip:port/contexts/sql-context-for-update-on-sale-85?context-factory=spark.jobserver.context.HiveContextFactory'通过http调用WebApi中的POST /contexts/<contextName>

通过akka调用AkkaClusterSupervisorActor的AddContext(cName, config)

判断是否存在,如果存在则返回ContextAlreadyExists

调用方法startContext()

生成contextActorName,”jobManager-” + uuid在${LOG_DIR}路径下创建contextDir路径,生成对应文件context.conf 存放actorname、context-factory等基础信息生成执行命令:${deploy.manager-start-cmd} contextDir cluster.selfAddress(akka地址),即./manager_start.sh xxx,此命令是在后台执行的,命令后有&判断返回值,如果失败,返回ContextInitError,如果成功,将其放入contextInitInfos的map中执行上述生成的命令 执行主类spark.jobserver.JobManager获取context.conf文件中的配置信息初始化jobDao,spark.jobserver.jobdao配置,这里为spark.jobserver.io.JobSqlDAO初始化JobDAOActor,命名为dao-manager-jobmanager初始化jobManager,命名为${context.actorname}join到cluster中Cluster(system).join(clusterAddress) ???? 发送ActorIdentity(memberActors, actorRefOpt)到AkkaClusterSupervisorActorAkkaClusterSupervisorActor收到此消息后 遍历当前cluster所有的actorRef 如果返回的actorName以jobManager开头则执行以下步骤,否则不处理从contextInitInfos中remove当前actorName对应的actor执行方法initContext()初始化JobResultActor resultActor通过akka将resultActor发送给正在处理的actor,即发送消息JobManagerActor.Initialize(Some(resultActor))到JobManagerActor JobManagerActor得到消息后,进行如下处理 初始化JobStatusActor得到JobResultActor,如果resultActor没有,则初始化一个加载dependent-jar-uris指定的jar包生成contextFactory,生成context生成JobCacheImpl,用于缓存job信息将dependent-jar-uris指定的jar包放入sparkContext.addJar()中返回Initialized(contextName, resultActor),如果失败,则返回InitError(t)得到返回值 如果成功,则将当前context放到contexts中,即contexts(ctxName) = (ref, resActor)

返回成功/失败

返回json类型结果

提交job到context中

命令示例:curl -d "sql = \"show databases\"" 'ip:port/jobs?appName=sql&classPath=spark.jobserver.HiveTestJob&context=sql-context-for-gs-sku-check-85&sync=true'通过http调用WebApi中的POST /jobs通过akka中AkkaClusterSupervisorActor的GetContext(name),得到对应context的jobManager 如果没有得到,则返回NoSuchContext或者ContextInitError(err)通过jobManager进行与context进程通信,发送JobManagerActor.StartJob,用于提交作业 加载未加载的jar包调用startJobInternal() 通过jobSqlDao,获取当前appName上次提交作业的时间和type,如果没有则返回错误随机生成randomUUID,作为jobId通过sparkContextFactory.loadAndValidateJob()生成jobContainer 通过classPath/appname,在JobCacheImpl中获取JobJarInfo,并初始化 如果cache中没有,会通过akka发送消息GetBinaryPath(),从jobSqlDao中获取jar包初始化构造函数,将其放入JobContainer中,返回判断返回值,如果为Good(container),则继续,否则返回错误将结果发送给JobResultActor和JobStatusActor JobStatusActor 发送消息SaveJobInfo到jobSqlDao,将信息存入元数据库调用方法getJobFuture()返回结果 判断当前runningJob是否大于最大运行job,如果是则返回NoJobSlotsAvailable(maxRunningJobs),否则继续使用scala的Future,另起线程执行job 设置SparkEnv发送消息JobInit到JobStatusActor通过方法HiveTestJob.validate()判断当前job是否正常 如果正常 发送消息JobStarted到JobStatusActor设置sparkContext的jobGroup为当前jobId,sc.setJobGroup(jobId, xxx)调用接口,执行job,HiveTestJob.runJob(jobC, jobEnv, jobData)否则发送JobValidationFailed线程执行结束 成功 发送JobFinished到JobStatusActor发送JobResult到JobResultActor失败 发送JobErroredOut到JobStatusActor判断返回结果,并返回给客户端对应的http reponse JobResult(jobId, res)JobErroredOutJobStarted(_, jobInfo) 通过akka发送给JobInfoActor消息StoreJobConfig(jobInfo.jobId, postedJobConfig) JobInfoActor得到消息后,通过jobDao.saveJobConfig(jobId, jobConfig)存储信息,这里为JobSqlDaoJobValidationFailedNoSuchApplicationNoSuchClassWrongJobTypeWrongJobTypeNoJobSlotsAvailableContextInitError

图形化展示

转载请注明原文地址: https://www.6miu.com/read-60312.html

最新回复(0)