
Continuing from the previous article:
5. The previous article left off at ServiceHandler.submitTopologyWithOpts(). In this method a TopologyAssignEvent is instantiated (in effect a topology-level job) and pushed onto TopologyAssign's job queue. The code is as follows:
TopologyAssignEvent assignEvent = new TopologyAssignEvent();
assignEvent.setTopologyId(topologyId);
assignEvent.setScratch(false);
assignEvent.setTopologyName(topologyname);
assignEvent.setOldStatus(Thrift
        .topologyInitialStatusToStormStatus(options
                .get_initial_status()));
TopologyAssign.push(assignEvent);
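TopologyAssign.push() only enqueues the event and returns immediately; the actual assignment happens asynchronously on the TopologyAssign background thread described next. A minimal, self-contained sketch of this queue-driven pattern (the AssignEvent class, queue field and doAssign() method below are illustrative stand-ins, not JStorm's actual members):

import java.util.concurrent.LinkedBlockingQueue;

// Illustrative stand-in for the queue-driven assigner pattern described below.
class AssignLoop implements Runnable {
    static class AssignEvent { String topologyId; }   // stand-in for TopologyAssignEvent
    private final LinkedBlockingQueue<AssignEvent> queue = new LinkedBlockingQueue<AssignEvent>();

    // Producer side: a submission (like submitTopologyWithOpts) just enqueues the event and returns.
    public void push(AssignEvent event) {
        queue.offer(event);
    }

    // Consumer side: the background thread blocks on the queue and assigns one topology at a time.
    public void run() {
        while (true) {
            try {
                AssignEvent event = queue.take();   // blocks until a submission arrives
                doAssign(event);                    // corresponds to mkAssignment(...) shown below
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private void doAssign(AssignEvent event) {
        System.out.println("assigning " + event.topologyId);
    }
}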
6. TopologyAssign is JStorm's task assigner: it creates and assigns tasks according to the configuration and the spout/bolt relationships in the topology. The actual creation and assignment, however, is not done by TopologyAssign itself but is delegated to JStorm's scheduler, and JStorm allows users to plug in a custom scheduler for their own business needs. A separate article will analyze JStorm's scheduler, so it is not covered here. Back to TopologyAssign: the class is a background thread implementing Runnable that starts together with Nimbus. It mainly performs topology assignment, assignment backup and rebalancing, again by way of JStorm's scheduler. Its run() method takes jobs from its own queue in a blocking fashion and then performs the assignment; the core assignment logic is as follows:
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
    String topologyId = event.getTopologyId();
    TopologyAssignContext context = prepareTopologyAssign(event);
    // ResourceWorkerSlot is the abstraction of a worker: it wraps the worker and its tasks
    Set<ResourceWorkerSlot> assignments = null;
    IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
    // compute the task assignment through JStorm's scheduler
    assignments = scheduler.assignTasks(context);
    Assignment assignment = null;
    Map<String, String> nodeHost = getTopologyNodeHost(
            context.getCluster(), context.getOldAssignment(), assignments);
    Map<Integer, Integer> startTimes = getTaskStartTimes(context,
            nimbusData, topologyId, context.getOldAssignment(), assignments);
    // location of the jar submitted to the cluster; workers download the code from here when running tasks
    String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(),
            topologyId);
    assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);
    StormClusterState stormClusterState = nimbusData.getStormClusterState();
    // upload the finished assignment to ZK to notify the supervisors
    stormClusterState.set_assignment(topologyId, assignment);
    // update the tasks' start times
    NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
    // update meta information in ZK
    if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
            || context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_MONITOR)
        NimbusUtils.updateMetricsInfo(nimbusData, topologyId, assignment);
    metricsMonitor(event);
    return assignment;
}
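scheduler.assignTasks(context) is where the tasks actually get mapped onto worker slots. The default algorithm is outside the scope of this excerpt, but the core job of any scheduler is to spread task ids over the available worker slots; the following self-contained round-robin illustration (JDK types only, hypothetical slot strings, not JStorm's default scheduler) shows the kind of result it has to produce:

import java.util.*;

// Illustrative only: distributes task ids over worker slots round-robin,
// the way a very naive scheduler implementation might.
public class RoundRobinSketch {
    public static Map<String, List<Integer>> assign(List<String> workerSlots, List<Integer> taskIds) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<String, List<Integer>>();
        for (String slot : workerSlots) {
            assignment.put(slot, new ArrayList<Integer>());
        }
        for (int i = 0; i < taskIds.size(); i++) {
            String slot = workerSlots.get(i % workerSlots.size());   // wrap around the slots
            assignment.get(slot).add(taskIds.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> slots = Arrays.asList("host1:6800", "host1:6801", "host2:6800");
        List<Integer> tasks = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(assign(slots, tasks));
        // {host1:6800=[1, 4, 7], host1:6801=[2, 5], host2:6800=[3, 6]}
    }
}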
7. At this point Nimbus has finished the assignment and written it to ZK, so the supervisors now need to claim their own tasks. The logic for fetching tasks is encapsulated in SyncSupervisorEvent, which is also a background thread: it keeps fetching all assignments from ZK (under JSTORM_ROOT/assignments), saves its own share to local disk, downloads the topology code locally through NimbusClient, and then triggers the worker-starting logic to run the tasks. The core code is as follows:
public void run() {
    RunnableCallback syncCallback = new EventManagerZkPusher(this,
            syncSupEventManager);
    // on first start, actively fetch all assignments under JSTORM_ROOT/assignments from ZK;
    // afterwards new assignments arrive through a ZK watch callback
    Map<String, Assignment> assignments = Cluster.get_all_assignment(
            stormClusterState, syncCallback);
    // the topologies already downloaded locally
    List<String> downloadedTopologyIds = StormConfig
            .get_supervisor_toplogy_list(conf);
    // out of all assignments, pick the ones that belong to this supervisor
    Map<Integer, LocalAssignment> localAssignment = getLocalAssign(
            stormClusterState, supervisorId, assignments);
    // save the local assignments to disk
    localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);
    // get the code download location of each topology
    Map<String, String> topologyCodes = getTopologyCodeLocations(
            assignments, supervisorId);
    // download the code locally through NimbusClient
    downloadTopology(topologyCodes, downloadedTopologyIds);
    // remove useless topologies
    removeUselessTopology(topologyCodes, downloadedTopologyIds);
    // enqueue syncProcesses, which is responsible for starting new workers to run the tasks
    processEventManager.add(syncProcesses);
}
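getLocalAssign(...) is the step where the supervisor reduces the cluster-wide assignments to its own ports. The real JStorm implementation is not shown in this excerpt; the sketch below (with a stand-in WorkerSlot class) only illustrates the filtering idea of keeping the slots whose node id equals this supervisor's id:

import java.util.*;

// Illustrative only: pick the (port -> topology) entries whose node id matches this supervisor.
public class LocalAssignSketch {
    static class WorkerSlot {                  // stand-in for ResourceWorkerSlot
        String nodeId; int port;
        WorkerSlot(String nodeId, int port) { this.nodeId = nodeId; this.port = port; }
    }

    public static Map<Integer, String> getLocalAssign(String supervisorId,
            Map<String, List<WorkerSlot>> assignments) {           // topologyId -> slots
        Map<Integer, String> local = new HashMap<Integer, String>(); // port -> topologyId
        for (Map.Entry<String, List<WorkerSlot>> e : assignments.entrySet()) {
            for (WorkerSlot slot : e.getValue()) {
                if (supervisorId.equals(slot.nodeId)) {
                    local.put(slot.port, e.getKey());              // this slot belongs to us
                }
            }
        }
        return local;
    }
}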
8. After SyncSupervisorEvent has picked out its own assignments and saved them locally, SyncProcessEvent starts the workers that run the actual tasks. SyncProcessEvent mainly does two things: it starts new workers and kills useless ones. Only starting new workers is covered here; the logic is as follows:
private void startNewWorkers(Set<Integer> keepPorts,
        Map<Integer, LocalAssignment> localAssignments) throws Exception {
    // the newly assigned work for this round
    Map<Integer, LocalAssignment> newWorkers = JStormUtils
            .select_keys_pred(keepPorts, localAssignments);
    // generate an ID for every new worker
    Map<Integer, String> newWorkerIds = new HashMap<Integer, String>();
    for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) {
        Integer port = entry.getKey();
        LocalAssignment assignment = entry.getValue();
        String workerId = UUID.randomUUID().toString();
        newWorkerIds.put(port, workerId);
        // save each worker's ID locally
        StormConfig.worker_pids_root(conf, workerId);
        // launch a new JVM to run the assignment
        launchWorker(conf, sharedContext,
                assignment.getTopologyId(), supervisorId, port,
                workerId, assignment);
    }
}
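JStormUtils.select_keys_pred(keepPorts, localAssignments) is used above to pick out the assignments that still need a worker, which from its usage appears to mean the ports of localAssignments that are not in keepPorts (the ports whose existing workers were kept). A small generic illustration under that assumption:

import java.util.*;

// Illustrative only: keep the entries whose key is NOT in the given set,
// which is the semantics select_keys_pred appears to have from its use above.
public class SelectKeysSketch {
    public static <K, V> Map<K, V> selectKeysNotIn(Set<K> excluded, Map<K, V> all) {
        Map<K, V> result = new HashMap<K, V>();
        for (Map.Entry<K, V> e : all.entrySet()) {
            if (!excluded.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> assigned = new HashMap<Integer, String>();
        assigned.put(6800, "topo-1");
        assigned.put(6801, "topo-2");
        Set<Integer> keepPorts = Collections.singleton(6800);      // worker on 6800 already runs
        System.out.println(selectKeysNotIn(keepPorts, assigned));  // {6801=topo-2} needs a new worker
    }
}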
The above is the whole process of submitting a topology in JStorm. These two articles only give the main line; the detailed code logic is not fully covered and will be refined later, and a detailed analysis of JStorm's scheduler will also follow.
Supervisor is the worker node of JStorm, similar to the TaskTracker in MapReduce. It subscribes to the task-scheduling results in ZooKeeper and starts or stops Worker processes according to them. The Supervisor also needs to periodically write its active-port information to ZooKeeper so that Nimbus can monitor it. The Supervisor does not do any actual processing itself; all computation is handed over to Workers. Seen from the overall architecture, the Supervisor is the middle layer of JStorm's three-level management architecture, assisting with task scheduling and resource management.
1.Supervisor
As the single-node architecture figure shows, the Supervisor process is launched at initialization and, based on the tasks assigned by Nimbus, starts or stops Worker JVM processes. Each Worker process runs one or more Task threads, and the Tasks of one Worker must belong to the same topology. So a Supervisor node runs several JVM processes: one Supervisor process and one or more Worker processes. The state of the different roles is maintained in different ways: a Task writes its heartbeat, including the time and the Task's current statistics, directly to ZooKeeper; a Worker periodically writes the topology id, port, set of Task ids and the current time to local disk; the Supervisor periodically writes the time and the node's resources (its set of ports) to ZooKeeper, and at the same time reads the scheduling results from ZooKeeper and starts or stops Worker processes accordingly.
Inside a Worker JVM process, besides the mutually independent Task threads, the Tasks share Worker-level resources such as data sending/receiving and the management of connections between nodes, as shown in the figure. These include: VirtualPort, the data-receiving thread; KryoTupleSerialize, tuple serialization; TransferQueue, the data-sending pipe; DrainerRunnable, the data-sending thread; RefreshConnections, the thread that manages connections between nodes.
III. Implementation and Code Analysis
1.Supervisor
In jstorm-0.7.1, the Supervisor daemon is implemented in the com.alipay.dw.jstorm.daemon.supervisor package under jstorm-server/src/main/java. Supervisor.java is the entry point of the Supervisor daemon. The Supervisor process mainly does the following things:
1. Clean up the data under the local temporary directory $jstorm-local-dir/supervisor/tmp;
2. Create the ZK operation instance;
3. Create the local state file $jstorm-local-dir/supervisor/localstate;
4. Generate the supervisor-id and write it into localstate under key="supervisor-id"; if the supervisor is restarting, first check whether a supervisor-id already exists and simply read it if so;
5. Initialize and start the Heartbeat thread;
6. Initialize and start the SyncProcessEvent thread;
7. Initialize and start the SyncSupervisorEvent thread;
8. Register the SupervisorManger hook that cleans up data when the main process exits.
@SuppressWarnings("rawtypes")
public SupervisorManger mkSupervisor( conf, MQContext sharedContext)
("Starting Supervisor with conf " + conf);
active = new AtomicBoolean(true);
* Step 1: cleanup all files in /storm-local-dir/supervisor/tmp
path = StormConfig.supervisorTmpDir(conf);
FileUtils.cleanDirectory(new (path));
* Step 2: create ZK operation instance
* StromClusterState
StormClusterState stormClusterState = Cluster
.mk_storm_cluster_state(conf);
* Step 3, create LocalStat
* LocalStat is one KV database
* 4.1 create LocalState instance
* 4.2 get supervisorId, if no supervisorId, create one
LocalState localState = StormConfig.supervisorState(conf);
supervisorId = () localState.get(Common.LS_ID);
if (supervisorId == null) {
supervisorId = UUID.randomUUID().toString();
localState.put(Common.LS_ID, supervisorId);
threads = new ();
// Step 5 create HeartBeat
// every supervisor.heartbeat.frequency.secs, write SupervisorInfo to ZK
myHostName = NetWorkUtils.hostname();
int startTimeStamp = TimeUtils.current_time_secs();
Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId,
myHostName, startTimeStamp, active);
hb.update();
AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null,
.MIN_PRIORITY, true);
threads.add(heartbeat);
// Step 6 create and start sync Supervisor thread
// every supervisor.monitor.frequency.secs second run SyncSupervisor
EventManager processEventManager = new EventManagerImp(false);
ConcurrentHashMap workerThreadPids = new ConcurrentHashMap();
//读取$jstorm-local-dir/supervior/localstate中key=local-assignments的value值,根据该值执行workers的kill/start
SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId,
conf, localState, workerThreadPids, sharedContext);
EventManager syncSupEventManager = new EventManagerImp(false);
//通过比较$zkroot/assignments/{topologyid}全量数据和本地STORM-LOCAL-DIR/supervisor/stormdist/{topologyid}:
//1.从nimbus下载有任务分配到本节点的topology的jar和配置数据
//2.从本地删除已经失效的topology的jar和配置数据
SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(
supervisorId, conf, processEventManager, syncSupEventManager,
stormClusterState, localState, syncProcessEvent);
int syncFrequence = () conf
.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS);
EventManagerPusher syncSupervisorPusher = new EventManagerPusher(
syncSupEventManager, syncSupervisorEvent, active, syncFrequence);
AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(
syncSupervisorPusher);
threads.add(syncSupervisorThread);
("Starting supervisor with id " + supervisorId + " at host " + myHostName);
// SupervisorManger which can shutdown all supervisor and workers
return new SupervisorManger(conf, supervisorId, active, threads,
syncSupEventManager, processEventManager, stormClusterState,
workerThreadPids);
The Heartbeat thread
1. By default, every 60 seconds it reports the supervisor's information to ZooKeeper, packaged as a SupervisorInfo that includes the hostname, worker ports, current time and uptime (during time);
@SuppressWarnings("unchecked")
public void update() {
SupervisorInfo sInfo = new SupervisorInfo(
TimeUtils.current_time_secs(), myHostName,
() conf.get(Config.SUPERVISOR_SLOTS_PORTS),
(int) (TimeUtils.current_time_secs() - startTime));
stormClusterState.supervisor_heartbeat(supervisorId, sInfo);
} catch ( e) {
LOG.error("Failed to update SupervisorInfo to ZK", e);
The SyncProcessEvent thread
1. Periodically reads the key="local-assignments" data from the local file $jstorm-local-dir/supervisor/localstate; this data is written periodically by the SyncSupervisorEvent thread;
2. Reads the workers' state data from the local $jstorm-local-dir/worker/ids/heartbeat;
3. Compares local-assignments with the workers' state and starts or kills worker processes accordingly.
Worker and Supervisor are separate JVM processes; the Supervisor starts a Worker via a shell command:
nohup java -server
    -Djava.library.path="$JAVA_LIBRARY_PATH"
    -Dlogfile.name="$topologyid-worker-$port.log"
    -Dlog4j.configuration=jstorm.log4j.properties
    -Djstorm.home="$JSTORM_HOME"
    -cp $JAVA_CLASSPATH:$JSTORM_CLASSPATH
    com.alipay.dw.jstorm.daemon.worker.Worker
    topologyid supervisorid port workerid
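launchWorker assembles and forks exactly this kind of child-JVM command. The following is a self-contained sketch of how such a command can be built and started with ProcessBuilder (illustrative only; parameter names are hypothetical and this is not JStorm's actual launchWorker code):

import java.io.IOException;
import java.util.*;

// Illustrative only: assembling and forking a worker JVM the way the shell command above does.
public class LaunchWorkerSketch {
    public static Process launch(String jstormHome, String classpath,
                                 String topologyId, String supervisorId,
                                 int port, String workerId) throws IOException {
        List<String> cmd = new ArrayList<String>();
        cmd.add("java");
        cmd.add("-server");
        cmd.add("-Dlogfile.name=" + topologyId + "-worker-" + port + ".log");
        cmd.add("-Djstorm.home=" + jstormHome);
        cmd.add("-cp");
        cmd.add(classpath);
        cmd.add("com.alipay.dw.jstorm.daemon.worker.Worker");   // worker main class from the command above
        cmd.add(topologyId);
        cmd.add(supervisorId);
        cmd.add(String.valueOf(port));
        cmd.add(workerId);
        return new ProcessBuilder(cmd).redirectErrorStream(true).start();
    }
}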
The SyncProcessEvent thread's execution flow is as follows:
@SuppressWarnings("unchecked")
public void run() {
LOG.debug("Syncing processes");
* Step 1: get assigned tasks from localstat Map
//1.从本地文件$jstorm-local-dir/supervisor/localstate里读取key=&local-assignments&数据
localAssignments =
localAssignments = () localState
.get(Common.LS_LOCAL_ASSIGNMENTS);
} catch ( e) {
LOG.error("Failed to get LOCAL_ASSIGNMENTS from LocalState", e);
if (localAssignments == null) {
localAssignments = new ();
LOG.debug("Assigned tasks: " + localAssignments);
* Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat
//2.根据localAssignments与workers的hb比对结果得到workers的状态
localWorkerStats =
localWorkerStats = getLocalWorkerStats(conf, localState,
localAssignments);
} catch ( e) {
LOG.error("Failed to get Local worker stats");
LOG.debug("Allocated: " + localWorkerStats);
* Step 3: kill Invalid Workers and remove killed worker from
* localWorkerStats
//3.根据workers的状态值启动/停用相关worker
keepPorts = killUselessWorkers(localWorkerStats);
// start new workers
startNewWorkers(keepPorts, localAssignments);
} catch ( e) {
LOG.error("Failed Sync Process", e);
// throw e
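killUselessWorkers has to decide, per local worker, whether its heartbeat still matches an assignment and is recent enough. A tiny illustration of the timeout part of that decision (the threshold and the example values are assumptions, not JStorm's actual constants):

// Illustrative only: the kind of liveness test used to decide whether a worker is still valid.
public class WorkerLivenessSketch {
    public static boolean isTimedOut(long lastHeartbeatSecs, long nowSecs, int timeoutSecs) {
        return nowSecs - lastHeartbeatSecs > timeoutSecs;   // timeout would come from the conf
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000;
        System.out.println(isTimedOut(now - 10, now, 30));   // false: heartbeat 10s old, worker kept
        System.out.println(isTimedOut(now - 120, now, 30));  // true: stale, worker should be killed
    }
}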
The SyncSupervisorEvent thread
1. Downloads all task-scheduling results from $zk-root/assignments/{topologyid}, filters out the set of tasks assigned to the current supervisor, verifies that each port is assigned tasks of only a single topology, and then writes this task set into the local file $jstorm-local-dir/supervisor/localstate for SyncProcessEvent to read and act on;
2. Compares the assignment results against the topologies that already exist locally, downloads newly assigned topologies from Nimbus and deletes expired ones.
The SyncSupervisorEvent thread's execution flow is as follows:
public void run() {
    LOG.debug("Synchronizing supervisor");
    try {
        RunnableCallback syncCallback = new EventManagerZkPusher(this,
                syncSupEventManager);
        // Step 1: get all assignments from ZK ($zkroot/assignments/{topologyid},
        // topologyid -> Assignment) and register a watch on the assignment dir and on every assignment
        Map<String, Assignment> assignments = Cluster.get_all_assignment(
                stormClusterState, syncCallback);
        LOG.debug("Get all assignments " + assignments);
        // Step 2: get the topologyIds already downloaded locally from
        // STORM-LOCAL-DIR/supervisor/stormdist/{topologyid}
        List<String> downloadedTopologyIds = StormConfig
                .get_supervisor_toplogy_list(conf);
        LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);
        // Step 3: from the assignments, filter out the tasks assigned to this supervisor
        Map<Integer, LocalAssignment> localAssignment = getLocalAssign(
                stormClusterState, supervisorId, assignments);
        // Step 4: write the local assignment to LocalState
        // ($jstorm-local-dir/supervisor/localstate)
        try {
            LOG.debug("Writing local assignment " + localAssignment);
            localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);
        } catch (Exception e) {
            LOG.error("put LS_LOCAL_ASSIGNMENTS " + localAssignment
                    + " of localState failed");
        }
        // Step 5: download from Nimbus the code of newly assigned topologies
        Map<String, String> topologyCodes = getTopologyCodeLocations(assignments);
        downloadTopology(topologyCodes, downloadedTopologyIds);
        // Step 6: remove any downloaded but now useless topology
        removeUselessTopology(topologyCodes, downloadedTopologyIds);
        // Step 7: push the syncProcesses event
        processEventManager.add(syncProcesses);
    } catch (Exception e) {
        LOG.error("Failed to Sync Supervisor", e);
        // throw new RuntimeException(e);
    }
}
In jstorm-0.7.1, the Worker daemon is implemented in the com.alipay.dw.jstorm.daemon.worker package under jstorm-server/src/main/java, and Worker.java is its entry point. The life cycle of a Worker process:
1. Initialize tuple serialization and the data-sending pipe;
2. Create the Tasks assigned to this Worker;
3. Initialize and start the tuple-receiving dispatcher;
4. Initialize and start RefreshConnections, the thread that maintains connections between Workers (creating, maintaining and destroying connections between nodes);
5. Initialize and start the heartbeat thread WorkerHeartbeatRunable, which updates the local directory $jstorm_local_dir/worker/{workerid}/heartbeats/{workerid};
6. Initialize and start the tuple-sending thread DrainerRunable;
7. Register the hook that cleans up data when the main thread exits.
The Worker daemon's initialization flow is as follows:
public WorkerShutdown execute() throws Exception {
    // 1. tuple serialization + the sending pipe (a LinkedBlockingQueue)
    WorkerTransfer workerTransfer = getSendingTransfer();
    // 2. create the task threads assigned to this worker (their shutdown callbacks are returned)
    List shutdowntasks = createTasks(workerTransfer);
    workerData.setShutdownTasks(shutdowntasks);
    // 3. WorkerVirtualPort: the tuple-receiving dispatcher;
    // when the worker receives tuples, it dispatches each to the target task by task_id
    // (conf, supervisorId, topologyId, port, mqContext, taskids)
    WorkerVirtualPort virtual_port = new WorkerVirtualPort(workerData);
    Shutdownable virtual_port_shutdown = virtual_port.launch();
    // 4. RefreshConnections: maintains the connections between nodes
    // (creates new connections, keeps established ones, destroys useless ones)
    RefreshConnections refreshConn = makeRefreshConnections();
    AsyncLoopThread refreshconn = new AsyncLoopThread(refreshConn);
    // refresh ZK active status
    RefreshActive refreshZkActive = new RefreshActive(workerData);
    AsyncLoopThread refreshzk = new AsyncLoopThread(refreshZkActive);
    // 5. WorkerHeartbeatRunable: heartbeat thread; each heartbeat updates the local
    // directory $LOCAL_PATH/workers/{worker-id}/heartbeats/{worker-id}
    RunnableCallback heartbeat_fn = new WorkerHeartbeatRunable(workerData);
    AsyncLoopThread hb = new AsyncLoopThread(heartbeat_fn, false, null,
            Thread.NORM_PRIORITY, true);
    // 6. DrainerRunable: the tuple-sending thread
    // (transferQueue, nodeportSocket, taskNodeport)
    DrainerRunable drainer = new DrainerRunable(workerData);
    AsyncLoopThread dr = new AsyncLoopThread(drainer, false, null,
            Thread.MAX_PRIORITY, true);
    AsyncLoopThread[] threads = { refreshconn, refreshzk, hb, dr };
    // 7. register the hook that cleans up when the main thread exits
    return new WorkerShutdown(workerData, shutdowntasks,
            virtual_port_shutdown, threads);
}
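The sending side of the worker is a classic producer/consumer pair: task threads put serialized tuples onto the transfer queue, and the single drainer thread takes them off and pushes the bytes to the right connection. A self-contained sketch of that pipe using only JDK types (not the actual WorkerTransfer/DrainerRunable code):

import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: task threads produce serialized tuples, one drainer thread sends them.
public class TransferPipeSketch {
    private final LinkedBlockingQueue<byte[]> transferQueue = new LinkedBlockingQueue<byte[]>();

    public void transfer(byte[] serializedTuple) throws InterruptedException {
        transferQueue.put(serializedTuple);          // called from task threads
    }

    public Thread startDrainer() {
        Thread drainer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        byte[] data = transferQueue.take();   // blocks until a tuple is queued
                        send(data);                           // would go to the target node's socket
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "drainer");
        drainer.start();
        return drainer;
    }

    private void send(byte[] data) {
        System.out.println("sending " + data.length + " bytes");
    }
}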
Depending on a task's role in the topology, Tasks are divided into SpoutTask and BoltTask. Apart from sharing the same Task heartbeat and common data initialization, each has its own processing logic; the core implementations are SpoutExecutors.java and BoltExecutors.java.
SpoutExecutors mainly does two things: 1. as the start of the DAG, it is responsible for emitting the original tuples; 2. if the topology defines an acker, SpoutExecutors starts a thread to receive acks and decides whether to re-emit a tuple based on the acks received.
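A minimal sketch of the bookkeeping behind that resend decision: the spout keeps every in-flight tuple keyed by its message id until the acker reports ack or fail (illustrative only, not JStorm's SpoutExecutors implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: the pending-tuple bookkeeping behind spout ack/fail handling.
public class SpoutAckSketch {
    private final Map<Long, String> pending = new ConcurrentHashMap<Long, String>();
    private long nextMsgId = 0;

    public long emit(String tuple) {
        long msgId = nextMsgId++;
        pending.put(msgId, tuple);          // remember the tuple until the acker confirms it
        System.out.println("emit " + msgId + " -> " + tuple);
        return msgId;
    }

    public void ack(long msgId) {
        pending.remove(msgId);              // fully processed downstream, safe to forget
    }

    public void fail(long msgId) {
        String tuple = pending.get(msgId);
        if (tuple != null) {
            System.out.println("re-emit " + msgId + " -> " + tuple);   // resend on failure
        }
    }
}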
BoltExecutor is somewhat more complex than SpoutExecutor: 1. it receives tuples sent from upstream and processes them according to the logic defined in the topology; 2. if the bolt has downstream components, it sends the newly generated tuples downstream; 3. if the topology defines an acker, the bolt needs to return a simply-computed ack to the root spout.
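That "simply-computed ack" is, in Storm's design, XOR bookkeeping: every tuple id is XORed into a per-spout-tuple value once when the tuple is emitted and once when it is acked, so the value returns to zero exactly when the whole tuple tree has been processed. A small self-contained illustration of the idea:

// Illustrative only: the XOR bookkeeping behind the Storm/JStorm acker mechanism.
public class AckerXorSketch {
    public static void main(String[] args) {
        long spoutTupleAckVal = 0L;

        long t1 = 0x1111L;          // the spout emits the root tuple
        spoutTupleAckVal ^= t1;

        long t2 = 0x2222L;          // a bolt emits a child tuple anchored to t1 ...
        spoutTupleAckVal ^= t2;
        spoutTupleAckVal ^= t1;     // ... and acks t1, its input

        spoutTupleAckVal ^= t2;     // the next bolt acks t2 without emitting anything

        // value back to zero: the whole tuple tree was processed, so the root spout gets its ack
        System.out.println("tree complete: " + (spoutTupleAckVal == 0));
    }
}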
This article has covered the work done by Supervisor, Worker and Task in JStorm, along with their implementation logic and a source-level walk-through of the key flows. Shortcomings and mistakes are inevitable; feedback and discussion are welcome.