For the difference between stream processing and ad-hoc queries, see Ad-hoc (on read) vs. Streaming (on write).

  • Streaming ingestion from external systems: This is tied to a stream processing task that continuously monitors and synchronizes changes from external systems.
  • Ad-hoc ingestion from external systems: This is bound to an ad-hoc query. RisingWave reads the current data from the external system while the query runs.
  • Ingestion via DML statements: Like other databases, RisingWave allows users to directly insert and modify data in tables using DML statements (INSERT, UPDATE, DELETE).
    • Additionally, with the INSERT ... SELECT statement, users can turn data from ad-hoc ingestion into a stream of changes written to the table, which then propagates to the table's downstream streaming pipeline. This can also be used for bulk imports.

Ingest data from external systems

Source

In RisingWave, Source is the most fundamental object used to connect to data from external systems. Here is a simple example of creating a source from Kafka.

CREATE SOURCE kafka_source (
  k int,
  v1 text,
  v2 text
)
WITH (
  connector = 'kafka',
  topic = 'topic_name',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;

Creating a source does not ingest any data by itself. Ingestion starts when a query that references the source is submitted.

  • A CREATE MATERIALIZED VIEW or CREATE SINK statement generates a streaming ingestion job.

The following statement will continuously import data from the Kafka topic and store it in the materialized view mv.

CREATE MATERIALIZED VIEW mv AS
SELECT *
FROM kafka_source;

  • Queries can also be executed directly on the source. Ad-hoc ingestion then happens while the query is processed. For more information, see Directly query Kafka.

SELECT * FROM kafka_source
WHERE _rw_kafka_timestamp > now() - interval '10 minute';

Support for streaming ingestion and ad-hoc ingestion varies by source type. Refer to the documentation for the specific source.

Table with connectors

For sources that support streaming ingestion, RisingWave lets you create a table directly on the connector.

CREATE TABLE table_on_kafka (
  k int primary key,
  v1 text,
  v2 text)
WITH (
  connector = 'kafka',
  topic = 'topic_name',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;

This statement creates a streaming job that continuously ingests data from the Kafka topic into the table. The data is stored in RisingWave’s internal storage, which brings the following benefits:

  1. Improved ad-hoc query performance: When users execute queries such as SELECT * FROM table_on_kafka, the query engine will directly access the data from RisingWave’s internal storage, eliminating unnecessary network overhead and avoiding read pressure on upstream systems. Additionally, users can create indexes on the table to accelerate queries.
  2. Primary key support: With the help of its internal storage, RisingWave can efficiently maintain primary key constraints. Users can define a primary key on a specific column of the table and choose how primary key conflicts are handled with the ON CONFLICT clause.
  3. Ability to handle delete/update changes: Based on the primary key definition, RisingWave can efficiently process delete and update operations synchronized from upstream. For inputs that carry delete or update operations, such as database CDC and UPSERT-format messages from message queues, RisingWave does not allow creating a source and requires a table with connectors instead.
  4. Stronger consistency guarantee: When using a table with connectors, all downstream jobs are guaranteed to have a consistent view of the data persisted in the table. With a source, by contrast, different jobs may see inconsistent results due to different ingestion speeds or data retention in the external system.
  5. Greater flexibility: Like regular tables, you can use DML statements like INSERT, UPDATE and DELETE to insert or modify data in tables with connectors, and use CREATE SINK INTO TABLE to merge other data streams into the table.
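
As a minimal sketch of benefits 1 and 5, the statements below reuse the table_on_kafka table defined above. The index name idx_v1 and the literal values are hypothetical and only serve as illustration.

```sql
-- Hypothetical index name; intended to speed up ad-hoc lookups on v1.
CREATE INDEX idx_v1 ON table_on_kafka (v1);

-- Ad-hoc query served from RisingWave's internal storage,
-- not from the Kafka topic.
SELECT * FROM table_on_kafka WHERE v1 = 'foo';

-- DML statements work as on a regular table (illustrative values).
INSERT INTO table_on_kafka (k, v1, v2) VALUES (1, 'foo', 'bar');
UPDATE table_on_kafka SET v2 = 'baz' WHERE k = 1;
DELETE FROM table_on_kafka WHERE k = 1;
```

Rows written this way are stored in the same table as the rows ingested from Kafka, so downstream jobs see both.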

PostgreSQL table

RisingWave supports using the table-valued function postgres_query to directly query PostgreSQL databases. This function connects to a specified PostgreSQL instance, executes the provided SQL query, and returns the results as a table in RisingWave.

To use it, specify connection details (such as hostname, port, username, password, database name) and the desired SQL query. This makes it easier to integrate PostgreSQL data directly into RisingWave workflows without needing additional data transfer steps. For more information, see Ingest data from Postgres tables.
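
A call might look like the sketch below. The argument order (hostname, port, username, password, database name, query) follows the connection details listed above and is an assumption to check against the linked page. All values are placeholders.

```sql
-- All connection values are placeholders; argument order is assumed.
SELECT *
FROM postgres_query(
  'localhost',                  -- hostname
  '5432',                       -- port, passed as a string (assumption)
  'postgres_user',              -- username
  'postgres_password',          -- password
  'mydb',                       -- database name
  'SELECT id, name FROM users'  -- query executed on the PostgreSQL side
);
```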

DML on tables

Insert data into tables

You can load data into RisingWave in batch mode by creating a table and then inserting data into it. For example, the statements below create a table website_visits and insert 5 rows of data.

CREATE TABLE website_visits (
  timestamp timestamp with time zone,
  user_id varchar,
  page_id varchar,
  action varchar
);

INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
  ('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),
  ('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),
  ('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),
  ('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),
  ('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');
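
To check the result, the inserted rows can be queried like any other table. This is a minimal sketch. The FLUSH statement is included on the assumption that pending DML changes may not yet be visible to the query, and it may be unnecessary depending on session settings.

```sql
-- Make sure pending DML changes are persisted before reading (may be optional).
FLUSH;

-- Count visits per page from the five rows inserted above.
SELECT page_id, count(*) AS visits
FROM website_visits
GROUP BY page_id
ORDER BY page_id;
-- With only those five rows: page1 = 2, page2 = 2, page3 = 1
```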

Use INSERT SELECT to do bulk ingestion

For sources that support only ad-hoc ingestion and not streaming ingestion, such as the Iceberg source, INSERT ... SELECT ... can be used to bulk-import data into a table. The imported rows become a stream of changes on the table and are propagated to its downstream jobs.

CREATE SOURCE source_iceberg_t1
WITH (
    connector = 'iceberg',
    catalog.type = 'storage',
    ...
    database.name = 's1',
    table.name = 't1'
);

CREATE TABLE t1(
  timestamp timestamp with time zone,
  user_id varchar,
  page_id varchar,
  action varchar
);

INSERT INTO t1 SELECT * FROM source_iceberg_t1;
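
To illustrate the downstream effect, here is a minimal sketch of a materialized view defined on t1. The view name t1_action_counts is hypothetical. Rows written by the INSERT ... SELECT statement above reach the view as ordinary streaming changes.

```sql
-- Hypothetical downstream view on the target table.
CREATE MATERIALIZED VIEW t1_action_counts AS
SELECT action, count(*) AS cnt
FROM t1
GROUP BY action;

-- Rows bulk-loaded with INSERT INTO t1 SELECT ... are reflected here.
SELECT * FROM t1_action_counts;
```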

File source management

RisingWave supports reading data from file sources including AWS S3, GCS, and Azure Blob Storage.

Batch reading from file source

Added in version 2.1.

To read data in batch from file sources, you need to create a materialized view from the source or create a table with the appropriate connector. You can also directly query the file source. Below are examples using AWS S3.

-- Create a source that connects to S3
CREATE SOURCE s3_source WITH ( connector = 's3', ... );

-- Create a materialized view from the source for batch processing
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table using the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );

-- Select from the source directly
SELECT count(*) FROM s3_source;

Data type mapping in Parquet

When using a file source to read Parquet files, define the schema according to the following mapping.

Parquet data type        RisingWave file source data type
-----------------------  --------------------------------
boolean                  boolean
int16                    smallint
int32                    int
int64                    bigint
float                    real
double                   double precision
string                   varchar
date                     date
decimal                  decimal
int8                     smallint
uint8                    smallint
uint16                   int
uint32                   bigint
uint64                   decimal
float16                  double precision
timestamp(_, Some(_))    timestamptz
timestamp(_, None)       timestamp
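
As an illustration of the mapping, the sketch below declares a source for a hypothetical Parquet file whose columns are id (int64), name (string), score (double), active (boolean), and created_at (timestamp with a time zone). The source name and columns are assumptions, the connection parameters are elided as in the examples above, and the FORMAT PLAIN ENCODE PARQUET clause is assumed to be the one used for Parquet files.

```sql
-- Hypothetical source; column types follow the mapping table above.
CREATE SOURCE s3_parquet_source (
  id bigint,               -- Parquet int64
  name varchar,            -- Parquet string
  score double precision,  -- Parquet double
  active boolean,          -- Parquet boolean
  created_at timestamptz   -- Parquet timestamp(_, Some(_))
)
WITH ( connector = 's3', ... )
FORMAT PLAIN ENCODE PARQUET;
```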

Topics in this section

The sections above give a brief overview of data ingestion in RisingWave. The topics in this section cover the process in more depth. Here is a brief introduction to each topic: