VIP会员一折促销,仅需200元/年!
当前位置: 首页 > 资讯 > 商旅生涯 > MirrorMakerV2源码解读

MirrorMakerV2源码解读

放大字体  缩小字体 发布日期:2021-10-24 浏览次数:0


启动脚本剖析

启动脚本为connect-mirror-maker.sh,可以通过KAFKA_LOG4J_OPTS指定log4j配置文件,可以通过KAFKA_HEAP_OPTS指定堆内存(默认为-Xms256M –Xmx2G),具体类路径为:

org.apache.kafka.connect.mirror.MirrorMaker

MirrorMaker类剖析

main方法首先通过argparse4j(第三方实现的解析命令行的工具)解析命令行参数,返回一个Namespace包装类。

ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-mirror-maker");
parser.description("MirrorMaker 2.0 driver");
parser.addArgument("config").type(Arguments.fileType().verifyCanRead())
    .metavar("mm2.properties").required(true)
    .help("MM2 configuration file.");
parser.addArgument("--clusters").nargs("+").metavar("CLUSTER").required(false)
    .help("Target cluster to use for this node."); Namespace ns; try {
    ns = parser.parseArgs(args);
} catch (ArgumentParserException e) {
    parser.handleError(e); Exit.exit(-1); return;
}

题外话:这个Namespace类设计的很方便。该类实际包装一个Map(存储解析出来的命令行参数KV),通过一个get()方法返回泛型类结果,然后getString()、getByte()、getShort()、getInt()、getLong()、getFloat()、getDouble()、getBoolean()、getList()等方法调用get()方法获取指定数据类型的返回结果。这样的设计很方便获取各种类型的命令行参数值,完全可以复用在别的日常开发中。Namespace具体类结构如图所示:

MirrorMakerV2源码解读


MirrorMaker类实例化方法

继续正题,解析connect-mirror-maker.sh命令行参数可以获取到配置文件和clusters参数。然后就可以实例化MirrorMaker类。

MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);

该类的构造方法中用一个Set类型herderPairs变量存放配置文件中配置的replicaiton flow流向(如:x-y或者y->x)。默认情况下,即使x->y.enabled=false,x->y也会被存放到herderPairs内;除非emit.heartbeats.enabled=false或者
x-y.emit.heartbeats.enabled=false时,x-y才不会被存放到herderPairs内。这么做的原因是:

对于开启emit.hearts的replicationflow(如A->B),是需要两个herders的。一个是A->B属于真实的replicaiton flow同步topic数据,即MirrorSourceConnector;另一个是B->A属于用于监控replicaiton flow健康状态的提交heartbeats到A的MirrorHeartbeatConnector。

log.info("Targeting clusters {}", this.clusters); this.herderPairs = config.clusterPairs().stream()
    .filter(x -> this.clusters.contains(x.target()))
    .collect(Collectors.toSet());

除了上述规则,如果命令行参数显式设置了clusters参数,herderPairs内存储的SourceAndTarget pairs还需要满足target cluster属于clusters参数的子集。这么做的原因是考虑MirrorMaker优先运行在target cluster原则:该原则主要是考虑异常情况下,避免无效读写请求。

然后,遍历herderPairs变量,每个变量(SourceAndTarget)均执行addHerder()方法。AddHerder()方法主要功能:实例化一个Kafka Connect框架Wroker类,该Worker类会运行多个线程执行多个tasks(即读Kafka或者写Kafka)。

Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);

再基于Worker对象,实例化Herder接口类,该接口有两个实现类:StandaloneHerder、DistributedHerder。MirrorMaker类使用的是DistributedHerder。

Herder herder = new DistributedHerder(distributedConfig, time, worker,
        kafkaClusterId, statusBackingStore, configBackingStore,
        advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);

然后,将每一个SourceAndTarget和对应的herder,存储到Map类型的herders变量中。

private final Map herders = new HashMap<>();
herders.put(sourceAndTarget, herder);

关于Herder接口和两个实现类:

Herder接口用于跟踪和管理workers和connectors。
StandaloneHerder属于单进程,使用基于内存的MemoryStatusBackingStore和MemoryConfigBackingStore,用于standalone模式Kafka Connect进程。
DistributedHerder使用基于Kafka的KafkaStatusBackingStore和KafkaConfigBackingStore,在多个进程之间协调workers,底层基于Kafka group membership实现group managed;每个加入group的DistributedHerder实例,上报它的configuration state。Group协调器给每个实例分配一些connectors和tasks执行,分配策略采用简单的round-ronbin方式。但这也不是绝对的,为了避免start/stop花销,herder也会采用sticky分配策略。DistributedHerder实例只运行分配给它的connectors和tasks。

补充下几个概念:

Connectors:the high level abstraction that coordinates data streaming by managing tasks
Tasks:the implementation of how data is copied to or from Kafka
Workers:the running processes that execute connectors and tasks

至此,MirrorMaker构造方法结束,MirrorMaker实例化完成。

MirrorMaker类start方法

我们知道,上述构造方法中得到Map类型的herders变量。这里重新遍历下,调用每个Herder实例的start方法(即DistributedHerder类的start方法)。

for (Herder herder : herders.values()) { try { herder.start();
    } finally { startLatch.countDown();
    }
}

DistributedHerder类实例化时,实例化了一个线程数为1的herderExecutor线程池。

this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingDeque(1),
        ThreadUtils.createThreadFactory( this.getClass().getSimpleName() + "-" + clientId + "-%d", false));

DistributedHerder类实现了Runnable接口,其start方法将自身提交到上述线程池运行。

@Override public void start() { this.herderExecutor.submit(this);
}

DistributedHerder类run方法通过调用startServices()方法以启动worker、statusBackingStore、configBackingStore服务。

protected void startServices() { this.worker.start(); this.statusBackingStore.start(); this.configBackingStore.start();
}

最后,MirrorMaker类start()方法会遍历Set类型的herderPairs变量,每一个变量执行configureConnectors()方法,该方法最终调用DistributedHerder类的putConnectorConfig方法。网站源码

herderPairs.forEach(x -> configureConnectors(x));

putConnectorConfig将具体的SourceAndTarget所对应的replication flow任务提交到上述已经完成启动的DistributedHerder各种服务中。

至此,MIrrorMakerV2.0启动完成!!


“如果发现本网站发布的资讯影响到您的版权,可以联系本站!同时欢迎来本站投稿!

0条 [查看全部]  相关评论