NATS is an open-source messaging system for cloud-native applications. It provides a lightweight publish-subscribe architecture for high-performance messaging.

NATS JetStream is a streaming data platform built on top of NATS. It enables real-time and historical access to streams of data via durable subscriptions and consumer groups.

PUBLIC PREVIEW

This feature is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our Slack channel. Your input is valuable in helping us improve this feature. For more details, see our Public Preview Feature List.

Prerequisites

Before ingesting data from NATS JetStream into RisingWave, please ensure the following:

  • The NATS JetStream server is running and accessible from your RisingWave cluster.
  • If authentication is required for the NATS JetStream server, make sure you have the client username and password. The client user must have the subscribe permission for the subject.
  • Create the NATS subject from which you want to ingest data.
  • Ensure that your RisingWave cluster is running.

Ingest data into RisingWave

You can ingest data from NATS JetStream with either CREATE SOURCE or CREATE TABLE. Use CREATE SOURCE to consume data without persisting it, or CREATE TABLE to persist the ingested data in RisingWave. In both cases, specify the connection settings and data format.

Syntax

CREATE { TABLE | SOURCE } [ IF NOT EXISTS ] source_name
[ schema_definition ]
WITH (
   connector='nats',
   server_url='<your nats server>:<port>[,<another_server_url_if_available>...]',
   subject='<subject>[,<another_subject>...]',
   stream='<stream_name>',

-- optional parameters
   connect_mode='<connect_mode>',
   username='<your username>',
   password='<your password>',
   jwt='<your jwt>',
   nkey='<your nkey>', ...

-- delivery parameters
   scan.startup.mode='<startup_mode>',
   scan.startup.timestamp.millis='xxxxx'
)
FORMAT PLAIN ENCODE data_encode;

schema_definition:

(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)

RisingWave performs primary key constraint checks on tables with connector settings but not on regular sources. If you need the checks to be performed, please create a table with connector settings.

For a table with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
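
For instance, here is a minimal sketch of a table with a primary key; the device.events subject, device_events stream, and column names are hypothetical placeholders:

CREATE TABLE IF NOT EXISTS device_events (
    device_id VARCHAR,
    temperature DOUBLE PRECISION,
    reported_at TIMESTAMPTZ,
    PRIMARY KEY (device_id)
)
WITH (
    connector = 'nats',
    server_url = 'nats-server:4222',
    subject = 'device.events',
    stream = 'device_events',
    connect_mode = 'plain'
) FORMAT PLAIN ENCODE JSON;

With this definition, a new message carrying an existing device_id replaces the previously ingested row.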

According to the NATS documentation, stream names must adhere to subject naming rules and also be friendly to the file system. The recommended guidelines for stream names are:

  • Use alphanumeric values.
  • Avoid spaces, tabs, periods (.), greater-than signs (>), and asterisks (*).
  • Do not include path separators (forward slash or backward slash).
  • Keep the name length limited to 32 characters as the JetStream storage directories include the account, stream name, and consumer name.
  • Avoid using reserved file names like NUL or LPT1.
  • Be cautious of case sensitivity in file systems. To prevent collisions, ensure that stream or account names do not clash due to case differences. For example, Foo and foo would collide on Windows or macOS systems.

Parameters

server_url: Required. URLs of the NATS JetStream server, in the format address:port. If multiple addresses are specified, use commas to separate them.
subject: Required. NATS subject that you want to ingest data from. To specify more than one subject, separate them with commas.
stream: Required. NATS stream that you want to ingest data from.
connect_mode: Required. Authentication mode for the connection. Allowed values:
  • plain: No authentication.
  • user_and_password: Use username and password for authentication. For this option, username and password must be specified.
  • credential: Use JSON Web Token (JWT) and NKeys for authentication. For this option, jwt and nkey must be specified.
jwt and nkey: Conditional. JWT and NKey for authentication. Required when connect_mode is credential. For details, see JWT and NKeys.
username and password: Conditional. The client username and password. Required when connect_mode is user_and_password.
scan.startup.mode: Optional. The offset mode that RisingWave will use to consume data. If not specified, the default value earliest is used. The supported modes are:
  • earliest: Consume from the earliest available message, corresponding to the DeliverAll deliver policy.
  • latest: Consume from the next message, corresponding to the DeliverNew deliver policy.
  • timestamp: Consume from a particular UNIX timestamp specified via scan.startup.timestamp.millis, corresponding to the DeliverByStartTime deliver policy.
scan.startup.timestamp.millis: Conditional. Required when scan.startup.mode is timestamp. RisingWave will start to consume data from the specified UNIX timestamp (in milliseconds).
data_encode: Supported encodes: JSON, PROTOBUF, BYTES.
consumer.deliver_subject: Optional. Subject to deliver messages to.
consumer.durable_name: Required. Durable name for the consumer.
consumer.name: Optional. Name of the consumer.
consumer.description: Optional. Description of the consumer.
consumer.deliver_policy: Optional. Policy on how messages are delivered.
consumer.ack_policy: Optional. Acknowledgment policy for message processing (e.g., None, All, Explicit).
consumer.ack_wait.sec: Optional. Time in seconds to wait for acknowledgment before considering a message undelivered.
consumer.max_deliver: Optional. Maximum number of times a message will be delivered.
consumer.filter_subject: Optional. Filter for subjects that the consumer will process.
consumer.filter_subjects: Optional. List of subjects that the consumer will filter on.
consumer.replay_policy: Optional. Policy for replaying messages (e.g., Instant, Original).
consumer.rate_limit: Optional. Rate limit for message delivery in bits per second.
consumer.sample_frequency: Optional. Frequency for sampling messages, ranging from 0 to 100.
consumer.max_waiting: Optional. Maximum number of messages that can be waiting for acknowledgment.
consumer.max_ack_pending: Optional. Maximum number of acknowledgments that can be pending.
consumer.headers_only: Optional. If true, only message headers will be delivered.
consumer.max_batch: Optional. Maximum number of messages to process in a single batch.
consumer.max_bytes: Optional. Maximum number of bytes to receive in a single batch.
consumer.max_expires.sec: Optional. Maximum expiration time for a message in seconds.
consumer.inactive_threshold.sec: Optional. Time in seconds before a consumer is considered inactive.
consumer.num_replicas: Optional. Number of replicas for the consumer.
consumer.memory_storage: Optional. If true, messages will be stored in memory.
consumer.backoff.sec: Optional. Backoff time in seconds for retrying message delivery.
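
For example, to start consuming from a specific point in time, set scan.startup.mode to timestamp together with scan.startup.timestamp.millis. Below is a minimal sketch; the app.events subject, app_events stream, columns, and timestamp value are hypothetical placeholders:

CREATE SOURCE app_events_from_timestamp (
    event_id VARCHAR,
    event_type VARCHAR,
    occurred_at TIMESTAMPTZ
)
WITH (
    connector = 'nats',
    server_url = 'nats-server:4222',
    subject = 'app.events',
    stream = 'app_events',
    connect_mode = 'plain',
    scan.startup.mode = 'timestamp',
    -- UNIX timestamp in milliseconds; replace with the point in time you need.
    scan.startup.timestamp.millis = '1700000000000'
) FORMAT PLAIN ENCODE JSON;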

Examples

The following SQL query creates a table that ingests data from a NATS JetStream source.

CREATE TABLE live_stream_metrics
WITH
  (
    connector = 'nats',
    server_url = 'nats-server:4222',
    subject = 'live_stream_metrics',
    stream = 'risingwave',
    connect_mode = 'plain'
  ) FORMAT PLAIN ENCODE PROTOBUF (
    message = 'livestream.schema.LiveStreamMetrics',
    schema.location = 'http://file_server:8080/schema'
  );
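
If the NATS server uses JWT and NKey authentication instead, set connect_mode to credential and supply both values. The following is a minimal sketch; the secured.events subject, secured_events stream, columns, and credential placeholders are hypothetical:

CREATE TABLE secured_events (
    event_id VARCHAR,
    payload VARCHAR
)
WITH (
    connector = 'nats',
    server_url = 'nats-server:4222',
    subject = 'secured.events',
    stream = 'secured_events',
    connect_mode = 'credential',
    jwt = '<your jwt>',
    nkey = '<your nkey>'
) FORMAT PLAIN ENCODE JSON;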

All consumer parameters supported by the async_nats crate are also supported by the RisingWave NATS source connector, as shown in the following example.

CREATE SOURCE test_source
WITH (
    connector='nats',
    server_url='{{ env_var("SERVER") }}',
    subject='risingwave.test.source',
    stream='risingwave-test-source',
    scan.startup.mode='earliest',
    connect_mode='user_and_password',
    username='{{ env_var("USER") }}',
    password='{{ env_var("PASSWORD") }}',
    consumer.durable_name='risingwave-test-source',
    consumer.description='desc-test-source',
    consumer.ack_policy='all',
    consumer.ack_wait.sec=10,
    consumer.max_deliver=10,
    consumer.filter_subject='demo.subject.filter.*',
    consumer.filter_subjects='demo.subject.filter.1,demo.subject.filter.2',
    consumer.replay_policy='instant',
    consumer.rate_limit=100000000000,
    consumer.sample_frequency=100,
    consumer.max_waiting=10,
    consumer.max_ack_pending=10,
    -- consumer.idle_heartbeat=60, not available in async_nats crate
    consumer.max_batch=1000,
    consumer.max_bytes=1000000000,
    consumer.max_expires.sec=3600,
    consumer.inactive_threshold.sec=10000000,
    consumer.memory_storage='false',
    consumer.backoff.sec='10,30,60',
    consumer.num_replicas=1
) FORMAT PLAIN ENCODE JSON;
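
After the table or source is created, you can query the table directly or build materialized views on top of it. For example, using the live_stream_metrics table created above (the view name below is only illustrative):

-- Inspect a few ingested rows.
SELECT * FROM live_stream_metrics LIMIT 10;

-- Maintain a continuously updated row count over the ingested stream.
CREATE MATERIALIZED VIEW live_stream_metrics_count AS
SELECT COUNT(*) AS metric_count
FROM live_stream_metrics;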