- 所有已实现的接口:
- ITunnelWorker
public class TunnelWorker
extends Object
implements ITunnelWorker
TunnelWorker是基于TableStore数据接口之上的全增量一体化服务,用户可以简单地实现对表中历史存量和新增数据的消费处理。
TunnelWorker的设计哲学是通过每一轮的定时心跳探测(Heartbeat)来进行活跃Channel的探测,Channel和ChannelConnect状态的更新,数据处理任务的初始化、运行和结束等。
TunnelWorker实现自动化数据处理的流程如下:
1. TunnelWorker资源的初始化
1.1 将TunnelWorker状态原子的由Ready置为Started(CAS操作)。
1.2 根据TunnelWorkerConfig里的HeartbeatTimeout和ClientTag(客户端标识)等配置进行ConnectTunnel操作,和Tunnel服务端进行联通,
并获取当前TunnelWorker对应的ClientId。
1.3 初始化ChannelDialer(用于新建ChannelConnect), 每一个ChannelConnect都会和一个Channel一一对应,ChannelConnect上会记录
有数据消费的位点。
1.4 根据用户传入的处理数据的Callback和TunnelWorkerConfig中CheckpointInterval(向服务端记数据位点的间隔)
包装出一个带自动记Checkpoint功能的数据处理器, 详细参见: ChannelProcessFactory。
1.5 初始化TunnelStateMachine(会进行Channel状态机的自动化处理)。
2. 固定间隔进行Heartbeat,间隔由TunnelWorkerConfig里的heartbeatIntervalInSec参数决定。
2.1 进行heartbeat请求,从Tunnel服务端获取最新可用的Channel列表,Channel中会包含有ChannelId, Channel的版本和Channel的状态信息。
2.2 将服务端获取到的Channel列表和本地内存中的Channel列表进行Merge,然后进行ChannelConnect的新建和update,规则大致如下
1) Merge: 相同ChannelId,认定版本号更大的为最新状态,直接进行覆盖,若未出现的Channel,则直接插入。
2) 新建ChannelConnect: 若此Channel未新建有其对应的ChannelConnect,则会新建一个WAIT状态的ChannelConnect,若对应的Channel
状态为OPEN状态,则同时会启动该ChannelConnect上的处理数据的循环流水线任务(ReadRecords&&ProcessRecords),
处理详细的细节可以参见ProcessDataPipeline。
3) Update已有ChannelConnect: Merge完成后,若Channel对应的ChannelConnect存在,则根据相同ChannelId的Channel状态来更新
ChannelConnect的状态,比如Channel为Close状态也需要将ChannelConnect的状态置为Closed,进而终止处理任务的流水线任务,
详细的细节可以参见ChannelConnect.notifyStatus方法。
3. 自动化的负载均衡和良好的水平扩展性
运行多个TunnelWorker对同一个Tunnel进行消费时(TunnelId相同), 在TunnelWorker执行Heartbeat时,Tunnel服务端会自动的对Channel资源进行重分配,
让活跃的Channel尽可能的均摊到每一个TunnelWorker上,达到资源负载均衡的目的。同时,在水平扩展性方面,用户可以很容易的通过增加TunnelWorker的
数量来完成,TunnelWorker可以在同一个机器或者不同机器上。
4. 自动化的资源清理和容错处理
4.1 资源清理: 当客户端(TunnelWorker)没有被正常shutdown时(比如异常退出或者手动结束),我们会自动帮用户进行资源的回收,包括释放线程池,
自动调用用户在Channel上注册的shutdown方法,关闭Tunnel连接等。
4.2 容错处理: 当客户端出现Heartbeat超时等非参数类错误时,我们会自动帮用户Renew Connect,以保证数据消费可以稳定的进行持续同步。