跳到主要内容
版本:3.x

PostgreSQL

适用版本Cloud 功能TapData Cloud 为您提供了云端服务,适合需要快速部署、低前期投资场景,帮助您更好地专注于业务发展而非基础设施管理,注册账号即可领取免费的 Agent 实例,欢迎试用。企业版功能TapData Enterprise 可部署在您的本地数据中心,适合对数据敏感性或网络隔离有严格要求的场景,可服务于构建实时数仓,实现实时数据交换,数据迁移等场景。社区版功能TapData Community 是一个开源的数据集成平台,提供基础的数据同步和转换功能,可通过 Docker 一键部署,帮助您快速探索和实施数据集成项目。随着项目或企业发展,您可以平滑升级至 TapData Cloud 或 TapData Enterprise,以获得更多高级功能或服务支持。

PostgreSQL是功能强大的开源对象关系数据库管理系统(ORDBMS)。完成 Agent 部署后,您可以跟随本文教程在 TapData 中添加 PostgreSQL 数据源,后续可将其作为源或目标库来构建数据管道。

支持版本

PostgreSQL 9.4、9.5、9.6、10.x、11.x、12 版本

增量数据读取原理

为实现增量数据的读取,TapData 需借助 PostgreSQL 的逻辑解码功能,提取提交到事务日志中的更改,并通过插件以用户友好的方式处理这些更改。支持的变更数据捕获(CDC)如下:

  • 逻辑解码(Logical Decoding):用于从 WAL 日志中解析逻辑变更事件
  • 复制协议(Replication Protocol):提供了消费者实时订阅(甚至同步订阅)数据库变更的机制
  • 快照导出(Export Snapshot):允许导出数据库的一致性快照(pg_export_snapshot)
  • 复制槽(Replication Slot):用于保存消费者偏移量,跟踪订阅者进度。

准备工作

作为源库

  1. 以管理员身份登录 PostgreSQL 数据库。

  2. 创建用户并授权。

    1. 执行下述格式的命令,创建用于数据同步/开发任务的账号。

      CREATE USER username WITH PASSWORD 'password';
      • username:用户名。
      • password:密码。
    2. 执行下述格式的命令,授予账号权限。

      -- 进入要授权的数据库
      \c database_name

      -- 授予目标 Schema 的表读取权限
      GRANT SELECT ON ALL TABLES IN SCHEMA schema_name TO username;

      -- 授予目标 Schema 的 USAGE 权限
      GRANT USAGE ON SCHEMA schema_name TO username;

      -- 授予复制权限,仅需读取数据库的全量数据,则无需执行
      ALTER USER username REPLICATION;
      • database_name:数据库名称。
      • schema_name:Schema 名称。
      • username:用户名。
      提示

      如仅需读取 PostgreSQL 的全量数据(不包含增量变更),则无需执行后续步骤。

  3. 执行下述格式的命令,修改复制标识为 FULL(使用整行作为标识),该属性决定了当数据发生 UPDATE/DELETE 时,日志记录的字段。

    ALTER TABLE 'schema_name'.'table_name' REPLICA IDENTITY FULL;   
    • schema_name:Schema 名称。
    • table_name:表名称。
  4. 登录 PostgreSQL 所属的服务器,根据业务需求和版本选择要安装的解码器插件:

    • Wal2json(9.4 及以上)

      如果源表没有主键,则无法实现同步执行删除操作。

    • Decoderbufs(9.6 及以上)

    • Pgoutput(10.0 及以上)

    接下来,我们以 Wal2json 为例演示安装流程。

    提示

    本案例中,PostgreSQL 为 12 版本,安装在 CentOS 7 操作系统上,如您的环境与本案例不同,需要调整下述步骤中安装的开发包版本、环境变量的路径等。

    1. 添加仓库包。

      yum install https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm
    2. 安装 PostgreSQL 12 开发包。

      yum install -y postgresql12-devel
    3. 设置环境变量并使其生效。

      export PATH=$PATH:/usr/pgsql-12/bin
      source /etc/profile
    4. 安装环境依赖,包含 llvm、clang、gcc 等。

      yum install -y devtoolset-7-llvm centos-release-scl devtoolset-7-gcc* llvm5.0
    5. 依次执行下述命令,完成插件的安装。

      # 克隆并进入目录
      git clone https://github.com/eulerto/wal2json.git && cd wal2json

      # 进入 scl 的 devtoolset 环境
      scl enable devtoolset-7 bash

      # 编译安装
      make && make install
    6. 执行命令 vim /var/lib/pgsql/12/data/postgresql.conf 修改配置文件,将 wal_level 的值修改为 logical

      提示

      如果 PostgreSQL 的版本为 9.4, 9.5、9.6,还需要设置 max_replication_slotsmax_wal_senders 的值为 10

    7. 在业务低峰期,重启 PostgreSQL 服务。

      service postgresql-12.service restart
  1. (可选)测试日志插件。

    1. 连接 postgres 数据库,切换至需要同步的数据库并创建一张测试表。

      -- 假设需要同步的数据库为 demodata,模型为 public
      \c demodata

      CREATE TABLE public.test_decode
      (
      uid integer not null
      constraint users_pk
      primary key,
      name varchar(50),
      age integer,
      score decimal
      );
    2. 创建 Slot 连接,以 wal2json 插件为例。

      SELECT * FROM pg_create_logical_replication_slot('slot_test', 'wal2json');
    3. 对测试表插入一条数据。

      INSERT INTO public.test_decode (uid, name, age, score)
      VALUES (1, 'Jack', 18, 89);
    4. 监听日志并查看返回结果,是否有刚才插入操作的信息。

      SELECT * FROM pg_logical_slot_peek_changes('slot_test', null, null);

      返回示例如下(竖向显示):

      lsn  | 0/3E38E60
      xid | 610
      data | {"change":[{"kind":"insert","schema":"public","table":"test_decode","columnnames":["uid","name","age","score"],"columntypes":["integer","character varying(50)","integer","numeric"],"columnvalues":[1,"Jack",18,89]}]}
    5. 确认无问题后,可销毁 Slot 连接并删除测试表。

      SELECT * FROM pg_drop_replication_slot('slot_test');
      DROP TABLE public.test_decode;
  2. (可选)如需使用最后更新时间戳的方式进行增量同步,您需要执行下述步骤。

    1. 在源数据库中,执行下述命令创建公共函数,需替换 schema 名称。

      CREATE OR REPLACE FUNCTION schema_name.update_lastmodified_column()
      RETURNS TRIGGER LANGUAGE plpgsql AS $$
      BEGIN
      NEW.last_update = now();
      RETURN NEW;
      END;
      $$;
    2. 创建字段和 trigger,每个表均需执行一次,例如表名为 mytable

      // 创建 last_update 字段
      ALTER TABLE schema_name.mytable ADD COLUMN last_udpate timestamp DEFAULT now();

      // 创建 trigger
      CREATE TRIGGER trg_uptime BEFORE UPDATE ON schema_name.mytable FOR EACH ROW EXECUTE PROCEDURE
      update_lastmodified_column();

