文章目录PostgreSQL 15.7 CDC → Flink → Kafka 操作笔记版本选型第一步环境准备安装依赖如果没有 jdk11设置环境变量替换之前用 dnf 安装的 Java 8验证第二步源码编译安装 PostgreSQL 15.7配置 PostgreSQL 支持逻辑复制CDC 核心创建测试数据库和表第三步安装 KafkaKRaft 模式无需 ZooKeeper配置 KRaft 模式修改 Kafka KRaft 配置第四步安装 Flink 1.18.1启动 Flink 集群第五步下载 CDC Kafka 连接器 Jar 包重启 Flink 使 jar 包生效第六步启动 Flink SQL CDC 任务方式一通过 Flink SQL Client 交互式执行推荐实验方式二一键脚本方式非交互式第七步验证 CDC 数据流7.1 查看 Kafka Consumer 输出7.2 在 PostgreSQL 中执行 DML 操作观察实时同步7.3 通过 Flink Web UI 查看任务状态7.4 查看复制槽状态第八步常用管理命令实验架构图故障排查1. Java 版本不对2. Flink 启动失败3. CDC 连接失败4. Kafka Consumer 没有数据5. 端口冲突清理实验环境PostgreSQL 15.7 CDC → Flink → Kafka 操作笔记实验环境openEuler 22.03 (LTS-SP4) x86_64版本选型组件版本说明PostgreSQL15.7源码编译安装Kafka3.9.0使用 KRaft 模式无需 ZooKeeperFlink1.18.1流处理引擎JavaOpenJDK 11Flink / Kafka 运行依赖Flink CDC3.2.1PostgreSQL CDC 连接器第一步环境准备安装依赖# 安装编译 PostgreSQL 所需的依赖dnfinstall-ygcc gcc-cmakereadline-devel zlib-devel bison flex perl# 安装 Java 11Flink Kafka 运行所需dnfinstall-yjava-11-openjdk java-11-openjdk-devel# 验证java-versionjavac-version# 设置 JAVA_HOME可选但建议设置echoexport JAVA_HOME/usr/lib/jvm/java-11-openjdk~/.bashrcechoexport PATH$JAVA_HOME/bin:$PATH~/.bashrcsource~/.bashrc如果没有 jdk11下载 Eclipse Temurin OpenJDK 11免安装版cd/optwgethttps://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.22%2B7/OpenJDK11U-jdk_x64_linux_hotspot_11.0.22_7.tar.gztar-xzfOpenJDK11U-jdk_x64_linux_hotspot_11.0.22_7.tar.gzmvjdk-11.0.227 /opt/jdk-11设置环境变量替换之前用 dnf 安装的 Java 8cat~/.bashrcEOF export JAVA_HOME/opt/jdk-11 export PATH$JAVA_HOME/bin:$PATH EOFsource~/.bashrc验证java-version# 预期: openjdk version 11.0.22 ...第二步源码编译安装 PostgreSQL 15.7# 创建源码目录mkdir-p/opt/srccd/opt/src# 下载 PostgreSQL 15.7 源码wgethttps://ftp.postgresql.org/pub/source/v15.7/postgresql-15.7.tar.gztar-xzfpostgresql-15.7.tar.gzcdpostgresql-15.7# 编译配置安装到 /postgresql/pgsql./configure--prefix/postgresql/pgsql# 编译 - 使用4个并行任务make-j4# 安装makeinstall# 创建 postgres 用户useradd-r-s/bin/bash-m-d/home/postgres postgres# 创建数据目录mkdir-p/postgresql/pgsql/datachown-Rpostgres:postgres /postgresql/pgsql/data# 设置环境变量写入 postgres 用户和 root 用户的 bashrccat/home/postgres/.bashrcEOF export PGHOME/postgresql/pgsql export PGDATA/postgresql/pgsql/data export PATH$PGHOME/bin:$PATH export LD_LIBRARY_PATH$PGHOME/lib:$LD_LIBRARY_PATH EOFcat/root/.bashrcEOF export PGHOME/postgresql/pgsql export PGDATA/postgresql/pgsql/data export PATH$PGHOME/bin:$PATH export LD_LIBRARY_PATH$PGHOME/lib:$LD_LIBRARY_PATH EOFsource/root/.bashrc# 初始化数据库su- postgres-c/postgresql/pgsql/bin/initdb -D /postgresql/pgsql/data# 启动 PostgreSQLsu- postgres-c/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data -l /postgresql/pgsql/data/pg.log start# 验证连接su- postgres-c/postgresql/pgsql/bin/psql -c SELECT version();配置 PostgreSQL 支持逻辑复制CDC 核心# 编辑 postgresql.confcat/postgresql/pgsql/data/postgresql.confEOF # CDC 逻辑复制配置 wal_level logical # WAL 级别设为 logical max_wal_senders 10 # 最大 WAL 发送进程数 max_replication_slots 10 # 最大复制槽数 wal_sender_timeout 60s listen_addresses * # 允许远程连接同一机器可不用 EOF# 配置 pg_hba.conf允许本地和远程连接replication 连接echolocal replication all trust/postgresql/pgsql/data/pg_hba.confechohost replication all 0.0.0.0/0 trust/postgresql/pgsql/data/pg_hba.confecholocal all all trust/postgresql/pgsql/data/pg_hba.confechohost all all 127.0.0.1/32 trust/postgresql/pgsql/data/pg_hba.conf# 重启 PostgreSQL 使配置生效su- postgres-c/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data -l /postgresql/pgsql/data/pg.log restart# 验证 wal_levelsu- postgres-c/postgresql/pgsql/bin/psql -c SHOW wal_level;# 预期输出: logical创建测试数据库和表su- postgres-c/postgresql/pgsql/bin/psqlEOF -- 创建测试数据库 CREATE DATABASE cdc_test; \c cdc_test -- 创建测试表 CREATE TABLE orders ( id SERIAL PRIMARY KEY, order_no VARCHAR(50) NOT NULL, user_id INT NOT NULL, amount DECIMAL(10, 2) NOT NULL, status VARCHAR(20) DEFAULT pending, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 插入一些测试数据 INSERT INTO orders (order_no, user_id, amount) VALUES (ORD-001, 1001, 99.90); INSERT INTO orders (order_no, user_id, amount) VALUES (ORD-002, 1002, 199.50); INSERT INTO orders (order_no, user_id, amount) VALUES (ORD-003, 1003, 299.00); -- 创建发布Publication用于 CDC CREATE PUBLICATION cdc_pub FOR TABLE orders; -- 验证发布 SELECT * FROM pg_publication; -- 给 Flink CDC 用户创建复制槽的权限使用 postgres 超级用户即可 -- 如果希望单独用户可执行 -- CREATE USER flink_cdc WITH REPLICATION LOGIN PASSWORD flink_cdc_123; -- GRANT CONNECT ON DATABASE cdc_test TO flink_cdc; -- GRANT USAGE ON SCHEMA public TO flink_cdc; -- GRANT SELECT ON orders TO flink_cdc; EOF# ★ 关键必须设置 REPLICA IDENTITY FULL否则 UPDATE/DELETE 的 CDC 事件中 before 字段为 null ★su- postgres-c/postgresql/pgsql/bin/psql -d cdc_test -c\ALTER TABLE orders REPLICA IDENTITY FULL;\# 验证su- postgres-c/postgresql/pgsql/bin/psql -d cdc_test -c\\\d orders\第三步安装 KafkaKRaft 模式无需 ZooKeeper# 下载 Kafka 3.9.0cd/opt# wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgzwgethttps://archive.apache.org/dist/kafka/3.9.0/kafka_2.13-3.9.0.tgztar-xzfkafka_2.13-3.9.0.tgzmvkafka_2.13-3.9.0 /opt/kafka# 设置环境变量cat~/.bashrcEOF export KAFKA_HOME/opt/kafka export PATH$KAFKA_HOME/bin:$PATH EOFsource~/.bashrc配置 KRaft 模式# 生成集群 UUIDKAFKA_CLUSTER_ID$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)echoKafka Cluster ID:$KAFKA_CLUSTER_ID# 格式化存储目录$KAFKA_HOME/bin/kafka-storage.shformat-t$KAFKA_CLUSTER_ID-c$KAFKA_HOME/config/kraft/server.properties修改 Kafka KRaft 配置# 备份原始配置cp/opt/kafka/config/kraft/server.properties /opt/kafka/config/kraft/server.properties.bak# 修改关键配置# 修复 Kafka 配置添加 CONTROLLER 安全协议映射# 先获取本机实际 IPSERVER_IP$(ipaddr show|grepinet |grep-v127.0.0.1|head-1|awk{print $2}|cut-d/-f1)echo本机 IP:$SERVER_IP# 配置listeners 用 0.0.0.0 监听所有网卡advertised.listeners 用真实 IPcat/opt/kafka/config/kraft/server.propertiesEOF process.rolesbroker,controller node.id1 controller.quorum.voters1${SERVER_IP}:9093 # ★ bind 用 0.0.0.0广播用真实 IP ★ listenersPLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 advertised.listenersPLAINTEXT://${SERVER_IP}:9092,CONTROLLER://${SERVER_IP}:9093 controller.listener.namesCONTROLLER listener.security.protocol.mapCONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.dirs/tmp/kafka-logs num.partitions3 num.network.threads3 num.io.threads8 socket.send.buffer.bytes102400 socket.receive.buffer.bytes102400 socket.request.max.bytes104857600 offsets.topic.replication.factor1 transaction.state.log.replication.factor1 transaction.state.log.min.isr1 EOF# 重新格式化并启动rm-rf/tmp/kafka-logsKAFKA_CLUSTER_ID$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)$KAFKA_HOME/bin/kafka-storage.shformat-t$KAFKA_CLUSTER_ID-c$KAFKA_HOME/config/kraft/server.propertiesnohup$KAFKA_HOME/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties/tmp/kafka.log21sleep10# 验证 Kafka 是否启动成功nc-zvlocalhost9092# 预期输出: Ncat: Connected to ::1:9092.# 如果返回 Connected说明 Kafka 正常# 创建实验用的 topiccdc_orders_output3 个分区1 个副本$KAFKA_HOME/bin/kafka-topics.sh--create\--bootstrap-server localhost:9092\--topiccdc_orders_output\--partitions3\--replication-factor1# 查看 topic 列表$KAFKA_HOME/bin/kafka-topics.sh--list--bootstrap-server localhost:9092# 启动一个 console consumer 来实时查看 CDC 数据在另一个终端运行或后台运行nohup$KAFKA_HOME/bin/kafka-console-consumer.sh\--bootstrap-server localhost:9092\--topiccdc_orders_output\--from-beginning/tmp/kafka_cdc_consumer.log21echoKafka 搭建完成第四步安装 Flink 1.18.1# 下载 Flinkcd/optwgethttps://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgztar-xzfflink-1.18.1-bin-scala_2.12.tgzmvflink-1.18.1 /opt/flink# 设置环境变量cat~/.bashrcEOF export FLINK_HOME/opt/flink export PATH$FLINK_HOME/bin:$PATH EOFsource~/.bashrc启动 Flink 集群# 修改 Flink 配置调整内存等sed-is/jobmanager.memory.process.size: 1600m/jobmanager.memory.process.size: 1024m//opt/flink/conf/flink-conf.yamlsed-is/taskmanager.memory.process.size: 1728m/taskmanager.memory.process.size: 1024m//opt/flink/conf/flink-conf.yaml# 并行度设为 1实验环境echoparallelism.default: 1/opt/flink/conf/flink-conf.yaml# 启动 Flink 集群$FLINK_HOME/bin/start-cluster.sh# 验证 Flink 是否启动sleep5curl-shttp://localhost:8081/overview|python3-mjson.tool2/dev/null||jq.2/dev/nullechoFlink Web UI: http://$(hostname-I|awk{print $1}):8081echoFlink 集群搭建完成第五步下载 CDC Kafka 连接器 Jar 包# 下载 Flink CDC 连接器cd/opt/flink/lib/# Flink PostgreSQL CDC Connectorwgethttps://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar# Flink Kafka Connectorwgethttps://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar# 验证 jar 包ls-lh/opt/flink/lib/flink-sql-connector-*.jarecho连接器 jar 包下载完成请重启 Flink 集群使 jar 生效重启 Flink 使 jar 包生效$FLINK_HOME/bin/stop-cluster.shsleep3$FLINK_HOME/bin/start-cluster.shsleep5# 验证curl-shttp://localhost:8081/overview|python3-mjson.tool2/dev/null第六步启动 Flink SQL CDC 任务方式一通过 Flink SQL Client 交互式执行推荐实验# 启动 Flink SQL Client$FLINK_HOME/bin/sql-client.sh在 SQL Client 中逐条执行以下 SQL-- 1. 设置 checkpoint 间隔CDC 必须SETexecution.checkpointing.interval3s;-- 2. 创建 PostgreSQL CDC 源表-- 注意请根据实际情况修改 hostname 和密码DROPTABLEIFEXISTSorders_source;CREATETABLEorders_source(idINT,order_no STRING,user_idINT,amountDECIMAL(10,2),statusSTRING,created_atTIMESTAMP(3),PRIMARYKEY(id)NOTENFORCED)WITH(connectorpostgres-cdc,hostnamelocalhost,port5432,usernamepostgres,passwordpaswd,database-namecdc_test,schema-namepublic,table-nameorders,slot.nameflink_cdc_slot,decoding.plugin.namepgoutput);-- 3. 创建 Kafka 输出表CREATETABLEorders_sink(idINT,order_no STRING,user_idINT,amountDECIMAL(10,2),statusSTRING,created_atTIMESTAMP(3),PRIMARYKEY(id)NOTENFORCED)WITH(connectorkafka,topiccdc_orders_output,properties.bootstrap.serverslocalhost:9092,properties.group.idflink-cdc-group,formatdebezium-json,scan.startup.modeearliest-offset);-- 4. 启动 CDC 同步任务将 CDC 数据写入 KafkaINSERTINTOorders_sinkSELECT*FROMorders_source;方式二一键脚本方式非交互式创建一个 SQL 文件然后提交# 创建 SQL 文件cat/tmp/cdc_job.sqlSQLEOF SET execution.checkpointing.interval 3s; CREATE TABLE orders_source ( id INT, order_no STRING, user_id INT, amount DECIMAL(10, 2), status STRING, created_at TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector postgres-cdc, hostname localhost, port 5432, username postgres, password , database-name cdc_test, schema-name public, table-name orders, slot.name flink_cdc_slot, decoding.plugin.name pgoutput ); DROP TABLE IF EXISTS orders_sink; CREATE TABLE orders_sink ( id INT, order_no STRING, user_id INT, amount DECIMAL(10, 2), status STRING, created_at TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector kafka, topic cdc_orders_output, properties.bootstrap.servers localhost:9092, properties.group.id flink-cdc-group, format debezium-json, scan.startup.mode earliest-offset ); INSERT INTO orders_sink SELECT * FROM orders_source; SQLEOF# 提交 Flink SQL 任务$FLINK_HOME/bin/sql-client.sh-f/tmp/cdc_job.sql第七步验证 CDC 数据流7.1 查看 Kafka Consumer 输出# 在新终端中查看 Kafka 消费的数据从最早开始消费$KAFKA_HOME/bin/kafka-console-consumer.sh\--bootstrap-server localhost:9092\--topiccdc_orders_output\--from-beginning你应该能看到类似以下的 CDC 数据Debezium JSON 格式{before:null,after:{id:1,order_no:ORD-001,user_id:1001,amount:99.90,status:pending,created_at:2026-05-24T12:00:00Z},source:{version:...,connector:postgresql,name:cdc_test,schema:public,table:orders,...},op:r,ts_ms:...}7.2 在 PostgreSQL 中执行 DML 操作观察实时同步# 登录 PostgreSQLsu- postgres-c/postgresql/pgsql/bin/psql -d cdc_test执行以下 SQL观察 Kafka Consumer 终端是否实时收到变化数据-- INSERT新增一条订单INSERTINTOorders(order_no,user_id,amount,status)VALUES(ORD-004,1004,450.00,confirmed);-- UPDATE更新订单状态UPDATEordersSETstatusshippedWHEREorder_noORD-001;-- DELETE删除一条订单DELETEFROMordersWHEREorder_noORD-003;每次执行后Kafka Consumer 终端应该立即收到对应的 CDC 变更事件INSERT 事件op: c(create)UPDATE 事件op: u(update)包含before和afterDELETE 事件op: d(delete)after为 null7.3 通过 Flink Web UI 查看任务状态# 查看 Flink Web UI用浏览器打开echoFlink Dashboard: http://$(hostname-I|awk{print $1}):8081在 Web UI 中可以看到运行的 Job 详情Source/Sink 的吞吐量Checkpoint 状态Task 分布7.4 查看复制槽状态su- postgres-c/postgresql/pgsql/bin/psql -c SELECT slot_name, slot_type, database, active, restart_lsn FROM pg_replication_slots;第八步常用管理命令# PostgreSQL # 启动 PostgreSQLsu- postgres-c/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data -l /postgresql/pgsql/data/pg.log start# 停止 PostgreSQLsu- postgres-c/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data stop# 重启 PostgreSQLsu- postgres-c/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data restart# 查看 PostgreSQL 状态su- postgres-c/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data status# Kafka # 启动 Kafkanohup$KAFKA_HOME/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties/tmp/kafka.log21# 停止 Kafka$KAFKA_HOME/bin/kafka-server-stop.sh# 查看 topic 信息$KAFKA_HOME/bin/kafka-topics.sh--describe--bootstrap-server localhost:9092--topiccdc_orders_output# Flink # 启动$FLINK_HOME/bin/start-cluster.sh# 停止$FLINK_HOME/bin/stop-cluster.sh# 查看运行中的 Job$FLINK_HOME/bin/flink list# 取消某个 Job替换 job_id# $FLINK_HOME/bin/flink cancel job_id# 查看所有进程 jps-l实验架构图┌──────────────┐ CDC (pgoutput) ┌──────────────┐ Kafka Sink ┌──────────────┐ │ │ ──────────────────► │ │ ──────────────► │ │ │ PostgreSQL │ logical decoding │ Flink │ debezium-json │ Kafka │ │ (pgoutput) │ │ CDC Job │ │ Topic │ │ │ │ │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ WAL Level: logical 实时捕获变更 cdc_orders_output Publication: cdc_pub 3 partitions Slot: flink_cdc_slot debezium-json format故障排查1. Java 版本不对# 确保使用 Java 11java-version# 如果安装了多个 Java 版本使用 alternatives 切换alternatives--configjava2. Flink 启动失败# 查看日志tail-100/opt/flink/log/flink-*-standalonesession-*.log# 常见问题端口被占用修改端口# sed -i s/rest.port: 8081/rest.port: 18081/ /opt/flink/conf/flink-conf.yaml3. CDC 连接失败# 检查 PostgreSQL WAL 级别su- postgres-c/postgresql/pgsql/bin/psql -c SHOW wal_level;# 必须是 logical# 检查复制槽su- postgres-c/postgresql/pgsql/bin/psql -c SELECT * FROM pg_replication_slots;# 检查发布su- postgres-c/postgresql/pgsql/bin/psql -c SELECT * FROM pg_publication;# Flink 日志tail-200/opt/flink/log/flink-*-taskexecutor-*.log|grep-ierror4. Kafka Consumer 没有数据# 检查 topic 是否存在$KAFKA_HOME/bin/kafka-topics.sh--list--bootstrap-server localhost:9092# 查看 topic 消息数$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell\--broker-list localhost:9092--topiccdc_orders_output5. 端口冲突# 查看端口占用netstat-tlnp|grep-E5432|9092|8081清理实验环境# 停止 Flink$FLINK_HOME/bin/stop-cluster.sh# 停止 Kafka$KAFKA_HOME/bin/kafka-server-stop.sh# 停止 PostgreSQLsu- postgres-c/postgresql/pgsql/bin/pg_ctl -D /postgresql/pgsql/data stop