【源码分析】storm拓扑运行全流程源码分析

xiaoxiao2021-02-28  98

【源码分析】storm拓扑运行全流程源码分析

@(STORM)[storm]

源码分析storm拓扑运行全流程源码分析一拓扑提交流程 一stormpy 1storm jar2def jar3exec_storm_class4get_classpath 二拓扑提交之一 1用户代码调用submitTopology2StormSubmittersubmitTopologyWithProgressBar3StormSubmittersubmitTopology 二提交拓扑之二StormSubmittersubmitTopologyAs 1加载配置2使用NimbusClient提交拓扑 二 拓扑运行流程 一概述

一、拓扑提交流程

拓扑提交的总体流程如下: 1、客户端通过thrift RPC提交topology的配置及jar包到nimbus。 2、nimbus针对该topology建立本地目录。 3、nimbus调度器根据topology的配置计算task,并把task分配到不同的worker上,调度的结果写入zookeeper。 4、zk上建立assignment节点,存储task和supervisor中的worker的对应关系。同时在zk上创建workerbeats节点来监控worker的心跳。 5、supervisor去zk上获取分配的task信息,启动一个或者多个worker来执行。 6、每个worker上运行一个或多个executor,每个executor对应一个线程,worker内部的executor之间通过DisrupterQueue进行通信,不同worker间默认采用netty来通信。 7、executor运行一个或者多个task(spout/bolt) 到此,topology就正式运行起来了。

具体流程图如下:(参考自《storm技术内幕与大数据实践》P96)

本文介绍了通过调用storm jar如何向nimbus提交拓扑的过程,即上述的第一步,主要的工作是加载配置信息,classpath,并将其与用户的jar包通过thrift协调上传至nimbus,等待nimbus的调用。

(一)storm.py

在这部分,请尤其注意classpath的设置。 依次将下列内容加入classpath中:

\$STORM_HOME \$STORM_HOME/lib \$STORM_HOME/extlib 用户代码的jar包 ~/.storm \$STORM_HOME/bin

详见下面的分析。

1、storm jar

用户可以通过storm jar命令向storm集群提交一个拓扑,如:

/home/hadoop/storm/bin/storm jar storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology word-count

其实,storm执行的是bin/目录下的storm.py文件

2、def jar

jar函数只有一行,就是执行exec_storm_class函数。

def jar(jarfile, klass, *args): exec_storm_class( klass, jvmtype="-client", extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR], args=args, daemon=False, jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])

其中的几个变量为:

USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm") STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")

因此用户jar包,~/.storm及$STORM_HOME/bin目录下的jar包会被自动加载到classpath中。

3、exec_storm_class

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, daemonName=""): global CONFFILE storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR]) if(storm_log_dir == None or storm_log_dir == "nil"): storm_log_dir = os.path.join(STORM_DIR, "logs") all_args = [ JAVA_CMD, jvmtype, "-Ddaemon.name=" + daemonName, get_config_opts(), "-Dstorm.home=" + STORM_DIR, "-Dstorm.log.dir=" + storm_log_dir, "-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon), "-Dstorm.conf.file=" + CONFFILE, "-cp", get_classpath(extrajars, daemon), ] + jvmopts + [klass] + list(args) print("Running: " + " ".join(all_args)) if fork: os.spawnvp(os.P_WAIT, JAVA_CMD, all_args) elif is_windows(): # handling whitespaces in JAVA_CMD sub.call(all_args) else: os.execvp(JAVA_CMD, all_args)

可以看出,最后就是运行一条java命令,主类是用户main函数的类。 看一下classpath的设置。

4、get_classpath

def get_classpath(extrajars, daemon=True): ret = get_jars_full(STORM_DIR) ret.extend(get_jars_full(STORM_DIR + "/lib")) ret.extend(get_jars_full(STORM_DIR + "/extlib")) if daemon: ret.extend(get_jars_full(STORM_DIR + "/extlib-daemon")) if STORM_EXT_CLASSPATH != None: for path in STORM_EXT_CLASSPATH.split(os.pathsep): ret.extend(get_jars_full(path)) if daemon and STORM_EXT_CLASSPATH_DAEMON != None: for path in STORM_EXT_CLASSPATH_DAEMON.split(os.pathsep): ret.extend(get_jars_full(path)) ret.extend(extrajars) return normclasspath(os.pathsep.join(ret))

依次将下列内容加入classpath中:

"-Dstorm.jar=" + jarfile \$STORM_HOME \$STORM_HOME/lib \$STORM_HOME/extlib 用户代码的jar包 ~/.storm \$STORM_HOME/bin

(二)拓扑提交之一

1、用户代码调用submitTopology

用户一般通过StormSubmitter.submitTopology提交拓扑

if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); }

这里使用了submitTopologyWithProgressBar,只是在submitTopology的基础上增加了一些进度信息,见下面代码。

2、StormSubmitter.submitTopologyWithProgressBar

