This topic describes how to connect RisingWave to a Kafka broker that you want to receive data from, and how to specify data formats, schemas, and security (encryption and authentication) settings.
You can connect RisingWave to a Kafka broker using the CREATE SOURCE command. When creating a source, you can choose to persist the data from the source in RisingWave by using the CREATE TABLE command instead and specifying the connection settings and data format.
Regardless of whether the data is persisted in RisingWave, you can create materialized views to perform analysis or data transformations.
RisingWave supports exactly-once semantics by reading transactional messages only after the associated transaction has been committed. This behavior is built in and not configurable.
To specify the columns and data types of the source, use schema_definition in the CREATE SOURCE statement.
RisingWave performs primary key constraint checks on tables but not on sources. If you need the checks to be performed, please create a table. For tables with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
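For example, a minimal sketch of such a table (the table name, columns, and connection settings are illustrative; the connection and format parameters are covered below):

```sql
-- Hypothetical table: a new record with an existing user_id
-- overwrites the earlier record with the same key.
CREATE TABLE users (
    user_id INT PRIMARY KEY,
    user_name VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'users',
    properties.bootstrap.server = 'broker1:9092'
) FORMAT UPSERT ENCODE JSON;
```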
| Field | Notes |
| --- | --- |
| topic | Required. Address of the Kafka topic. One source can only correspond to one topic. |
| properties.bootstrap.server | Required. Address of the Kafka broker. Format: ip:port,ip:port. |
| scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The two supported modes are earliest (read from low watermark) and latest (read from high watermark). If not specified, the default value earliest will be used. |
| scan.startup.timestamp.millis | Optional. RisingWave will start to consume data from the specified UNIX timestamp (milliseconds). If this field is specified, the value for scan.startup.mode will be ignored. |
| group.id.prefix | Optional. Specify a custom group ID prefix for the source. The default prefix is rw-consumer. Each job (materialized view) will have a separate consumer group with a generated suffix in the group ID, so the format of the consumer group is {group_id_prefix}-{fragment_id}. This is used to monitor progress in external Kafka tools and for authorization purposes. RisingWave does not rely on committed offsets or join the consumer group; it only reports offsets to the group. |
| properties.sync.call.timeout | Optional. Specify the timeout for synchronous calls. By default, the timeout is 5 seconds. |
| properties.client.id | Optional. Client ID associated with the Kafka client. |
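Putting these together, a minimal sketch of a CREATE SOURCE statement might look like this (the source name, topic, broker addresses, and columns are placeholders):

```sql
CREATE SOURCE IF NOT EXISTS my_kafka_source (
    order_id INT,
    amount DOUBLE PRECISION
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'broker1:9092,broker2:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```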
| Field | Notes |
| --- | --- |
| data_format | Data format. Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN. |
| data_encode | Data encode. Supported encodes: JSON, AVRO, PROTOBUF, CSV. |
| message | Message name of the main Message in the schema definition. Required for Protobuf. |
| location | Web location of the schema file in http://..., https://..., or S3://... format. |
| schema.registry | Confluent Schema Registry URL. Example: http://127.0.0.1:8081. |
| schema.registry.username | Conditional. User name for the schema registry. It must be specified together with schema.registry.password. |
| schema.registry.password | Conditional. Password for the schema registry. It must be specified together with schema.registry.username. |
| access_key | Required if loading descriptors from S3. The access key ID of AWS. |
| secret_key | Required if loading descriptors from S3. The secret access key of AWS. |
| region | Required if loading descriptors from S3. The AWS service region. |
| arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
| external_id | Optional. The external ID used to authorize access to third-party resources. |
You can also specify the following Kafka parameters in the WITH options. For an example of the usage of these parameters, see the JSON example. For additional details on these parameters, see the Configuration properties.
| Kafka parameter name | RisingWave parameter name | Type |
| --- | --- | --- |
| enable.auto.commit | properties.enable.auto.commit | boolean |
| enable.ssl.certificate.verification | properties.enable.ssl.certificate.verification | boolean |
| fetch.max.bytes | properties.fetch.max.bytes | int |
| fetch.queue.backoff.ms | properties.fetch.queue.backoff.ms | int |
| fetch.wait.max.ms | properties.fetch.wait.max.ms | int |
| message.max.bytes | properties.message.max.bytes | int |
| queued.max.messages.kbytes | properties.queued.max.messages.kbytes | int |
| queued.min.messages | properties.queued.min.messages | int |
| receive.message.max.bytes | properties.receive.message.max.bytes | int |
| ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str |
Note that queued.min.messages and queued.max.messages.kbytes are specified as properties.queued.min.messages and properties.queued.max.messages.kbytes, respectively, when creating the source.
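For example, a sketch that tunes these consumer parameters (the source name, topic, broker address, and values are placeholders):

```sql
CREATE SOURCE my_tuned_source (v INT) WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'broker1:9092',
    -- consumer parameters carry the properties. prefix
    properties.queued.min.messages = '10000',
    properties.queued.max.messages.kbytes = '65536'
) FORMAT PLAIN ENCODE JSON;
```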
Set properties.ssl.endpoint.identification.algorithm to none to bypass the verification of CA certificates and resolve SSL handshake failures. This parameter can be set to either https or none. By default, it is https.

When you create a Kafka source, a hidden column, _rw_kafka_timestamp, will also exist. This column includes the timestamp of the Kafka message.
You can include this column in your views or materialized views to display the Kafka timestamp. Here is an example.
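A minimal sketch, assuming a hypothetical source my_kafka_source with an order_id column:

```sql
-- Expose the Kafka message timestamp alongside the payload column.
CREATE MATERIALIZED VIEW mv_with_kafka_timestamp AS
SELECT _rw_kafka_timestamp, order_id
FROM my_kafka_source;
```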
You can also use _rw_kafka_timestamp to filter messages sent within a specific time period. For example, the following query only selects messages sent in the past 10 minutes.
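A sketch of such a filter, again assuming the hypothetical my_kafka_source:

```sql
SELECT *
FROM my_kafka_source
WHERE _rw_kafka_timestamp > now() - INTERVAL '10 minutes';
```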
RisingWave supports reading schemas from a Web location in http://..., https://..., or S3://... format, or from a Confluent Schema Registry, for Kafka data in Protobuf format. For Avro, Confluent Schema Registry and AWS Glue Schema Registry are supported for reading schemas.
For Protobuf, if a schema location is specified, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with a command like this:
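For example (the file names are placeholders):

```shell
# Compile a .proto file into a FileDescriptorSet, including imported definitions.
protoc --include_imports --descriptor_set_out=schema.pb schema.proto
```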
To specify the schema location, add this clause to a CREATE SOURCE statement.
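A sketch of the clause, with a hypothetical message name and schema location:

```sql
ENCODE PROTOBUF (
    message = 'package.MessageName',
    location = 'https://my-bucket.s3-us-west-2.amazonaws.com/schema_descriptor.pb'
)
```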
If a Confluent Schema Registry is used, the latest schema will be retrieved using the TopicNameStrategy strategy when the CREATE SOURCE statement is issued. Then the schema parser in RisingWave will automatically determine the columns and data types to use in the source.
To specify the Confluent Schema Registry, add this clause to a CREATE SOURCE
statement.
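For example, for Avro data (the registry URL is a placeholder):

```sql
ENCODE AVRO (
    schema.registry = 'http://127.0.0.1:8081'
)
```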
Specify the following configurations in the ENCODE AVRO (...) clause to read schemas from Glue.
Auth-related configurations:

- aws.region: The region of the AWS Glue Schema Registry. For example, us-west-2.
- aws.credentials.access_key_id: Your AWS access key ID.
- aws.credentials.secret_access_key: Your AWS secret access key.
- aws.credentials.role.arn: The Amazon Resource Name (ARN) of the role to assume. For example, arn:aws:iam::123456123456:role/MyGlueRole. The role or user must be granted permission for the action glue:GetSchemaVersion.
- aws.glue.schema_arn: The ARN of the schema in AWS Glue Schema Registry. For example, 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent'.
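A sketch combining these options (the ARNs, region, and keys are placeholders):

```sql
ENCODE AVRO (
    aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
    aws.region = 'ap-southeast-1',
    aws.credentials.access_key_id = 'YOUR_ACCESS_KEY_ID',
    aws.credentials.secret_access_key = 'YOUR_SECRET_ACCESS_KEY'
)
```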
To create a source or table that connects to Kafka through AWS PrivateLink, in the WITH section of your CREATE SOURCE or CREATE TABLE statement, specify the following parameters.
| Parameter | Notes |
| --- | --- |
| privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues. |
| privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you create a connection. See details in Create a PrivateLink connection. |
| connection.name | The name of the connection. This parameter should only be included if you are using a connection created with the CREATE CONNECTION statement. Omit this parameter if you have provisioned a VPC endpoint using privatelink.endpoint (recommended). |
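As a sketch (the endpoint DNS name, ports, and broker names are placeholders; the port-to-broker mapping is explained next):

```sql
CREATE SOURCE my_privatelink_source (v INT) WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'broker1-endpoint,broker2-endpoint,broker3-endpoint',
    privatelink.endpoint = 'my-vpc-endpoint-dns-name',
    -- one target per broker, in the same order as properties.bootstrap.server
    privatelink.targets = '[{"port": 9094}, {"port": 9095}, {"port": 9096}]'
) FORMAT PLAIN ENCODE JSON;
```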
{"port": 9094}
corresponds to the broker broker1-endpoint
, {"port": 9095}
corresponds to the broker broker2-endpoint
, and {"port": 9096}
corresponds to the broker broker3-endpoint
.
Note that SSL is used in configuration and code instead of TLS.
Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.
RisingWave supports these SASL authentication mechanisms:
- SASL/PLAIN
- SASL/SCRAM
- SASL/GSSAPI
- SASL/OAUTHBEARER
To use SASL authentication, specify the parameters for the chosen mechanism in the WITH section of the CREATE SOURCE statement.
To read data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of the CREATE SOURCE statement.

| Parameter | Notes |
| --- | --- |
| properties.security.protocol | Set to SSL. |
| properties.ssl.ca.location | Path to the CA certificate file used to verify the broker's certificate. |
| properties.ssl.certificate.location | Path to the client's public key (PEM) certificate file. |
| properties.ssl.key.location | Path to the client's private key (PEM) file. |
| properties.ssl.key.password | Passphrase for the client's private key, if one is set. |
When you look these parameters up in the librdkafka documentation, note that the original names are given without properties. and therefore do not include this prefix.
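As a sketch, reading SSL-encrypted data without SASL might look like this (the source name, topic, broker address, file paths, and password are placeholders):

```sql
CREATE SOURCE my_ssl_source (v INT) WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'broker1:9093',
    properties.security.protocol = 'SSL',
    properties.ssl.ca.location = '/home/user/ca.pem',
    properties.ssl.certificate.location = '/home/user/client.pem',
    properties.ssl.key.location = '/home/user/client.key',
    properties.ssl.key.password = 'your_password'
) FORMAT PLAIN ENCODE JSON;
```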