网站首页 > 博客文章 正文
Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。
INITIAL: 全量与增量
EARLIEST_OFFSET:最早位点
LATEST_OFFSET:最近的位点
SPECIFIC_OFFSETS:指定位点
TIMESTAMP:指定时间点
SNAPSHOT:全量
源码分析
设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。
public static void main(String[] args) {
MySqlSource.<Data>builder()
.startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
.build();
}
当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java
public static BinlogOffset initializeEffectiveOffset(
BinlogOffset offset, MySqlConnection connection) {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
// 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
// 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:
return offset;
}
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
while (rs.next()) {
String fileName = rs.getString(1);
long fileSize = rs.getLong(2);
if (fileSize > 0) {
binlogFiles.add(fileName);
}
}
};
try {
// 获取mysql系统内存在的binlog
connection.query("SHOW BINARY LOGS", rsc);
LOG.info("Total search binlog: {}", binlogFiles);
if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}
// 搜索最接近的binlog文件
String binlogName = searchBinlogName(client, targetMs, binlogFiles);
return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
}
private static String searchBinlogName(
BinaryLogClient client, long targetMs, List<String> binlogFiles)
throws IOException, InterruptedException {
int startIdx = 0;
int endIdx = binlogFiles.size() - 1;
// 因为binlog文件名是递增的,同时时间也是递增的
// 以二分法进行查找
while (startIdx <= endIdx) {
int mid = startIdx + (endIdx - startIdx) / 2;
long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
if (midTs < targetMs) {
startIdx = mid + 1;
} else if (targetMs < midTs) {
endIdx = mid - 1;
} else {
return binlogFiles.get(mid);
}
}
return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}
从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在 MySqlBinlogSplitReadTask.java
protected void handleEvent(
MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
// 当从binlog读取数据后,进行一次过滤
if (!eventFilter.test(event)) {
return;
}
super.handleEvent(partition, offsetContext, event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
RecordUtils.getBinlogPosition(offsetContext.getOffset());
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
binlogSplit,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell reader the binlog task finished
((StoppableChangeEventSourceContext) context).stopChangeEventSource();
}
}
}
eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。
private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
// 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
long startTimestampSec = startingOffset.getTimestampSec();
return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
|| event.getHeader().getTimestamp() >= startTimestampSec * 1000;
}
return event -> true;
}
- 上一篇: 深入探讨HBASE
- 下一篇: MySQL日期时间函数 ADDDATE() 用法
猜你喜欢
- 2024-12-06 超详细canal入门,看这篇就够了
- 2024-12-06 MySQL专题1: 字段和索引
- 2024-12-06 这些MySQL面试题集锦,据说知名互联网公司都用
- 2024-12-06 mysql数据库一天五万条以上的增量,预计运维三年,怎么优化
- 2024-12-06 Mysql时间格式转化——DATE_FORMAT()
- 2024-12-06 RPA数据库功能应用及时间戳获取
- 2024-12-06 Mysql日期函数、时间函数的实际应用
- 2024-12-06 MySQL时间、字符串、戳的转换
- 2024-12-06 MySQL系列-第2篇:MySQL中数据类型介绍
- 2024-12-06 Mysql:TIMESTAMP、DATETIME、BIGINT。该如何选择?
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)