Kafka-Enhanced
TapData Cloud 为您提供了云端服务,适合需要快速部署、低前期投资场景,帮助您更好地专注于业务发展而非基础设施管理,注册账号即可领取免费的 Agent 实例,欢迎试用。TapData Enterprise 可部署在您的本地数据中心,适合对数据敏感性或网络隔离有严格要求的场景,可服务于构建实时数仓,实现实时数据交换,数据迁移等场景。Apache Kafka 是一个分布式数据流处理平台,可以实时发布、订阅、存储和处理数据流。Kafka-Enhanced 是一款升级版的 Kafka 连接器,支持标准事件结构和原生 Kafka 数据结构进行数据传输,突破旧版 Kafka 连接器仅支持 JSON Object 的限制,可将非 JSON Object 结构数据加载到应用中进行处理,同时提供更可靠的断点续传机制。
本文将介绍如何在 TapData 中添加 Kafka-Enhanced 数据源,后续可将其作为源或目标库来构建实时数据链路,可应用于构建实时数仓等场景。
Kafka-Enhanced 数据源在 3.15 版本开始支持。
支持版本与架构
- 版本:Kafka 2.0 ~ 2.5(基于 Scala 2.12 构建)
- 架构:单机或集群架构
支持数据类型
类别 | 数据类型 |
---|---|
布尔 | BOOLEAN |
整数 | SHORT、INTEGER、LONG |
浮点数 | FLOAT、DOUBLE |
数值 | NUMBER |
字符串 | CHAR(作为源库时支持)、VARCHAR、STRING、TEXT |
二进制 | BINARY |
复合类型 | ARRAY、MAP、OBJECT(作为源库时支持) |
日期/时间 | TIME、DATE、DATETIME、TIMESTAMP |
唯一标识符 | UUID(作为源库时支持) |
结构模式与同步说明
您可以根据业务需求在配置 Kafka 连接器时选择以下两种结构模式:
- 标准结构(默认)
- 原生结构
说明:支持同步完整的 DML 操作(INSERT、UPDATE、DELETE),作为源时解析并还原 DML + DDL 事件,作用于下游;作为目标时标准化存储这些事件,便于后续任务解析。
典型案例:在 CDC 缓存队列 中,使用“标准结构”将关系型数据变更事件从 MySQL 写入 Kafka,再消费写入其它数据库。
样例数据:
{
"ts": 1727097087513,
"op": "DML:UPDATE",
"opTs": 1727097087512,
"namespaces": [],
"table": "表名",
"before": {},
"after": {},
}
- ts:应用解析事件的时间戳,用于记录事件被解析的时间点。
- op:事件类型,指示操作的具体类型,如
DML:INSERT
、DML:UPDATE
、DML:DELETE
。 - opTs:事件发生时间戳,表示数据变更实际发生的时间。
- namespaces:多 schema 场景下的 schema 名称集合。
- table:表名,指示数据变更所在的表。
- before:修改前的数据内容,仅在
UPDATE
和DELETE
操作时有值。 - after:修改后的数据内容,适用于
INSERT
和UPDATE
操作。
说明:采用原生 Kafka 的数据同步方式,仅支持追加写入,类似 INSERT
,作为源时处理复杂无规律的数据,传递至下游;作为目标时灵活控制分区、头信息、键和值信息,更自由地写入自定义数据。
典型案例:用于 同构数据迁移 或 非结构化数据转换,通过 Kafka -> JS 处理节点 -> Kafka/MySQL 的数据链路实现数据过滤和转换。
样例数据:
{
"offset": 12345,
"timestampType": "LogAppendTime",
"partition": 3,
"timestamp": 1638349200000,
"headers": {
"headerKey1": "headerValue1",
},
"key": "user123",
"value": {
"id": 1,
"name": "John Doe",
"action": "login",
"timestamp": "2021-12-01T10:00:00Z"
}
}
- offset:偏移量标识消息位点,不写入目标消息体。
- timestampType:时间戳类型,仅用于元数据,不作用于消息体。
- partition:指定消息写入的分区号,有值时按指定分区写入。
- timestamp:消息创建时间,有值时使用指定时间,否则为系统时间。
- headers:消息头部信息,存在时写入头部,携带额外元数据。
- key:消息键值,用于分区策略或标识消息来源。
- value:消息内容,承载实际业务数据。
消费说明
在后续配置数据复制/数据转换任务时,可通过右上角的任务设置,指定数据同步方式,对应的消费说明如下:
- 仅全量:从第一条数据开始读取,直到记录的增量位点后停止任务。
- 全量 + 增量:同样从第一条数据读取到记录位点,然后持续同步增量数据。
- 仅增量:选择增量采集的起点为此刻,代表从当前位点开始同步;设置为选择时间,代表根据指定时间换算位点,从该位点开始同步。
由于 Kafka 作为消息队列仅支持追加写入,应避免因源端重复消费带来的目标端出现重复数据。
功能限制
- 数据类型适配:作为数据源时,Kafka 的数据类型需要根据目标数据源的要求进行适当调整,或者手动在目标端创建相应的表结构以确保兼容性。
- 消息推送保障:由于 Kafka 采用
At least once
消息推送语义,并且只支持追加写入,可能导致重复消费。在目标端,需通过实现幂等性保障,避免因源端重复消费而产生重复数据。 - 消费模式限制:消费线程采用不同的消费组编号,请注意对消费并发度的影响。
- 安全认证限制:当前仅支持免认证的 Kafka 实例。
连接 Kafka-Enhanced
在左侧导航栏,单击连接管理。
在页面右侧,单击创建连接。
在跳转到的页面,搜索并选择 Kafka-Enhanced。
根据下述说明完成数据源配置。
- 连接设置
- 连接名称:填写具有业务意义的独有名称。
- 连接类型:支持将 Kafka 作为源或目标库。
- 连接地址:Kafka 连接地址,包含地址和端口号,两者之间用英文冒号(:)分隔,例如
113.222.22.***:9092
。 - 结构模式:基于业务需求选择:
- 标准结构(默认):支持同步完整的 DML 操作(INSERT、UPDATE、DELETE),作为源时解析并还原 DML + DDL 事件,作用于下游;作为目标时标准化存储这些事件,便于后续任务解析。
- 原生结构:采用原生 Kafka 的数据同步方式,仅支持追加写入,类似
INSERT
,作为源时处理复杂无规律的数据,传递至下游;作为目标时灵活控制分区、头信息、键和值信息,更自由地写入自定义数据。
- 键序列器、值序列器:选择键和值的序列化方式,例如 Binary(默认)。
- 高级设置
- ACK 确认机制:根据业务需求选择:不确认、仅写入 Master 分区、写入大多数 ISR 分区(默认)或写入所有 ISR 分区。
- 压缩类型:支持 lz4(默认)、gzip、snappy、zstd,消息量较大时可开启压缩以提高传输效率。
- 扩展配置:支持自定义 Kafka 管理者、生产者和消费者的高级连接属性,用于特定场景下的优化。
- 共享挖掘:挖掘源库的增量日志,可为多个任务共享源库的增量日志,避免重复读取,从而最大程度上减轻增量同步对源库的压力,开启该功能后还需要选择一个外存用来存储增量日志信息。
- 包含表:默认为全部,您也可以选择自定义并填写包含的表,多个表之间用英文逗号(,)分隔。
- 排除表:打开该开关后,可以设定要排除的表,多个表之间用英文逗号(,)分隔。
- Agent 设置:默认为平台自动分配,您也可以手动指定 Agent。
- 模型加载时间:如果数据源中的模型数量少于10000个,则每小时更新一次模型信息。但如果模型数量超过10000个,则刷新将在您指定的时间每天进行。
- 连接设置
单击连接测试,测试通过后单击保存。
提示如提示连接测试失败,请根据页面提示进行修复。
节点高级配置
在配置数据同步/转换任务时,将 Kafka-Enhanced 作为源或目标节点时,为更好满足业务复杂需求,最大化发挥性能,TapData 为其内置更多高级特性能力,您可以基于业务需求配置:
作为源节点
最大读并发数:默认
1
,即单线程读,当设置大于1
且主题 + 分区数大于1
时生效,取两者最小值。作为目标节点
副本数:默认
1
,创建主题时使用,主题存在时不生效。分区数:默认
3
,创建主题时使用,当配置大于对应主题的分区数时会自动扩充分区。