Overview of data ingestion
RisingWave supports a variety of data ingestion methods.
To know the difference between stream processing and ad-hoc query, please refer to Ad-hoc (on read) vs. Streaming (on write).
- Streaming ingestion from external systems: This is tied to a stream processing task, continuously monitoring and synchronizing changes from external systems.
- Ad-hoc ingestion from external systems: This is bound to an ad-hoc query, where RisingWave queries the current data from the external system for processing during the query.
- Ingest via DML statements: Like other databases, RisingWave allows users to directly insert and modify data in tables using DML statements (
INSERT
,UPDATE
,DELETE
).- Additionally, with the
INSERT ... SELECT
statement, users can transform ad-hoc ingestion data into a streaming flow to the table, affecting the downstream streaming pipeline of the table. This can also be used for bulk imports.
- Additionally, with the
Ingest data from external systems
Source
In RisingWave, Source is the most fundamental object used to connect to data from external systems. Here is a simple example of creating a source from Kafka.
After creating a source, no actual data ingestion occurs. Data ingestion happens when a query that references the source is submitted.
CREATE MATERIALIZED VIEW
orCREATE SINK
statement will generate streaming ingestion jobs.
The following statement will continuously import data from the Kafka topic and store it in the materialized view mv
.
- Also, queries can be executed directly on the source, and ad-hoc ingestion will happen during the query’s processing, see more information in directly query Kafka.
For specific source types, their support for streaming ingestion and ad-hoc ingestion varies. Please refer to our documentation for the specific source.
Table with connectors
For sources that support streaming ingestion, RisingWave supports the direct creation of tables on them.
The statement will create a streaming job that continuously ingests data from the Kafka topic to the table and the data will be stored in RisingWave’s internal storage, which brings the following benefits:
- Improved ad-hoc query performance: When users execute queries such as
SELECT * FROM table_on_kafka
, the query engine will directly access the data from RisingWave’s internal storage, eliminating unnecessary network overhead and avoiding read pressure on upstream systems. Additionally, users can create indexes on the table to accelerate queries. - Allow defining primary keys: With the help of its internal storage, RisingWave can efficiently maintain primary key constraints. Users can define a primary key on a specific column of the table and define different behaviors for primary key conflicts with ON CONFLICT clause.
- Ability to handle delete/update changes: Based on the definition of primary keys, RisingWave can efficiently process upstream synchronized delete and update operations. For systems that synchronize delete/update operations from external systems, such as database’s CDC and UPSERT format messages from message queues, we do not allow creating a source on it but require a table with connectors.
- Stronger consistency guarantee: When using a table with connectors, all downstream jobs will be guaranteed to have a consistent view of the data persisted in the table; while for source, different jobs may see inconsistent results due to different ingestion speed or data retention in the external system.
- Greater flexibility: Like regular tables, you can use DML statements like INSERT, UPDATE and DELETE to insert or modify data in tables with connectors, and use CREATE SINK INTO TABLE to merge other data streams into the table.
PostgreSQL table
RisingWave supports using the table-valued function postgres_query
to directly query PostgreSQL databases. This function connects to a specified PostgreSQL instance, executes the provided SQL query, and returns the results as a table in RisingWave.
To use it, specify connection details (such as hostname, port, username, password, database name) and the desired SQL query. This makes it easier to integrate PostgreSQL data directly into RisingWave workflows without needing additional data transfer steps. For more information, see Ingest data from Postgres tables.
DML on tables
Insert data into tables
You can load data in batch mode to RisingWave by creating a table and then inserting data into it. For example, the statement below creates a table website_visits
and inserts 5 rows of data.
Use INSERT SELECT
to do bulk ingestion
For sources that only support ad-hoc ingestion but not streaming ingestion, such as the Iceberg source, insert ... select ...
can be used to implement bulk data import into the table, and to convert the data into a stream of changes that are synchronized downstream to the table.
File source management
RisingWave supports reading data from file sources including AWS S3, GCS, and Azure Blob Storage.
Batch reading from file source
Added in version 2.1.
To read data in batch from file sources, you need to create a materialized view from the source or create a table with the appropriate connector. You can also directly query the file source. Below are examples using AWS S3.
Data type mapping in Parquet
When using file source to read parquet files, please define the schema information according to the following mapping.
Parquet data type | RisingWave file source data type |
---|---|
boolean | boolean |
int16 | smallint |
int32 | int |
int64 | bigint |
float | real |
double | double precision |
string | varchar |
date | date |
decimal | decimal |
int8 | smallint |
uint8 | smallint |
uint16 | int |
uint32 | bigint |
uint64 | decimal |
float16 | double precision |
timestamp(_, Some(_)) | timestamptz |
timestamp(_, None) | timestamp |
Topics in this section
The information presented above provides a brief overview of the data ingestion process in RisingWave. To gain a more comprehensive understanding of this process, the following topics in this section will delve more deeply into the subject matter. Here is a brief introduction to what you can expect to find in each topic:
- Among different types of sources, we have abstracted a series of common syntax and features.
- For more detailed information about the types, formats, and encoding options of sources, see Formats and encoding.
- For the complete list of the sources and formats supported in RisingWave, see Supported sources and formats.
- To learn about how to manage schemas and ingest additional fields from sources :
- To learn about how to ingest data from a particular source, see specific Data ingestion guides.
Was this page helpful?