Syntax

To add additional columns, use the INCLUDE clause.

INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]

If <column_name> is not specified, a default one will be generated in the format _rw_{connector}_{col}, where connector is the name of the source connector used (Kafka, Pulsar, Kinesis, etc.), and col is the type of column being generated (key, offset, timestamp, etc.). For instance, if an offset column is added to a Kafka source, the default column name would be _rw_kafka_offset.

This clause should be included in your CREATE SOURCE or CREATE TABLE command after the schema is defined.

CREATE { SOURCE | TABLE } name [(
    schema_definition )]
[INCLUDE ... [AS ...]]
WITH ( ... )
FORMAT data_format ENCODE data_encode (
    ...
);

Note that for UPSERT types of sources and tables, INCLUDE KEY is required as RisingWave will use this column to perform upsert semantics. A primary key cannot be defined as multiple columns in this case.

Supported connectors

The INCLUDE clause can be used with the following source connectors.

Kafka

When ingesting data from Kafka, the following additional fields can be included.

Allowed componentsDefault typeNote
keybyteaCan be overwritten by ENCODE and KEY ENCODE.
timestamptimestamp with time zone (i64 in millis)Refer to CreateTime rather than LogAppendTime.
partitionvarcharThe partition the message is from.
offsetvarcharThe offset in the partition.
headersstruct<varchar, bytea>[]Key-value pairs along with the message.
payloadjsonThe actual content or data of the message. Only supports JSON format.

In the case of headers, there are two ways to define it.

You can choose to generate headers with type List[Struct<Varchar, Bytea>].

INCLUDE header [AS kafka_header]

Or you can generate a type bytea header, where the column content will be specified as the value associated with the specified key, header_col. The header_col field can only be defined when including a header. In this case, the generated column name will have the format _rw_kafka_header_{header col name}_{col type}, where col type is the data type of the header column.

INCLUDE header 'header_col' [AS kafka_header]

Kinesis

When ingesting data from Kinesis, here are some things to note when including the following fields.

Allowed componentsDefault typeNote
keybyteaCan be overwritten by ENCODE and KEY ENCODE.
timestamptimestamp with time zoneSee the approximate_arrival_timestamp field at Struct aws_sdk_kinesis::types::Record.
partitionvarcharThe partition the message is from.
offsetvarcharThe offset in the partition, which corresponds to Kinesis sequence numbers.
payloadjsonThe actual content or data of the message. Only supports JSON format.

For more components, see Struct aws_sdk_kinesis::types::Record.

Pulsar

When ingesting data from Pulsar, here are some things to note when including the following fields.

Allowed componentsDefault typeNote
keybyteaCan be overwritten by ENCODE and KEY ENCODE.
partitionvarcharThe partition the message is from.
offsetvarcharThe offset in the partition.
payloadjsonThe actual content or data of the message. Only supports JSON format.

For more components, see Struct pulsar::message::proto::MessageMetadata.

S3, GCS, and Azure Blob

When ingesting data from AWS S3, GCS or Azure Blob, the following additional fields can be included.

Allowed componentsDefault typeNote
filevarcharThe file the record is from.
offsetvarcharThe offset in the file.
payloadjsonThe actual content or data of the message. Only supports JSON format.

MQTT

When ingesting data from MQTT, the following additional fields can be included.

Allowed componentsDefault typeNote
partitionvarcharThe topic the record is from.

Examples

Here we create a table, additional_columns, that ingests data from a Kafka broker. Aside from the a column, which is part of the message payload, the additional fields key, partition, offset, timestamp, header, and payload are also added to the table.

CREATE TABLE additional_columns (
  a int,
  primary key (key_col)
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
	connector = 'kafka',
  properties.bootstrap.server = 'message_queue:29092',
	topic = 'kafka_additional_columns')
FORMAT UPSERT ENCODE JSON;