Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, and then delivering the changes to a downstream service in real time.
Ensure that wal_level is logical. You can check the current value by running SHOW wal_level in psql. By default, it is replica. For CDC, you will need to set it to logical in the database configuration file (postgresql.conf) or via a psql command such as ALTER SYSTEM SET wal_level = logical. Keep in mind that changing wal_level requires a restart of the PostgreSQL instance and can affect database performance.
Ensure that max_wal_senders is greater than or equal to the number of synced tables. By default, max_wal_senders is 10.
Assign the REPLICATION, LOGIN, and CREATEDB role attributes to the user.
For an existing user, run the following statement to assign the attributes:
ALTER USER <username> REPLICATION LOGIN CREATEDB;
For a new user, run the following statement to create the user and assign the attributes:
CREATE USER <username> REPLICATION LOGIN CREATEDB;
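The WAL settings and role attributes described above can be verified from a psql session on the upstream database. The following is a minimal sketch that uses only standard PostgreSQL statements; cdc_user is a hypothetical user name, so substitute your own. ALTER SYSTEM typically requires superuser privileges.

```sql
-- Check the current WAL level; it must be 'logical' for CDC.
SHOW wal_level;

-- Switch to logical decoding. The new value only takes effect after a restart.
ALTER SYSTEM SET wal_level = logical;

-- Confirm there are enough WAL senders for the number of tables you plan to sync.
SHOW max_wal_senders;

-- Inspect the role attributes of the CDC user (cdc_user is a placeholder name).
SELECT rolname, rolreplication, rolcanlogin, rolcreatedb
FROM pg_roles
WHERE rolname = 'cdc_user';
```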
You can check your role attributes by using the \du psql command.
When creating a table from a source, you must also specify the PostgreSQL table name (pg_table_name) that you are selecting from.
Field | Notes |
---|---|
hostname | Hostname of the database. |
port | Port number of the database. |
username | Username of the database. |
password | Password of the database. |
database.name | Name of the database. |
schema.name | Optional. Name of the schema. By default, the value is public. |
table.name | Name of the table that you want to ingest data from. |
slot.name | Optional. The replication slot for this PostgreSQL source. By default, a unique slot name will be randomly generated. Each source should have a unique slot name. Valid replication slot names must contain only lowercase letters, numbers, and underscores, and be no longer than 63 characters. |
auto.schema.change | Optional. Specify whether you want to enable replicating Postgres table schema change. |
ssl.mode | Optional. The ssl.mode parameter determines the level of SSL/TLS encryption for secure communication with Postgres. Accepted values are disabled, preferred, required, verify-ca, and verify-full. The default value is disabled. |
ssl.root.cert | Optional. Specify the root certificate secret. You must create the secret first and then use it here. |
publication.name | Optional. Name of the publication. By default, the value is rw_publication. |
publication.create.enable | Optional. By default, the value is true. If publication.name does not exist and this value is true, the publication will be created. If publication.name does not exist and this value is false, an error will be returned. |
transactional | Optional. Specify whether you want to enable transactions for the CDC table that you are about to create. By default, the value is true for shared sources, and false otherwise. This feature is also supported for shared CDC sources for multi-table transactions. For performance considerations, transactions involving changes to more than 4096 rows cannot be guaranteed. |
Field | Notes |
---|---|
snapshot | Optional. If false, CDC backfill will be disabled and only upstream events that have occurred after the creation of the table will be consumed. This option can only be applied for tables created from a shared source. |
snapshot.interval | Optional. Specifies the barrier interval for buffering upstream events. The default value is 1. |
snapshot.batch_size | Optional. Specifies the batch size of a snapshot read query from the upstream table. The default value is 1000. |
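As an illustration of the table-level options above, the sketch below creates a CDC table that skips the backfill and only consumes events that occur after the table is created. It assumes a shared source named pg_mydb already exists; the upstream table public.orders and its columns are hypothetical.

```sql
CREATE TABLE orders_live (
    order_id integer PRIMARY KEY,   -- must match the upstream primary key
    amount double precision
)
WITH (
    snapshot = 'false'              -- disable CDC backfill; consume only new upstream events
)
FROM pg_mydb TABLE 'public.orders';
```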
The INCLUDE timestamp AS column_name clause allows you to ingest the upstream commit timestamp. For historical data, the commit timestamp will be set to 1970-01-01 00:00:00+00:00.
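A minimal sketch of the clause in use follows. The source name pg_mydb, the upstream table public.mytable, its columns, and the column name commit_ts are all assumptions chosen for illustration.

```sql
CREATE TABLE mytable (
    v1 integer PRIMARY KEY,
    v2 varchar
)
INCLUDE timestamp AS commit_ts      -- commit_ts receives the upstream commit timestamp
FROM pg_mydb TABLE 'public.mytable';
```

Rows loaded from historical data would show 1970-01-01 00:00:00+00:00 in commit_ts, as noted above.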
Debezium connector properties can be specified in the WITH clause when creating a table or shared source. Add the prefix debezium. (including the trailing dot) to the connector property you want to include.
For instance, to skip unknown DDL statements, specify the schema.history.internal.skip.unparseable.ddl parameter as debezium.schema.history.internal.skip.unparseable.ddl.
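The prefixing rule can be sketched as follows. The connection values and the source name pg_mydb are placeholders rather than values taken from this guide; only the debezium.-prefixed line illustrates the point being made.

```sql
CREATE SOURCE pg_mydb WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',         -- placeholder connection values
    port = '5432',
    username = 'postgres',
    password = 'secret',
    database.name = 'mydb',
    -- Debezium property passed through with the debezium. prefix
    debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);
```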
The following metadata columns are available when using postgres-cdc as the source.
Field | Notes |
---|---|
database_name | Name of the database. |
schema_name | Name of the schema. |
table_name | Name of the table. |
Including these metadata fields (database_name, schema_name, table_name) in a table provides contextual information about where the data resides within the PostgreSQL database.
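The sketch below shows one way the metadata fields might be added to a CDC table, assuming they are requested with INCLUDE clauses in the same way as the commit timestamp. The person table, its columns, and the source name pg_mydb are hypothetical.

```sql
CREATE TABLE person (
    id integer,
    name varchar,
    PRIMARY KEY (id)
)
INCLUDE timestamp AS commit_ts
INCLUDE database_name AS database_name  -- upstream database of each row
INCLUDE schema_name AS schema_name      -- upstream schema of each row
INCLUDE table_name AS table_name        -- upstream table of each row
FROM pg_mydb TABLE 'public.person';
```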
The data format is fixed to FORMAT PLAIN ENCODE JSON, so it does not need to be specified.
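For example, a shared source could be created along the following lines, using the connector parameters listed earlier. This is a sketch under assumed values: the host, port, credentials, database name, and slot name are placeholders, and no FORMAT or ENCODE clause is given because the format is fixed.

```sql
CREATE SOURCE pg_mydb WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',         -- placeholder connection values
    port = '5432',
    username = 'postgres',
    password = 'secret',
    database.name = 'mydb',
    slot.name = 'mydb_slot'         -- lowercase letters, numbers, and underscores only
);
```

Once the source exists, several CDC tables can reference it without repeating the connection parameters.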
For instance, a CDC table in RisingWave can ingest data from the table tt3 in the schema public. When specifying the PostgreSQL table name in the FROM clause after the keyword TABLE, the schema name must also be specified.
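A sketch of such a table follows. The source name pg_mydb and the column definitions are assumptions; the columns and primary key must match whatever the upstream tt3 table actually contains.

```sql
CREATE TABLE tt3 (
    v1 integer PRIMARY KEY,
    v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';  -- schema-qualified upstream table name
```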
You can also create another CDC table in RisingWave that ingests data from the table tt4 in the schema ods.
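Under the same assumptions (a source named pg_mydb and illustrative columns), a table reading from the ods schema might look like this:

```sql
CREATE TABLE tt4 (
    v1 integer PRIMARY KEY,
    v2 varchar
) FROM pg_mydb TABLE 'ods.tt4';     -- same source, different upstream schema
```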
PostgreSQL type | RisingWave type |
---|---|
BOOLEAN | BOOLEAN |
BIT(1) | BOOLEAN |
BIT( > 1) | No support |
BIT VARYING[(M)] | No support |
SMALLINT, SMALLSERIAL | SMALLINT |
INTEGER, SERIAL | INTEGER |
BIGINT, BIGSERIAL, OID | BIGINT |
REAL | REAL |
DOUBLE PRECISION | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
CHARACTER[(M)] | CHARACTER VARYING |
CHARACTER VARYING[(M)] | CHARACTER VARYING |
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) |
INTERVAL [P] | INTERVAL |
BYTEA | BYTEA |
JSON, JSONB | JSONB |
XML | CHARACTER VARYING |
UUID | CHARACTER VARYING |
POINT | STRUCT (with form <x REAL, y REAL>) |
LTREE | No support |
CITEXT | CHARACTER VARYING* |
INET | CHARACTER VARYING* |
INT4RANGE | CHARACTER VARYING* |
INT8RANGE | CHARACTER VARYING* |
NUMRANGE | CHARACTER VARYING* |
TSRANGE | CHARACTER VARYING* |
TSTZRANGE | CHARACTER VARYING* |
DATERANGE | CHARACTER VARYING* |
ENUM | CHARACTER VARYING* |
DATE | DATE |
TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw_int256 or varchar. Note that rw_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL. |
MONEY[(M[,D])] | NUMERIC |
HSTORE | No support |
CIDR | CHARACTER VARYING* |
MACADDR | CHARACTER VARYING* |
MACADDR8 | CHARACTER VARYING* |
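To illustrate the NUMERIC row above, the sketch below declares columns as rw_int256 and varchar so that upstream values beyond 28 digits of precision are not turned into NULL. The source name pg_mydb, the upstream table public.ledger, and its columns are hypothetical.

```sql
CREATE TABLE ledger (
    id integer PRIMARY KEY,
    -- upstream NUMERIC holding large integers; inf, -inf, nan, and fractional values become NULL
    total_units rw_int256,
    -- upstream NUMERIC kept as text, preserving the full value including decimals
    total_amount varchar
) FROM pg_mydb TABLE 'public.ledger';
```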
To ingest PostgreSQL CDC data with dbt, the source and table_with_connector models will be used. For more details about these two models, please refer to Use dbt for data transformations.
First, we create a source model, pg_mydb.sql.
Then, we create a table_with_connector model, tt3.sql.
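The two models might look roughly like the sketch below, which shows both files in one block for brevity. It assumes the dbt adapter accepts materialized='source' and materialized='table_with_connector' in the config call and that each model body is the raw CREATE statement; the connection values and column definitions are placeholders.

```sql
-- File pg_mydb.sql: the source model
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',         -- placeholder connection values
    port = '5432',
    username = 'postgres',
    password = 'secret',
    database.name = 'mydb',
    slot.name = 'mydb_slot'
);

-- File tt3.sql: the table_with_connector model, reading from the source model
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
    v1 integer PRIMARY KEY,
    v2 timestamp with time zone
) FROM {{ ref('pg_mydb') }} TABLE 'public.tt3';
```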
You can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.
For example, a table named supplier can be created this way to ingest all columns from the upstream table in the PostgreSQL database, and running DESCRIBE supplier; afterwards shows the resulting columns.
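A sketch of this pattern follows, assuming a source named pg_mydb and an upstream table public.supplier; both names are illustrative.

```sql
-- Ingest every column of the upstream table; no other columns may be listed alongside *.
CREATE TABLE supplier (*) FROM pg_mydb TABLE 'public.supplier';

-- Inspect the columns that were derived from the upstream table.
DESCRIBE supplier;
```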
By default, PostgreSQL creates publications with publish_via_partition_root = false. This setting causes replication slot events to contain separate events for each partition, rather than for the root partitioned table.
If you need to read from the partitioned table, explicitly set this property to true when creating the publication in your upstream PostgreSQL database, that is, create it with publish_via_partition_root = true.
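In upstream PostgreSQL, a publication with this property could be created as sketched below. The publication name matches the default rw_publication from the parameter table, and orders is a hypothetical partitioned table.

```sql
CREATE PUBLICATION rw_publication
FOR TABLE orders                            -- hypothetical root partitioned table
WITH (publish_via_partition_root = true);   -- publish changes under the root table
```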
Please be aware that PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication; however, it does not generate an error if attempted. If you need to ingest data from both the root table and its partitions, you should create separate publications for each. Otherwise, you will not be able to read from the table partitions. Meanwhile, in RisingWave, you should create separate sources with dedicated publication names for the partitioned table and its partitions.
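The separation described above can be sketched as two publications upstream and two sources in RisingWave. All table, publication, source, and slot names below are hypothetical, and the connection values are placeholders.

```sql
-- Upstream PostgreSQL: one publication for the root table, one for the partitions.
CREATE PUBLICATION pub_orders_root
FOR TABLE orders
WITH (publish_via_partition_root = true);

CREATE PUBLICATION pub_orders_parts
FOR TABLE orders_2023, orders_2024;

-- RisingWave: one source per publication, each with its own replication slot.
CREATE SOURCE pg_orders_root WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'postgres',
    password = 'secret',
    database.name = 'mydb',
    slot.name = 'orders_root_slot',
    publication.name = 'pub_orders_root'
);

CREATE SOURCE pg_orders_parts WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'postgres',
    password = 'secret',
    database.name = 'mydb',
    slot.name = 'orders_parts_slot',
    publication.name = 'pub_orders_parts'
);
```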