PostgreSQL
TapData Cloud 为您提供了云端服务,适合需要快速部署、低前期投资场景,帮助您更好地专注于业务发展而非基础设施管理,注册账号即可领取免费的 Agent 实例,欢迎试用。TapData Enterprise 可部署在您的本地数据中心,适合对数据敏感性或网络隔离有严格要求的场景,可服务于构建实时数仓,实现实时数据交换,数据迁移等场景。TapData Community 是一个开源的数据集成平台,提供基础的数据同步和转换功能,可通过 Docker 一键部署,帮助您快速探索和实施数据集成项目。随着项目或企业发展,您可以平滑升级至 TapData Cloud 或 TapData Enterprise,以获得更多高级功能或服务支持。PostgreSQL是功能强大的开源对象关系数据库管理系统(ORDBMS)。完成 Agent 部署后,您可以跟随本文教程在 TapData 中添加 PostgreSQL 数据源,后续可将其作为源或目标库来构建数据管道。
支持版本
PostgreSQL 9.4、9.5、9.6、10.x、11.x、12 版本
增量数据读取原理
为实现增量数据的读取,TapData 需借助 PostgreSQL 的逻辑解码功能,提取提交到事务日志中的更改,并通过插件以用户友好的方式处理这些更改。支持的变更数据捕获(CDC)如下:
- 逻辑解码(Logical Decoding):用于从 WAL 日志中解析逻辑变更事件
- 复制协议(Replication Protocol):提供了消费者实时订阅(甚至同步订阅)数据库变更的机制
- 快照导出(Export Snapshot):允许导出数据库的一致性快照(pg_export_snapshot)
- 复制槽(Replication Slot):用于保存消费者偏移量,跟踪订阅者进度。
准备工作
作为源库
以管理员身份登录 PostgreSQL 数据库。
创建用户并授权。
执行下述格式的命令,创建用于数据同步/开发任务的账号。
CREATE USER username WITH PASSWORD 'password';
- username:用户名。
- password:密码。
执行下述格式的命令,授予账号权限。
-- 进入要授权的数据库
\c database_name
-- 授予目标 Schema 的表读取权限
GRANT SELECT ON ALL TABLES IN SCHEMA schema_name TO username;
-- 授予目标 Schema 的 USAGE 权限
GRANT USAGE ON SCHEMA schema_name TO username;
-- 授予复制权限,仅需读取数据库的全量数据,则无需执行
ALTER USER username REPLICATION;- database_name:数据库名称。
- schema_name:Schema 名称。
- username:用户名。
提示如仅需读取 PostgreSQL 的全量数据(不包含增量变更),则无需执行后续步骤。
执行下述格式的命令,修改复制标识为 FULL(使用整行作为标识),该属性决定了当数据发生 UPDATE/DELETE 时,日志记录的字段。
ALTER TABLE 'schema_name'.'table_name' REPLICA IDENTITY FULL;
- schema_name:Schema 名称。
- table_name:表名称。
登录 PostgreSQL 所属的服务器,根据业务需求和版本选择要安装的解码器插件:
Wal2json(9.4 及以上)
如果源表没有主键,则无法实现同步执行删除操作。
Decoderbufs(9.6 及以上)
Pgoutput(10.0 及以上)
接下来,我们以 Wal2json 为例演示安装流程。
提示本案例中,PostgreSQL 为 12 版本,安装在 CentOS 7 操作系统上,如您的环境与本案例不同,需要调整下述步骤中安装的开发包版本、环境变量的路径等。
添加仓库包。
yum install https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm
安装 PostgreSQL 12 开发包。
yum install -y postgresql12-devel
设置环境变量并使其生效。
export PATH=$PATH:/usr/pgsql-12/bin
source /etc/profile安装环境依赖,包含 llvm、clang、gcc 等。
yum install -y devtoolset-7-llvm centos-release-scl devtoolset-7-gcc* llvm5.0
依次执行下述命令,完成插件的安装。
# 克隆并进入目录
git clone https://github.com/eulerto/wal2json.git && cd wal2json
# 进入 scl 的 devtoolset 环境
scl enable devtoolset-7 bash
# 编译安装
make && make install执行命令
vim /var/lib/pgsql/12/data/postgresql.conf
修改配置文件,将wal_level
的值修改为logical
。提示如果 PostgreSQL 的版本为 9.4, 9.5、9.6,还需要设置
max_replication_slots
和max_wal_senders
的值为 10在业务低峰期,重启 PostgreSQL 服务。
service postgresql-12.service restart
(可选)测试日志插件。
连接 postgres 数据库,切换至需要同步的数据库并创建一张测试表。
-- 假设需要同步的数据库为 demodata,模型为 public
\c demodata
CREATE TABLE public.test_decode
(
uid integer not null
constraint users_pk
primary key,
name varchar(50),
age integer,
score decimal
);创建 Slot 连接,以 wal2json 插件为例。
SELECT * FROM pg_create_logical_replication_slot('slot_test', 'wal2json');
对测试表插入一条数据。
INSERT INTO public.test_decode (uid, name, age, score)
VALUES (1, 'Jack', 18, 89);监听日志并查看返回结果,是否有刚才插入操作的信息。
SELECT * FROM pg_logical_slot_peek_changes('slot_test', null, null);
返回示例如下(竖向显示):
lsn | 0/3E38E60
xid | 610
data | {"change":[{"kind":"insert","schema":"public","table":"test_decode","columnnames":["uid","name","age","score"],"columntypes":["integer","character varying(50)","integer","numeric"],"columnvalues":[1,"Jack",18,89]}]}确认无问题后,可销毁 Slot 连接并删除测试表。
SELECT * FROM pg_drop_replication_slot('slot_test');
DROP TABLE public.test_decode;
(可选)如需使用最后更新时间戳的方式进行增量同步,您需要执行下述步骤。
在源数据库中,执行下述命令创建公共函数,需替换 schema 名称。
CREATE OR REPLACE FUNCTION schema_name.update_lastmodified_column()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$;创建字段和 trigger,每个表均需执行一次,例如表名为 mytable。
// 创建 last_update 字段
ALTER TABLE schema_name.mytable ADD COLUMN last_udpate timestamp DEFAULT now();
// 创建 trigger
CREATE TRIGGER trg_uptime BEFORE UPDATE ON schema_name.mytable FOR EACH ROW EXECUTE PROCEDURE
update_lastmodified_column();
作为目标库
以管理员身份登录 PostgreSQL 数据库。
执行下述格式的命令,创建用于数据同步/开发任务的账号。
CREATE USER username WITH PASSWORD 'password';
- username:用户名。
- password:密码。
执行下述格式的命令,为数据库账号授予权限。
-- 进入要授权的数据库
\c database_name;
-- 授予目标 Schema 的 USAGE 和 CREATE 权限
GRANT CREATE,USAGE ON SCHEMA schemaname TO username;
-- 授予目标 Schema 的表读写权限
GRANT SELECT,INSERT,UPDATE,DELETE,TRUNCATE ON ALL TABLES IN SCHEMA schemaname TO username;- database_name:数据库名称。
- schema_name:Schema 名称。
- username:用户名。
添加数据源
在左侧导航栏,单击连接管理。
单击页面右侧的创建。
在弹出的对话框中,搜索并选择 PostgreSQL。
在跳转到的页面,根据下述说明填写 PostgreSQL 的连接信息。
- 连接信息设置
- 连接名称:填写具有业务意义的独有名称。
- 连接类型:支持将 PostgreSQL 作为源或目标库。
- 地址:数据库连接地址。
- 端口:数据库的服务端口。
- 数据库:数据库名称,即一个连接对应一个数据库,如有多个数据库则需创建多个数据连接。
- 模型:Schema 名称。
- 额外参数:额外的连接参数,默认为空。
- 账号:数据库的账号。
- 密码:数据库账号对应的密码。
- 日志插件:如需读取 PostgreSQL 的数据变更,实现增量数据同步,您需要根据准备工作的指引,完成插件的安装。
- 高级设置
- 时区:默认为数据库所用的时区,您也可以根据业务需求手动指定。
- 共享挖掘:挖掘源库的增量日志,可为多个任务共享源库的增量日志,避免重复读取,从而最大程度上减轻增量同步对源库的压力,开启该功能后还需要选择一个外存用来存储增量日志信息。
- 包含表:默认为全部,您也可以选择自定义并填写包含的表,多个表之间用英文逗号(,)分隔。
- 排除表:打开该开关后,可以设定要排除的表,多个表之间用英文逗号(,)分隔。
- Agent 设置:默认为平台自动分配,您也可以手动指定 Agent。
- 模型加载频率:数据源中模型数量大于 1 万时,TapData 将按照设置的时间定期刷新模型。
- SSL 设置:选择是否开启 SSL 连接数据源,可进一步提升数据安全性,开启该功能后还需要上传 CA 文件、客户端证书、密钥填写客户端密码。
- 连接信息设置
单击连接测试,测试通过后单击保存。
提示如提示连接测试失败,请根据页面提示进行修复。
异常处理
当 CDC 意外中断后,可能导致 Slot 连接无法从 PostgreSQL 主节点删除,此时需手动登录主节点,删除相关 Slot 避免一直占用。
-- 查看是否有 slot_name=tapdata 的信息
TABLE pg_replication_slots;
-- 删除 Slot 节点
select * from pg_drop_replication_slot('tapdata');