Ingest data from Apache Iceberg
This guide describes how to batch ingest data from Apache Iceberg into RisingWave using the Iceberg source. Apache Iceberg is a table format designed to support huge tables. For more information, see Apache Iceberg.
PUBLIC PREVIEW
This feature is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our Slack channel. Your input is valuable in helping us improve this feature. For more details, see our Public Preview Feature List.
Syntax
You don’t need to specify column names for the Iceberg source, because RisingWave derives them directly from the Iceberg table metadata. Use the DESCRIBE statement to view the column names and data types.
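The general shape of the statement can be sketched as follows. This assumes the usual RisingWave `CREATE SOURCE ... WITH (...)` form with `connector` set to `iceberg`; `source_name` and `connector_parameter` are placeholders, and the available parameters are the ones listed under Parameters.

```sql
-- Statement shape only; source_name and connector_parameter are placeholders.
-- Note that no column list is given: columns come from the Iceberg metadata.
CREATE SOURCE [ IF NOT EXISTS ] source_name
WITH (
    connector = 'iceberg',
    connector_parameter = 'value', ...
);

-- View the column names and data types derived for the source.
DESCRIBE source_name;
```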
Parameters
Field | Notes |
---|---|
type | Required. Allowed values: append-only and upsert. |
s3.endpoint | Optional. Endpoint of the S3-compatible object store. |
s3.region | Optional. The region where the S3 bucket is hosted. Either s3.endpoint or s3.region must be specified. |
s3.access.key | Required. Access key of the S3 compatible object store. |
s3.secret.key | Required. Secret key of the S3 compatible object store. |
database.name | Required. Name of the database that you want to ingest data from. |
table.name | Required. Name of the table that you want to ingest data from. |
catalog.name | Conditional. The name of the Iceberg catalog. It can be omitted for the storage catalog but is required for other catalogs. |
catalog.type | Optional. The catalog type used in this table. Currently, the supported values are storage, rest, hive, jdbc, and glue. If not specified, storage is used. For details, see Catalogs. |
warehouse.path | Conditional. The path of the Iceberg warehouse. Currently, only S3-compatible object storage systems, such as AWS S3 and MinIO, are supported. It is required if catalog.type is not rest. |
catalog.url | Conditional. The URL of the catalog. It is required when catalog.type is not storage. |
Data type mapping
RisingWave converts data types from Iceberg to RisingWave according to the following data type mapping table.
Iceberg Type | RisingWave Type |
---|---|
boolean | boolean |
integer | int |
long | bigint |
float | real |
double | double |
string | varchar |
date | date |
timestamptz | timestamptz |
timestamp | timestamp |
decimal | decimal |
Catalogs
Iceberg supports these types of catalogs:
- Storage catalog: The storage catalog stores all metadata in the underlying file system, such as Hadoop or S3. Currently, only S3 is supported as the underlying file system.
- REST catalog: RisingWave supports the REST catalog, which acts as a proxy to other catalogs such as Hive, JDBC, and Nessie. This is the recommended way to use RisingWave with Iceberg tables.
- Hive catalog: RisingWave supports the Hive catalog. Set catalog.type to hive to use it.
- JDBC catalog: RisingWave supports the JDBC catalog.
- Glue catalog: RisingWave supports the Glue catalog. You should use AWS S3 if you use the Glue catalog.
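
As an illustration of selecting a catalog, a source backed by a REST catalog might be declared as in the sketch below. It uses only parameters from the Parameters table; the source name, catalog URL, bucket path, endpoint, and database and table names are all hypothetical placeholders, and your deployment may need additional parameters.

```sql
-- REST catalog sketch. Every name, URL, and credential below is a placeholder.
CREATE SOURCE iceberg_rest_source
WITH (
    connector = 'iceberg',
    catalog.type = 'rest',                      -- one of storage, rest, hive, jdbc, glue
    catalog.name = 'demo',                      -- required for non-storage catalogs
    catalog.url = 'http://rest-catalog:8181',   -- required when catalog.type is not storage
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name = 'dev',
    table.name = 'table'
);
```

For the other catalog types, the same statement shape applies with a different catalog.type value and the conditional parameters that type requires.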
Time travel
Our Iceberg source provides time travel capabilities, allowing you to query data from a specific point in time or version, rather than just the current state of the data. You can achieve this by specifying a timestamp or a version identifier.
Here is the syntax for specifying a system time. The timestamp should be in a format like YYYY-MM-DD HH:MM:SS or a UNIX timestamp in seconds.
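
As a sketch, the clause is appended to the source reference in a query. This assumes the `FOR SYSTEM_TIME AS OF` form; the bracketed part is notation for "a string or a number", not literal SQL.

```sql
-- Clause shape only, not a complete statement.
-- STRING: a timestamp string; NUMBER: a UNIX timestamp in seconds.
FOR SYSTEM_TIME AS OF [STRING | NUMBER];
```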
Here is the syntax for specifying a system version:
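
The version form presumably mirrors the time form. The sketch below assumes the `FOR SYSTEM_VERSION AS OF` clause, with the version identifier being an Iceberg snapshot ID; the bracketed part is notation, not literal SQL.

```sql
-- Clause shape only, not a complete statement.
-- The value identifies the snapshot (version) to read.
FOR SYSTEM_VERSION AS OF [STRING | NUMBER];
```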
Here are some examples:
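
The queries below are illustrative sketches. The source name `s`, the timestamps, and the snapshot ID are hypothetical values, and the explicit UTC offset in the timestamp string is an assumption about the accepted format.

```sql
-- Read the source as of a specific timestamp (hypothetical value).
SELECT * FROM s FOR SYSTEM_TIME AS OF '2100-01-01 00:00:00+00:00';

-- Read the source as of a UNIX timestamp in seconds (hypothetical value).
SELECT * FROM s FOR SYSTEM_TIME AS OF 4102444800;

-- Read a specific snapshot by its ID (hypothetical value).
SELECT * FROM s FOR SYSTEM_VERSION AS OF 3023402865675048688;
```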
System tables
We currently support the system tables rw_iceberg_files and rw_iceberg_snapshots. rw_iceberg_files contains the current files of the Iceberg source or table. Here is a simple example:
Read Iceberg files
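
A minimal query, assuming the system table is reachable by its bare name from your session:

```sql
-- List the current files of the Iceberg sources and tables.
SELECT * FROM rw_iceberg_files;
```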
rw_iceberg_snapshots contains all Iceberg snapshots in RisingWave. Based on it, you can read a specific snapshot with a time travel query. For example, you can use the following query to read these snapshots:
Read all Iceberg snapshots
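
A minimal query, assuming the system table is reachable by its bare name from your session. The snapshot identifiers it returns are the values you would pass to a system-version time travel query.

```sql
-- List all Iceberg snapshots known to RisingWave.
SELECT * FROM rw_iceberg_snapshots;
```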
Examples
First, create an append-only Iceberg table. See Append-only sink from upsert source for details.
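
With the table in place, a source that reads it could be declared along these lines. This is a sketch under assumptions: the source name `iceberg_source`, the bucket path, the endpoint, and the catalog, database, and table names are all placeholders, and you should add any other parameters that the Parameters table marks as required for your setup.

```sql
-- Hypothetical names, paths, and credentials throughout; replace with your own.
CREATE SOURCE iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    catalog.name = 'demo',
    database.name = 'dev',
    table.name = 'table'
);
```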
Then, you can query the Iceberg source by using a batch query:
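
For instance, assuming an Iceberg source named `iceberg_source` (a hypothetical name) already exists:

```sql
-- Batch query: reads the current state of the Iceberg table.
SELECT * FROM iceberg_source;
```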
Typically, you can use CTAS (CREATE TABLE AS SELECT) to load historical Iceberg table data into a RisingWave table:
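
A sketch of such a load, where `iceberg_source` and `new_table` are hypothetical names:

```sql
-- Copy the data currently in the Iceberg source into a RisingWave table.
CREATE TABLE new_table AS SELECT * FROM iceberg_source;
```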
Furthermore, if you have a Kafka upstream on the Iceberg table, you can use SINK INTO TABLE to ingest data from Kafka to the RisingWave table:
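
A sketch of that step, assuming the `CREATE SINK ... INTO ...` form. `kafka_source` stands for a Kafka source you have already created with columns matching the target table, and `new_table` and `kafka_to_table` are likewise hypothetical names.

```sql
-- Continuously write rows from the Kafka source into the RisingWave table
-- that was loaded from Iceberg. All object names here are placeholders.
CREATE SINK kafka_to_table INTO new_table
AS SELECT * FROM kafka_source;
```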