public class TunnelWorker extends Object implements ITunnelWorker
2. 固定间隔进行Heartbeat,间隔由TunnelWorkerConfig里的heartbeatIntervalInSec参数决定。 2.1 进行heartbeat请求,从Tunnel服务端获取最新可用的Channel列表,Channel中会包含有ChannelId, Channel的版本和Channel的状态信息。 2.2 将服务端获取到的Channel列表和本地内存中的Channel列表进行Merge,然后进行ChannelConnect的新建和update,规则大致如下 1) Merge: 相同ChannelId,认定版本号更大的为最新状态,直接进行覆盖,若未出现的Channel,则直接插入。 2) 新建ChannelConnect: 若此Channel未新建有其对应的ChannelConnect,则会新建一个WAIT状态的ChannelConnect,若对应的Channel 状态为OPEN状态,则同时会启动该ChannelConnect上的处理数据的循环流水线任务(ReadRecords&&ProcessRecords), 处理详细的细节可以参见ProcessDataPipeline。 3) Update已有ChannelConnect: Merge完成后,若Channel对应的ChannelConnect存在,则根据相同ChannelId的Channel状态来更新 ChannelConnect的状态,比如Channel为Close状态也需要将ChannelConnect的状态置为Closed,进而终止处理任务的流水线任务, 详细的细节可以参见ChannelConnect.notifyStatus方法。
3. 自动化的负载均衡和良好的水平扩展性 运行多个TunnelWorker对同一个Tunnel进行消费时(TunnelId相同), 在TunnelWorker执行Heartbeat时,Tunnel服务端会自动的对Channel资源进行重分配, 让活跃的Channel尽可能的均摊到每一个TunnelWorker上,达到资源负载均衡的目的。同时,在水平扩展性方面,用户可以很容易的通过增加TunnelWorker的 数量来完成,TunnelWorker可以在同一个机器或者不同机器上。
4. 自动化的资源清理和容错处理 4.1 资源清理: 当客户端(TunnelWorker)没有被正常shutdown时(比如异常退出或者手动结束),我们会自动帮用户进行资源的回收,包括释放线程池, 自动调用用户在Channel上注册的shutdown方法,关闭Tunnel连接等。 4.2 容错处理: 当客户端出现Heartbeat超时等非参数类错误时,我们会自动帮用户Renew Connect,以保证数据消费可以稳定的进行持续同步。
构造器和说明 |
---|
TunnelWorker(String tunnelId,
TunnelClientInterface client,
TunnelWorkerConfig workerConfig) |
TunnelWorker(String tunnelId,
TunnelClientInterface client,
TunnelWorkerConfig workerConfig,
IChannelProcessorFactory factory) |
public TunnelWorker(String tunnelId, TunnelClientInterface client, TunnelWorkerConfig workerConfig)
public TunnelWorker(String tunnelId, TunnelClientInterface client, TunnelWorkerConfig workerConfig, IChannelProcessorFactory factory)
public void connectAndWorking() throws Exception
connectAndWorking
在接口中 ITunnelWorker
Exception
public void shutdown()
shutdown
在接口中 ITunnelWorker
Copyright © 2019. All Rights Reserved.