This topic describes how to use the INCLUDE clause when creating a table or source with an external connector to extract fields not included in the payload as separate columns. The payload refers to the actual content or information carried by a message, event, or record, as defined in the schema when creating a source or table.
Below is the basic syntax of the INCLUDE clause.
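As a sketch of the general form, the clause names one of the components listed in the tables below, optionally followed by a column alias (the exact set of allowed components varies by connector):

```sql
INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]
```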
If <column_name> is not specified, a default name is generated in the format _rw_{connector}_{col}, where connector is the name of the source connector used (Kafka, Pulsar, Kinesis, etc.) and col is the type of column being generated (key, offset, timestamp, etc.). For instance, if an offset column is added to a Kafka source, the default column name is _rw_kafka_offset.
This clause should be included in your CREATE SOURCE or CREATE TABLE command after the schema is defined.
For UPSERT types of sources and tables, INCLUDE KEY is required, as RisingWave uses this column to perform upsert semantics. The primary key cannot be defined as multiple columns in this case.
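To illustrate, a minimal upsert table might look like the following sketch, where the included key column doubles as the primary key. The broker address and topic name are placeholders:

```sql
-- Sketch: INCLUDE KEY provides the column used for upsert semantics.
-- Broker address and topic name are placeholders.
CREATE TABLE upsert_example (
  a int,
  PRIMARY KEY (key_col)
)
INCLUDE key AS key_col
WITH (
  connector = 'kafka',
  topic = 'example_topic',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT UPSERT ENCODE JSON;
```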
The INCLUDE clause can be used with the following source connectors.
Kafka allows the following components to be included:

Allowed components | Default type | Note |
---|---|---|
key | bytea | Can be overwritten by ENCODE and KEY ENCODE. |
timestamp | timestamp with time zone (i64 in millis) | Refers to CreateTime rather than LogAppendTime. |
partition | varchar | The partition the message is from. |
offset | varchar | The offset in the partition. |
headers | struct<varchar, bytea>[] | Key-value pairs along with the message. |
payload | json | The actual content or data of the message. Only supports JSON format. |
Note that the headers component has the type List[Struct<Varchar, Bytea>].
It is also possible to include a single bytea header, where the column content will be the value associated with the specified key, header_col. The header_col field can only be defined when including a header. In this case, the generated column name will have the format _rw_kafka_header_{header col name}_{col type}, where col type is the data type of the header column.
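As a sketch, a single header value could be included as follows. Here 'tracing_id' is a hypothetical header key, and the exact syntax for typing the header value may vary by RisingWave version:

```sql
-- Sketch: include only the value of the 'tracing_id' header (hypothetical key).
-- Without AS, the generated column name would follow the
-- _rw_kafka_header_{header col name}_{col type} pattern described above.
CREATE TABLE kafka_single_header (a int)
INCLUDE header 'tracing_id' AS tracing_id
WITH (
  connector = 'kafka',
  topic = 'example_topic',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```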
Kinesis allows the following components to be included:

Allowed components | Default type | Note |
---|---|---|
key | bytea | Can be overwritten by ENCODE and KEY ENCODE. |
timestamp | timestamp with time zone | See the approximate_arrival_timestamp field at Struct aws_sdk_kinesis::types::Record. |
partition | varchar | The partition the message is from. |
offset | varchar | The offset in the partition, which corresponds to Kinesis sequence numbers. |
payload | json | The actual content or data of the message. Only supports JSON format. |
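For example, a Kinesis source that also captures the approximate arrival timestamp and sequence number might be sketched as follows. The stream name and region are placeholders:

```sql
-- Sketch: capture the Kinesis approximate arrival timestamp and
-- sequence number as columns. Stream name and region are placeholders.
CREATE SOURCE kinesis_example (a int)
INCLUDE timestamp AS arrival_ts
INCLUDE offset AS seq_no
WITH (
  connector = 'kinesis',
  stream = 'example_stream',
  aws.region = 'us-east-1'
)
FORMAT PLAIN ENCODE JSON;
```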
Pulsar allows the following components to be included:

Allowed components | Default type | Note |
---|---|---|
key | bytea | Can be overwritten by ENCODE and KEY ENCODE. |
partition | varchar | The partition the message is from. |
offset | varchar | The offset in the partition. |
payload | json | The actual content or data of the message. Only supports JSON format. |
File source connectors (such as S3) allow the following components to be included:

Allowed components | Default type | Note |
---|---|---|
file | varchar | The file the record is from. |
offset | varchar | The offset in the file. |
payload | json | The actual content or data of the message. Only supports JSON format. |
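A file source that records the provenance of each row might be sketched as follows. The bucket name, region, and match pattern are placeholders:

```sql
-- Sketch: record which file and offset each row came from.
-- Bucket, region, and pattern are placeholders.
CREATE SOURCE s3_example (a int)
INCLUDE file AS file_name
INCLUDE offset AS file_offset
WITH (
  connector = 's3',
  s3.bucket_name = 'example-bucket',
  s3.region_name = 'us-east-1',
  match_pattern = '*.json'
)
FORMAT PLAIN ENCODE JSON;
```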
MQTT allows the following component to be included:

Allowed components | Default type | Note |
---|---|---|
partition | varchar | The topic the record is from. |
Here is an example of creating a table, additional_columns, that ingests data from a Kafka broker. Aside from the a column, which is part of the message payload, the additional fields key, partition, offset, timestamp, header, and payload are also added to the table.
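A sketch of such a statement follows; the broker address and topic name are placeholders:

```sql
-- Sketch: ingest column a from the payload plus all additional fields.
-- Broker address and topic name are placeholders.
CREATE TABLE additional_columns (
  a int,
  PRIMARY KEY (key_col)
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
  connector = 'kafka',
  topic = 'example_topic',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT UPSERT ENCODE JSON;
```

Because the statement uses FORMAT UPSERT, the INCLUDE key column is required and serves as the primary key.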