Ingest additional fields with INCLUDE clause
This topic describes how to use the `INCLUDE` clause when creating a table or source with an external connector to extract fields not included in the payload as separate columns. The payload refers to the actual content or information carried by a message, event, or record, as defined in the schema when creating a source or table.
Syntax
To add additional columns, use the `INCLUDE` clause.
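As a sketch of the general form (the set of allowed fields varies by connector, as shown in the tables below, and `payload` support may depend on your RisingWave version):

```sql
INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]
```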
If `<column_name>` is not specified, a default one will be generated in the format `_rw_{connector}_{col}`, where `connector` is the name of the source connector used (Kafka, Pulsar, Kinesis, etc.) and `col` is the type of column being generated (key, offset, timestamp, etc.). For instance, if an offset column is added to a Kafka source, the default column name would be `_rw_kafka_offset`.
This clause should be included in your `CREATE SOURCE` or `CREATE TABLE` command after the schema is defined.
Note that for `UPSERT` types of sources and tables, `INCLUDE KEY` is required because RisingWave uses this column to perform upsert semantics. The primary key cannot be defined across multiple columns in this case.
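For illustration, here is a minimal sketch of an upsert table whose primary key is the included Kafka key; the topic name and broker address are placeholders:

```sql
CREATE TABLE upsert_orders (
  order_id int,
  amount double precision,
  PRIMARY KEY (kafka_key)
)
INCLUDE key AS kafka_key
WITH (
  connector = 'kafka',
  topic = 'orders',                               -- placeholder topic
  properties.bootstrap.server = 'localhost:9092'  -- placeholder broker
)
FORMAT UPSERT ENCODE JSON;
```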
Supported connectors
The `INCLUDE` clause can be used with the following source connectors.
Kafka
When ingesting data from Kafka, the following additional fields can be included.
| Allowed components | Default type | Note |
|---|---|---|
| key | bytea | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| timestamp | timestamp with time zone (i64 in millis) | Refers to `CreateTime` rather than `LogAppendTime`. |
| partition | varchar | The partition the message is from. |
| offset | varchar | The offset in the partition. |
| headers | struct<varchar, bytea>[] | Key-value pairs along with the message. |
| payload | json | The actual content or data of the message. Only supports JSON format. |
In the case of headers, there are two ways to define the column, as shown in the sketch below. You can generate a headers column with type `List[Struct<Varchar, Bytea>]` that contains all headers. Or you can generate a `bytea` column whose content is the value associated with a specific header key, `header_col`. The `header_col` field can only be defined when including a header. In that case, the generated column name will have the format `_rw_kafka_header_{header col name}_{col type}`, where `col type` is the data type of the header column.
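A hedged sketch of the two forms; `all_headers` and the header key `trace_id` are hypothetical names, and the single-key form follows an `INCLUDE header '<key>' <type>` pattern that you should verify against your RisingWave version:

```sql
-- Form 1: all headers as struct<varchar, bytea>[]
INCLUDE header AS all_headers

-- Form 2 (assumed syntax): only the value of one header key;
-- 'trace_id' is a hypothetical key, so the default column name
-- would be _rw_kafka_header_trace_id_bytea
INCLUDE header 'trace_id' bytea
```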
Kinesis
When ingesting data from Kinesis, the following additional fields can be included.
| Allowed components | Default type | Note |
|---|---|---|
| key | bytea | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| timestamp | timestamp with time zone | See the `approximate_arrival_timestamp` field at Struct aws_sdk_kinesis::types::Record. |
| partition | varchar | The partition the message is from. |
| offset | varchar | The offset in the partition, which corresponds to Kinesis sequence numbers. |
| payload | json | The actual content or data of the message. Only supports JSON format. |
For more components, see Struct aws_sdk_kinesis::types::Record.
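A minimal sketch of a Kinesis source including the key and arrival timestamp; the stream name and region are placeholders, and the exact `WITH` options depend on your AWS setup:

```sql
CREATE SOURCE kinesis_source (
  a int
)
INCLUDE key AS key_col
INCLUDE timestamp AS arrival_ts
WITH (
  connector = 'kinesis',
  stream = 'my_stream',      -- placeholder stream name
  aws.region = 'us-east-1'   -- placeholder region
)
FORMAT PLAIN ENCODE JSON;
```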
Pulsar
When ingesting data from Pulsar, the following additional fields can be included.
| Allowed components | Default type | Note |
|---|---|---|
| key | bytea | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| partition | varchar | The partition the message is from. |
| offset | varchar | The offset in the partition. |
| payload | json | The actual content or data of the message. Only supports JSON format. |
For more components, see Struct pulsar::message::proto::MessageMetadata.
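A minimal sketch of a Pulsar source including the key and offset; the topic and service URL are placeholders:

```sql
CREATE SOURCE pulsar_source (
  a int
)
INCLUDE key AS key_col
INCLUDE offset AS offset_col
WITH (
  connector = 'pulsar',
  topic = 'my_topic',                      -- placeholder topic
  service.url = 'pulsar://localhost:6650'  -- placeholder broker URL
)
FORMAT PLAIN ENCODE JSON;
```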
S3, GCS, and Azure Blob
When ingesting data from AWS S3, GCS, or Azure Blob, the following additional fields can be included.

| Allowed components | Default type | Note |
|---|---|---|
| file | varchar | The file the record is from. |
| offset | varchar | The offset in the file. |
| payload | json | The actual content or data of the message. Only supports JSON format. |
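A hedged sketch of an S3 source including the file name and offset; the region, bucket, and match pattern are placeholders, and the connector parameter names are assumptions to verify against the connector documentation:

```sql
CREATE SOURCE s3_source (
  a int
)
INCLUDE file AS file_col
INCLUDE offset AS offset_col
WITH (
  connector = 's3',
  s3.region_name = 'us-east-1',  -- placeholder region
  s3.bucket_name = 'my-bucket',  -- placeholder bucket
  match_pattern = '*.json'       -- placeholder pattern
)
FORMAT PLAIN ENCODE JSON;
```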
MQTT
When ingesting data from MQTT, the following additional fields can be included.
| Allowed components | Default type | Note |
|---|---|---|
| partition | varchar | The topic the record is from. |
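A hedged sketch of an MQTT source that includes the originating topic; the broker URL and topic filter are placeholders, and the connector parameter names are assumptions to verify against the connector documentation:

```sql
CREATE SOURCE mqtt_source (
  a int
)
INCLUDE partition AS topic_col
WITH (
  connector = 'mqtt',
  url = 'tcp://localhost:1883',  -- placeholder broker URL
  topic = 'sensors/#'            -- placeholder topic filter
)
FORMAT PLAIN ENCODE JSON;
```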
Examples
Here we create a table, `additional_columns`, that ingests data from a Kafka broker, as shown below. Aside from the `a` column, which is part of the message payload, the additional fields `key`, `partition`, `offset`, `timestamp`, `header`, and `payload` are also added to the table.
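A sketch of such a statement; the topic name and broker address are placeholders, and the exact `WITH` options depend on your Kafka setup:

```sql
CREATE TABLE additional_columns (
  a int
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
  connector = 'kafka',
  topic = 'kafka_additional_columns',             -- placeholder topic
  properties.bootstrap.server = 'localhost:9092'  -- placeholder broker
)
FORMAT PLAIN ENCODE JSON;
```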