连接 Kafka
Apache Kafka 是一个分布式数据流处理平台,可以实时发布、订阅、存储和处理数据流。Tapdata 支持将 Kafka 作为源和目标数据库构建数据管道,本文介绍如何在 Tapdata 中添加 Kafka 数据源。
操作步骤
登录 Tapdata 平台。
在左侧导航栏,单击连接管理。
在页面右侧,单击创建连接。
在跳转到的页面,单击认证数据源标签页,然后选择 Kafka。
根据下述说明完成数据源配置。
连接信息设置
- 连接名称:填写具有业务意义的独有名称。
- 连接类型:支持将 Kafka 作为源或目标库。
- 数据库地址:Kafka 连接地址,包含地址和端口号,两者之间用英文冒号(:)分隔,例如
113.222.22.***:9092
。 - 主题表达式:Kafka 中的 Topic,支持正则表达式,长度不超过 256 个字符,需提前创建好。
- Kerberos 认证:如 Kafka 开启了该认证,您需要打开该开关,然后上传并设置密钥、配置等信息。
- 账号、密码、加密方式:如 Kafka 开启了密码认证,您需要填写账号和密码,选择加密方式。
高级设置
忽略非 JSON 对象格式消息:根据业务需求选择是否忽略,如果未忽略且遇到了该格式的消息,将停止拉取。
ACK 确认机制:根据业务需求选择:不确认、仅写入 Master 分区、写入大多数 ISR 分区(默认)或写入所有 ISR 分区。
消息压缩类型:支持 gzip、snappy、lz4、zstd,消息量较大时可开启压缩以提高传输效率。
忽略推送消息异常:打开该开关后,系统仍会记录相关消息的 offset,但后续不会推送,可能存在数据丢失风险。
共享挖掘:打开该开关后,Tapdata 会挖掘增量日志并存储起来,无需重复采集源库的变更信息,可极大地缓解对源库资源的占用。更多介绍,见共享挖掘。
agent 设置:默认为平台自动分配,您也可以手动指定。
模型加载频率:数据源中模型数量大于 1 万时,Tapdata 将按照本参数的设定定期刷新模型。
开启心跳表:当连接类型选择为源头和目标、源头时,支持打开该开关,由 Tapdata 在源库中创建一个名为 _tapdata_heartbeat_table 的心跳表并每隔 10 秒更新一次其中的数据(数据库账号需具备相关权限),用于数据源连接与任务的健康度监测。更多介绍,见通过心跳表监测数据同步链路。
提示数据源需在数据复制/开发任务引用并启动后,心跳任务任务才会启动,此时您可以再次进入该数据源的编辑页面,即可单击查看心跳任务。
单击连接测试,测试通过后单击保存。
提示如提示连接测试失败,请根据页面提示进行修复。
Kafka 消费说明
- 仅全量:从 Topic 中各分区的 earliest offset 开始订阅消费,如果之前存在消息消费记录,则恢复到之前的 offset 开始消费。
- 仅增量:从 Topic 中各分区的 latest offset 开始订阅消费,如果之前存在消息消费记录,则恢复到之前的 offset 开始消费。
- 全量 + 增量:跳过全量同步阶段,从增量阶段开始。
- 如果没有进行过全量同步,则会从 Topic 中各分区的 earliest offset 开始订阅消费,否则从 Topic 中各分区的 latest offset 开始订阅消费。
- 如果之前存在消息消费记录,则会恢复到之前的 offset 开始消费。