public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { // show a progress bar so we know we're not stuck (especially on slow connections) submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() { @Override public void onStart(String srcFile, String targetFile, long totalBytes) { System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes); } @Override public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) { int length = 50; int p = (int)((length * bytesUploaded) / totalBytes); String progress = StringUtils.repeat("=", p); String todo = StringUtils.repeat(" ", length - p); System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes); } @Override public void onCompleted(String srcFile, String targetFile, long totalBytes) { System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes); } }); }

本质上就是调用submitTopology方法,同时在start, progress和complete阶段输出一些信息。

3、StormSubmitter.submitTopology

@SuppressWarnings("unchecked") public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { submitTopologyAs(name, stormConf, topology, opts, progressListener, null); }

StormSubmitter.submitTopology其实就是调用StormSubmitter.submitTopologyAs。下面我们详细分析一下StormSubmitter.submitTopologyAs

(二)提交拓扑之二:StormSubmitter.submitTopologyAs

1、加载配置

在submitTopologyAs中,第一件事就是将拓扑的配置加载到一个HashMap中

if(!Utils.isValidConf(stormConf)) { throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); } stormConf = new HashMap(stormConf); stormConf.putAll(Utils.readCommandLineOpts()); Map conf = Utils.readStormConfig(); conf.putAll(stormConf); stormConf.putAll(prepareZookeeperAuthentication(conf));

上述代码完成了以下功能: (1)检查拓扑传进来的conf是否有效,是否能json化,然后将其转换为HashMap。这里的conf是用户在建立拓扑时通过以下类似代码传进来的:

Config config = new Config(); config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 200); config.setNumWorkers(topoNumWorker); config.setMaxTaskParallelism(20); config.put(Config.NIMBUS_HOST, nimbusHost); config.put(Config.NIMBUS_THRIFT_PORT, 6627); config.put(Config.STORM_ZOOKEEPER_PORT, 2181); config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zk)); config.put(Config.TOPOLOGY_NAME, topologyName);

(2)将命令行中的参数加载进stormConf中 (3)调用readStormConfig,加载配置文件中的内容:

public static Map readStormConfig() { Map ret = readDefaultConfig(); String confFile = System.getProperty("storm.conf.file"); Map storm; if (confFile==null || confFile.equals("")) { storm = findAndReadConfigFile("storm.yaml", false); } else { storm = findAndReadConfigFile(confFile, true); } ret.putAll(storm); ret.putAll(readCommandLineOpts()); return ret; }

先加载defaults.yaml, 然后再加载storm.yaml

(4)最后,加载zk认证相关信息。 (5)除此之外,还可以组件中覆盖getComponentConfiguration方法以修改其组件的配置。 (6)最后,还可以使用spoutDeclare与boltDeclare设置外部组件。

注意,这里有conf和stormConf2个变量,conf才是全部的配置,stormConf不包括defaults.yaml和storm.yaml。先将用户配置加载到stormConf,然后将defaults.yaml和storm.yaml回到conf,最后将stormConf加载到conf.

2、使用NimbusClient提交拓扑

当配置准备好以后,就开始向nimbus提交拓扑。在storm中,nimbus是一个thrift服务器,它接受客户端通过json文件提交RPC调用,即NimbusClient向nimbus提供一份json格式的字符串,用于提交拓扑信息。

String serConf = JSONValue.toJSONString(stormConf); NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser); if(topologyNameExists(conf, name, asUser)) { throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); } String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser); try { LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); if(opts!=null) { client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts); } else { // this is for backwards compatibility client.getClient().submitTopology(name, jar, serConf, topology); } } catch(InvalidTopologyException e) { LOG.warn("Topology submission exception: "+e.get_msg()); throw e; } catch(AlreadyAliveException e) { LOG.warn("Topology already alive exception", e); throw e; } finally { client.close(); }

核心步骤包括: (1)将配置文件改为json格式的string

String serConf = JSONValue.toJSONString(stormConf);

(2)获取Nimbus client对象

NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);

getConfiguredClientAs的代码中的其中一行是指定nimbus的地址:

String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);

(3)检查拓扑名称是否已经存在

if(topologyNameExists(conf, name, asUser)) { throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); }

(4)将jar包上传至nimbus

String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);

(5)最后调用submitTopologyWithOpts正式向nimbus提交拓扑,参数包括:

client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);

submitTopologyWithOpts方法就只有2行:

send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options); recv_submitTopologyWithOpts();

即将信息发送至thrift server及接收返回信息。发送的信息包括:

args.set_name(name); args.set_uploadedJarLocation(uploadedJarLocation); args.set_jsonConf(jsonConf); args.set_topology(topology); args.set_options(options);

其中set_uploadedJarLocation指定了jar包的上传路径。

综上所述,其实所谓的提交拓扑,就是将拓扑的配置信息通过thrift发送到thrift server,并把jar包上传到nimbus,等待nimbus的后续处理,此时拓扑并未真正起来,直至recv_submitTopologyWithOpts获得成功的返回信息为止。

二、 拓扑运行流程

(一)概述

拓扑数据流如下: 1、Spout读取或者产生数据 2、通过netty/ZMQ将数据从所在的worker发送到下一个Executor所在的worker(如果下一个Executor与spout的executor在同一个worker,则直接发送到自身worker内部的Disruptor Queue) 3、worker根据TaskId将消息放入Executor的输入Disruptor Queue中 4、Executor处理完数据后,将其放到自身的输出Disruptor Queue中 5、然后Executor还会启动一个线程将输出Disruptor Queue中的内容通过netty发送到其它worker中,或者直接发送至其它Executor相对应的输入Disruptor Queue(源executor与目标executor在同一个worker的情况)。 6、如此循环3~5步骤,直至所有executor都处理完成数据。

executor的执行方式是一个典型的生产者消费者模式

转载请注明原文地址: https://www.6miu.com/read-63257.html

最新回复(0)