跳到主要内容
版本:3.x

Kafka-Enhanced

适用版本Cloud 功能TapData Cloud 为您提供了云端服务,适合需要快速部署、低前期投资场景,帮助您更好地专注于业务发展而非基础设施管理,注册账号即可领取免费的 Agent 实例,欢迎试用。企业版功能TapData Enterprise 可部署在您的本地数据中心,适合对数据敏感性或网络隔离有严格要求的场景,可服务于构建实时数仓,实现实时数据交换,数据迁移等场景。

Apache Kafka 是一个分布式数据流处理平台,可以实时发布、订阅、存储和处理数据流。Kafka-Enhanced 是一款升级版的 Kafka 连接器,支持标准事件结构和原生 Kafka 数据结构进行数据传输,突破旧版 Kafka 连接器仅支持 JSON Object 的限制,可将非 JSON Object 结构数据加载到应用中进行处理,同时提供更可靠的断点续传机制。

本文将介绍如何在 TapData 中添加 Kafka-Enhanced 数据源,后续可将其作为目标库来构建实时数据链路,可应用于构建实时数仓等场景。

提示

Kafka-Enhanced 数据源在 3.15 版本开始支持。

支持版本与架构

  • 版本:Kafka 2.0 ~ 2.5(基于 Scala 2.12 构建)
  • 架构:单机或集群架构

支持数据类型

类别数据类型
布尔BOOLEAN
整数SHORT、INTEGER、LONG
浮点数FLOAT、DOUBLE
数值NUMBER
字符串CHAR(作为源库时支持)、VARCHAR、STRING、TEXT
二进制BINARY
复合类型ARRAY、MAP、OBJECT(作为源库时支持)
日期/时间TIME、DATE、DATETIME、TIMESTAMP
唯一标识符UUID(作为源库时支持)

结构模式与同步说明

您可以根据业务需求在配置 Kafka 连接器时选择以下两种结构模式:

说明:支持同步完整的 DML 操作(INSERT、UPDATE、DELETE),作为源时解析并还原 DML + DDL 事件,作用于下游;作为目标时标准化存储这些事件,便于后续任务解析。

典型案例:在 CDC 缓存队列 中,使用“标准结构”将关系型数据变更事件从 MySQL 写入 Kafka,再消费写入其它数据库。

样例数据

{
"ts": 1727097087513,
"op": "DML:UPDATE",
"opTs": 1727097087512,
"namespaces": [],
"table": "表名",
"before": {},
"after": {},
}
  • ts:应用解析事件的时间戳,用于记录事件被解析的时间点。
  • op:事件类型,指示操作的具体类型,如 DML:INSERTDML:UPDATEDML:DELETE
  • opTs:事件发生时间戳,表示数据变更实际发生的时间。
  • namespaces:多 schema 场景下的 schema 名称集合。
  • table:表名,指示数据变更所在的表。
  • before:修改前的数据内容,仅在 UPDATEDELETE 操作时有值。
  • after:修改后的数据内容,适用于 INSERTUPDATE 操作。

消费说明

在后续配置数据复制/数据转换任务时,可通过右上角的任务设置,指定数据同步方式,对应的消费说明如下:

  • 仅全量:从第一条数据开始读取,直到记录的增量位点后停止任务。
  • 全量 + 增量:同样从第一条数据读取到记录位点,然后持续同步增量数据。
  • 仅增量:选择增量采集的起点为此刻,代表从当前位点开始同步;设置为选择时间,代表根据指定时间换算位点,从该位点开始同步。
提示

由于 Kafka 作为消息队列仅支持追加写入,应避免因源端重复消费带来的目标端出现重复数据。

功能限制

  • 数据类型适配:作为数据源时,Kafka 的数据类型需要根据目标数据源的要求进行适当调整,或者手动在目标端创建相应的表结构以确保兼容性。
  • 消息推送保障:由于 Kafka 采用 At least once 消息推送语义,并且只支持追加写入,可能导致重复消费。在目标端,需通过实现幂等性保障,避免因源端重复消费而产生重复数据。
  • 消费模式限制:消费线程采用不同的消费组编号,请注意对消费并发度的影响。
  • 安全认证限制:当前仅支持免认证的 Kafka 实例。

连接 Kafka-Enhanced

  1. 登录 Tapdata 平台

  2. 在左侧导航栏,单击连接管理

  3. 在页面右侧,单击创建连接

  4. 在跳转到的页面,搜索并选择 Kafka-Enhanced

  5. 根据下述说明完成数据源配置。

    Kafka Enhanced 连接设置

    • 连接设置
      • 连接名称:填写具有业务意义的独有名称。
      • 连接类型:支持将 Kafka 作为源或目标库。
      • 连接地址:Kafka 连接地址,包含地址和端口号,两者之间用英文冒号(:)分隔,例如 113.222.22.***:9092
      • 结构模式:基于业务需求选择:
        • 标准结构(默认):支持同步完整的 DML 操作(INSERT、UPDATE、DELETE),作为源时解析并还原 DML + DDL 事件,作用于下游;作为目标时标准化存储这些事件,便于后续任务解析。
        • 原生结构:采用原生 Kafka 的数据同步方式,仅支持追加写入,类似 INSERT,作为源时处理复杂无规律的数据,传递至下游;作为目标时灵活控制分区、头信息、键和值信息,更自由地写入自定义数据。
      • 键序列器值序列器:选择键和值的序列化方式,例如 Binary(默认)。
    • 高级设置
      • ACK 确认机制:根据业务需求选择:不确认、仅写入 Master 分区、写入大多数 ISR 分区(默认)或写入所有 ISR 分区。
      • 压缩类型:支持 lz4(默认)、gzipsnappyzstd,消息量较大时可开启压缩以提高传输效率。
      • 扩展配置:支持自定义 Kafka 管理者、生产者和消费者的高级连接属性,用于特定场景下的优化。
      • 共享挖掘挖掘源库的增量日志,可为多个任务共享源库的增量日志,避免重复读取,从而最大程度上减轻增量同步对源库的压力,开启该功能后还需要选择一个外存用来存储增量日志信息。
      • 包含表:默认为全部,您也可以选择自定义并填写包含的表,多个表之间用英文逗号(,)分隔。
      • 排除表:打开该开关后,可以设定要排除的表,多个表之间用英文逗号(,)分隔。
      • Agent 设置:默认为平台自动分配,您也可以手动指定 Agent。
      • 模型加载时间:如果数据源中的模型数量少于10000个,则每小时更新一次模型信息。但如果模型数量超过10000个,则刷新将在您指定的时间每天进行。
  6. 单击连接测试,测试通过后单击保存

    提示

    如提示连接测试失败,请根据页面提示进行修复。

节点高级配置

在配置数据同步/转换任务时,将 Kafka-Enhanced 作为源或目标节点时,为更好满足业务复杂需求,最大化发挥性能,TapData 为其内置更多高级特性能力,您可以基于业务需求配置:

Kafka Enhanced 节点高级设置

  • 作为源节点

    最大读并发数:默认1,即单线程读,当设置大于 1 且主题 + 分区数大于 1 时生效,取两者最小值。

  • 作为目标节点

    • 副本数:默认1,创建主题时使用,主题存在时不生效。

    • 分区数:默认3,创建主题时使用,当配置大于对应主题的分区数时会自动扩充分区。