作为目标库

  1. 以管理员身份登录 PostgreSQL 数据库。

  2. 执行下述格式的命令,创建用于数据同步/开发任务的账号。

    CREATE USER username WITH PASSWORD 'password';
    • username:用户名。
    • password:密码。
  3. 执行下述格式的命令,为数据库账号授予权限。

    -- 进入要授权的数据库
    \c database_name;

    -- 授予目标 Schema 的 USAGE 和 CREATE 权限
    GRANT CREATE,USAGE ON SCHEMA schemaname TO username;

    -- 授予目标 Schema 的表读写权限
    GRANT SELECT,INSERT,UPDATE,DELETE,TRUNCATE ON ALL TABLES IN SCHEMA schemaname TO username;
    • database_name:数据库名称。
    • schema_name:Schema 名称。
    • username:用户名。

添加数据源

  1. 登录 TapData 平台

  2. 在左侧导航栏,单击连接管理

  3. 单击页面右侧的创建

  4. 在弹出的对话框中,搜索并选择 PostgreSQL

  5. 在跳转到的页面,根据下述说明填写 PostgreSQL 的连接信息。

    PostgreSQl 连接示例

    • 连接信息设置
      • 连接名称:填写具有业务意义的独有名称。
      • 连接类型:支持将 PostgreSQL 作为源或目标库。
      • 地址:数据库连接地址。
      • 端口:数据库的服务端口。
      • 数据库:数据库名称,即一个连接对应一个数据库,如有多个数据库则需创建多个数据连接。
      • 模型:Schema 名称。
      • 额外参数:额外的连接参数,默认为空。
      • 账号:数据库的账号。
      • 密码:数据库账号对应的密码。
      • 日志插件:如需读取 PostgreSQL 的数据变更,实现增量数据同步,您需要根据准备工作的指引,完成插件的安装。
    • 高级设置
      • 时区:默认为数据库所用的时区,您也可以根据业务需求手动指定。
      • 共享挖掘挖掘源库的增量日志,可为多个任务共享源库的增量日志,避免重复读取,从而最大程度上减轻增量同步对源库的压力,开启该功能后还需要选择一个外存用来存储增量日志信息。
      • 包含表:默认为全部,您也可以选择自定义并填写包含的表,多个表之间用英文逗号(,)分隔。
      • 排除表:打开该开关后,可以设定要排除的表,多个表之间用英文逗号(,)分隔。
      • Agent 设置:默认为平台自动分配,您也可以手动指定 Agent。
      • 模型加载频率:数据源中模型数量大于 1 万时,TapData 将按照设置的时间定期刷新模型。
    • SSL 设置:选择是否开启 SSL 连接数据源,可进一步提升数据安全性,开启该功能后还需要上传 CA 文件、客户端证书、密钥填写客户端密码。
  6. 单击连接测试,测试通过后单击保存

    提示

    如提示连接测试失败,请根据页面提示进行修复。

异常处理

当 CDC 意外中断后,可能导致 Slot 连接无法从 PostgreSQL 主节点删除,此时需手动登录主节点,删除相关 Slot 避免一直占用。

-- 查看是否有 slot_name=tapdata 的信息
TABLE pg_replication_slots;

-- 删除 Slot 节点
select * from pg_drop_replication_slot('tapdata');