Apache Pulsar is an open-source distributed pub-sub messaging system and event streaming platform that is scalable and designed to support geo-replication.

PUBLIC PREVIEW

This feature is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our Slack channel. Your input is valuable in helping us improve this feature. For more details, see our Public Preview Feature List.

Prerequisites

Before sinking data from RisingWave to Pulsar, please ensure the following:

  • A Pulsar cluster is running and accessible from RisingWave.
  • You have permission to access the Pulsar topics you want to sink data to.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='pulsar',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
    key = 'value' ) ]
[KEY ENCODE key_encode [(...)]]
;
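
As an illustration of the AS select_query form, the sketch below sinks the result of a query instead of an existing object. The table orders and its columns are hypothetical, as are the sink name, topic, and broker address. force_append_only (described under FORMAT and ENCODE options) is set on the assumption that the query result may not be append-only.

```sql
-- Hypothetical example: `orders`, its columns, the topic name, and the
-- broker address are placeholders, not names from a real deployment.
CREATE SINK IF NOT EXISTS orders_pulsar_sink AS
SELECT order_id, amount, created_at
FROM orders
WITH (
   connector = 'pulsar',
   topic = 'orders-topic',
   service.url = 'pulsar://broker:6650'
)
FORMAT PLAIN ENCODE JSON (
   force_append_only = 'true'
);
```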

Parameters

Parameter name | Description
topic | Required. The address of the Pulsar topic. One sink can only correspond to one topic.
service.url | Required. The address of the Pulsar service.
auth.token | Optional. A token for authentication. If both auth.token and the oauth fields are set, only OAuth authorization is used.
oauth.issuer.url | Optional. The issuer URL for OAuth2. Required if any other oauth field is specified.
oauth.credentials.url | Optional. The path to the credential file, which starts with file://. Required if any other oauth field is specified.
oauth.audience | Optional. The audience for OAuth2. Required if any other oauth field is specified.
oauth.scope | Optional. The scope for OAuth2.
aws.credentials.access_key_id | Optional. The AWS access key for loading the credential file from S3. Not needed if oauth.credentials.url points to a local path.
aws.credentials.secret_access_key | Optional. The AWS secret access key for loading the credential file from S3. Not needed if oauth.credentials.url points to a local path.
max_retry_num | Optional. The maximum number of times to retry sending a batch to Pulsar, which allows recovery from transient errors. The default value is 3.
retry_interval | Optional. The time in milliseconds to wait after a failure before retrying to send a batch. The default value is 100 ms.
primary_key | Optional. The primary keys of the sink. Use , to delimit the primary key columns. Primary keys are optional when creating a PLAIN sink but required for UPSERT and DEBEZIUM sinks.
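
For comparison with OAuth2, a minimal sketch of token-based authentication might look like the following. The sink name, topic, broker address, and token value are placeholders, and mv_name stands for an existing materialized view. Only auth.token is set, so no oauth fields are needed.

```sql
-- Hypothetical example: replace the topic, broker address, and token
-- with values from your own Pulsar deployment.
CREATE SINK IF NOT EXISTS pulsar_token_sink
FROM mv_name
WITH (
   connector = 'pulsar',
   topic = 'test-topic',
   service.url = 'pulsar://broker:6650',
   auth.token = '<your-token>'
)
FORMAT PLAIN ENCODE JSON (
   force_append_only = 'true'
);
```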

FORMAT and ENCODE options

Set these options in FORMAT data_format ENCODE data_encode (key = 'value'), not in the WITH clause.

Field | Notes
data_format | Data format. Allowed formats:
  • PLAIN: Output data with insert operations.
  • DEBEZIUM: Output change data capture (CDC) log in Debezium format.
  • UPSERT: Output data as a changelog stream. primary_key must be specified in this case.
  To learn when to define the primary key for an UPSERT sink, see the Overview.
data_encode | Data encode. Supported encode: JSON.
force_append_only | If true, forces the sink to be PLAIN (also known as append-only), even if the upstream is not append-only.
timestamptz.handling.mode | Controls the timestamptz output format. This parameter applies only to append-only or upsert sinks using JSON encoding.
  • If omitted, the output format of timestamptz is 2023-11-11T18:30:09.453000Z, which includes the UTC suffix Z.
  • When utc_without_suffix is specified, the format changes to 2023-11-11 18:30:09.453000.
key_encode | Optional. When specified, the key encode can only be TEXT, and the primary key must be a single column of one of the following types: varchar, bool, smallint, int, or bigint. When absent, both key and value use the same setting of ENCODE data_encode ( ... ).
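
To show how these options combine, here is a sketch of an UPSERT sink that sets primary_key, changes the timestamptz format, and encodes the key as TEXT. The column id is hypothetical and assumed to be a single int (or other supported type) column of mv_name. The sink name, topic, and broker address are placeholders.

```sql
-- Hypothetical example: `id` is an assumed primary key column of mv_name.
CREATE SINK IF NOT EXISTS pulsar_upsert_sink
FROM mv_name
WITH (
   connector = 'pulsar',
   topic = 'test-topic',
   service.url = 'pulsar://broker:6650',
   primary_key = 'id'
)
FORMAT UPSERT ENCODE JSON (
   -- Emit timestamptz values as 2023-11-11 18:30:09.453000 (no Z suffix).
   timestamptz.handling.mode = 'utc_without_suffix'
)
KEY ENCODE TEXT;
```

Note that primary_key goes in the WITH clause, while timestamptz.handling.mode goes in the parentheses after ENCODE.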

Example

The following SQL query in RisingWave creates a Pulsar sink.

CREATE SINK IF NOT EXISTS pulsar_sink
FROM mv_name
WITH (
  connector = 'pulsar',
  topic = 'test-topic',
  service.url = 'pulsar://broker:6650',

  -- OAuth
  oauth.issuer.url = 'https://issuer.com',
  oauth.credentials.url = 'https://provider.com',
  oauth.audience = 'test-aud',
  oauth.scope = 'consume',

  -- S3 credentials, needed only when the OAuth credential file is loaded from S3
  aws.credentials.access_key_id = 'xxx',
  aws.credentials.secret_access_key = 'xxx'
)
FORMAT DEBEZIUM ENCODE JSON;