For regular equality joins on streaming queries, the temporary join results are unbounded. If the size of the join results becomes too large, query performance may get impacted. Therefore, you may want to consider time-bounded join types such as interval joins and temporal joins.

Regular joins

RisingWave supports these regular join types:

  • Inner joins
  • Left (outer) joins
  • Right (outer) joins
  • Full (outer) joins

Inner joins

An inner Join returns the rows from both the left and the right table expressions where the specified join condition is met. Rows that do not meet the condition will be excluded from the result set.

The syntax of INNER JOIN is as follows:

<table_expression> INNER JOIN <table_expression> ON <join_conditions>;
<table_expression> INNER JOIN <table_expression> USING (<col_name>, <col_name>, ...);
<table_expression> NATURAL [ INNER ] JOIN <table_expression>;

Left outer joins

A left outer join (or simply left join) returns all rows from the left table expression and the matched rows from the right table expression. If no match is found, NULL values will be filled in for columns from the right table.

The syntax of LEFT (OUTER) JOIN is as follows:

<table_expression> LEFT [ OUTER ] JOIN <table_expression> ON <join_conditions>;
<table_expression> LEFT [ OUTER ] JOIN <table_expression> USING (<col_name>, <col_name>, ...);
<table_expression> NATURAL LEFT [ OUTER ] JOIN <table_expression>;

Right outer joins

A right outer join (or simply right join) returns all rows from the right table expression and the matched rows from the left table expression. If no match is found, NULL values will be returned for columns from the left table expression.

The syntax of RIGHT (OUTER) JOIN is as follows:

<table_expression> RIGHT [ OUTER ] JOIN <table_expression> ON <join_conditions>;
<table_expression> RIGHT [ OUTER ] JOIN <table_expression> USING (<col_name>, <col_name>, ...);
<table_expression> NATURAL RIGHT [ OUTER ] JOIN <table_expression>;

Full outer joins

A full outer join (or simply, full join) returns all rows when there is a match in either the left or right table expression. If no match is found, NULL values will be returned for columns from the table expression where no match is found.

<table_expression> FULL [ OUTER ] JOIN <table_expression> ON <join_conditions>;
<table_expression> FULL [ OUTER ] JOIN <table_expression> USING (<col_name>, <col_name>, ...);
<table_expression> NATURAL FULL [ OUTER ] JOIN <table_expression>;

ASOF joins

PUBLIC PREVIEW

This feature is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our Slack channel. Your input is valuable in helping us improve this feature. For more details, see our Public Preview Feature List.

An ASOF join returns the nearest record in a reference table based on the event time or any ordered properties.

RisingWave supports these ASOF join types:

  • Inner ASOF join matches records only when both tables have corresponding data. Here’s the syntax of an inner ASOF join:

    SELECT A.field1 AS A_field1 
    FROM TableA ASOF JOIN TableB 
    ON A.field1 = B.field1 AND A.field2 <= B.field2;
    
  • Outer ASOF join includes all records from the left table, even if there is no match in the right table. When there is no match in the right table, the columns from the right table will have NULL values. Here’s the syntax of an outer ASOF join:

    SELECT A.field1 AS A_field1 
    FROM TableA ASOF LEFT JOIN TableB 
    ON A.field1 = B.field1 AND A.field2 <= B.field2;
    

In both types of ASOF joins, the join condition must include at least one equality condition (=) and one inequality condition (>=, >, <=, or <). The inequality condition applies to all data types that support inequality comparison while a time-related type is commonly used. ASOF join is currently supported for streaming operations only. For example, suppose you have two tables:

  • stock_prices: Contains stock price data at certain timestamps.

    stock_namestock_timeprice
    TSLA2024-09-24 09:30:00250
    TSLA2024-09-24 10:30:00252
    TSLA2024-09-24 11:30:00255
    AMZN2024-09-24 09:30:003300
    AMZN2024-09-24 10:30:003310
    AMZN2024-09-24 11:30:003320
    GOOG2024-09-24 09:30:001400
    GOOG2024-09-24 10:30:001410
    GOOG2024-09-24 11:30:001420
  • market_data: Contains market sentiment data at different timestamps.

    stock_namemarket_timesentiment
    TSLA2024-09-24 09:00:000.7
    TSLA2024-09-24 10:00:000.8
    TSLA2024-09-24 11:00:000.9
    AMZN2024-09-24 09:00:000.6
    AMZN2024-09-24 10:00:000.65
    AMZN2024-09-24 11:00:000.7
    NVDA2024-09-24 09:00:000.55
    NVDA2024-09-24 10:00:000.6
    NVDA2024-09-24 11:00:000.65

We want to join the stock prices with the nearest preceding market sentiment for each stock price based on time. We can use an ASOF JOIN to find the latest matching record in market_data where the market_time is less than or equal to the stock_time:

SELECT sp.stock_name, sp.stock_time, sp.price, md.sentiment
FROM stock_prices sp
ASOF JOIN market_data md 
ON sp.stock_name = md.stock_name 
AND md.market_time <= sp.stock_time;

Output:

stock_namestock_timepricesentiment
TSLA2024-09-24 09:30:002500.7
TSLA2024-09-24 10:30:002520.8
TSLA2024-09-24 11:30:002550.9
AMZN2024-09-24 09:30:0033000.6
AMZN2024-09-24 10:30:0033100.65
AMZN2024-09-24 11:30:0033200.7

We can use an ASOF LEFT JOIN to output records in the left table that have no matches in the right table.

SELECT sp.stock_name, sp.stock_time, sp.price, md.sentiment
FROM stock_prices sp
ASOF LEFT JOIN market_data md 
ON sp.stock_name = md.stock_name 
AND md.market_time <= sp.stock_time;

Output:

stock_namestock_timepricesentiment
TSLA2024-09-24 09:30:002500.7
TSLA2024-09-24 10:30:002520.8
TSLA2024-09-24 11:30:002550.9
AMZN2024-09-24 09:30:0033000.6
AMZN2024-09-24 10:30:0033100.65
AMZN2024-09-24 11:30:0033200.7
GOOG2024-09-24 09:30:001400NULL
GOOG2024-09-24 10:30:001410NULL
GOOG2024-09-24 11:30:001420NULL

TSLA and AMZN have matching records in market_data, so they show the closest preceding sentiment. GOOG has no corresponding data in market_data, so the sentiment column is NULL.

Windows joins

In a regular join (that is, a join without time attributes), the join state may grow without restriction. If you only need to get windowed results of two sources, you can segment data in the sources into time windows, and join matching windows from the two sources. To create a window join, the same time window functions must be used, and the window size must be the same.

The syntax of a window join is:

<time_window_expression> JOIN <time_window_expression> ON <join_conditions>;

One of the join_conditions must be an equality condition based on the watermarks of the two table expressions. For the syntax of <time_window_expression>, see Time window functions.

For example, suppose you have these two sources:

CREATE SOURCE s1 (
 id int,
 value int,
 ts TIMESTAMP,
 WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (connector = 'datagen');

CREATE SOURCE s2 (
 id int,
 value int,
 ts TIMESTAMP,
 WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (connector = 'datagen');

You can join them with the following statement:

CREATE MATERIALIZED VIEW window_join AS
SELECT s1.id AS id1,
       s1.value AS value1,
       s2.id AS id2,
       s2.value AS value2
FROM TUMBLE(s1, ts, interval '1' MINUTE)
JOIN TUMBLE(s2, ts, interval '1' MINUTE)
ON s1.id = s2.id and s1.window_start = s2.window_start;

Interval joins

Window joins require that the two sources have the same window type and window size. This requirement can be too strict in some scenarios. If you want to join two sources that have some time offset, you can create an interval join by specifying an accepted interval range based on watermarks.

The syntax of an interval join is:

<table_expression> JOIN <table_expression> ON <equality_join_condition> AND <interval_condition> ...;

In an interval join, the interval_condition must be a watermark-based range.

For example, for sources s1 and s2 used in the above section, you can create an interval join:

CREATE MATERIALIZED VIEW interval_join AS
SELECT s1.id AS id1,
       s1.value AS value1,
       s2.id AS id2,
       s2.value AS value2
FROM s1 JOIN s2
ON s1.id = s2.id and s1.ts between s2.ts and s2.ts + INTERVAL '1' MINUTE;

Interval join‘s state cleaning is triggered only when upstream messages arrive, and it operates at the granularity of each join key. As a result, if no messages are received for a join key, the state may still hold stale data.

Process-time temporal joins

Process-time temporal joins are divided into two categories: append-only process-time temporal join and non-append-only process-time temporal join. Check the following instructions for their differences.

Append-only process-time temporal join

An append-only temporal join is often used to widen a fact table. Its advantage is that it does not require RisingWave to maintain the join state, making it suitable for scenarios where the dimension table is not updated, or where updates to the dimension table do not affect the previously joined results. To further improve performance, you can use the index of a dimension table to form a join with the fact table.

Syntax

SELECT ... FROM <table_expression> [AS <alias>]
[ LEFT | INNER ] JOIN <table_expression> FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias>]
ON <join_conditions>;
  • The left table expression is an append-only table or source.
  • The right table expression is a table, index or materialized view.
  • The process-time syntax FOR SYSTEM_TIME AS OF PROCTIME() is included in the right table expression.
  • The join type is INNER JOIN or LEFT JOIN.
  • The Join condition includes the primary key of the right table expression.

Example

If you have an append-only stream that includes messages like below:

transaction_idproduct_idquantitysale_dateprocess_time
110132023-06-182023-06-18 10:15:00
210222023-06-192023-06-19 15:30:00
310112023-06-202023-06-20 11:45:00

And a versioned table products:

idproduct_namepricevalid_fromvalid_to
101Product A202023-06-01 00:00:002023-06-15 23:59:59
101Product A252023-06-16 00:00:002023-06-19 23:59:59
101Product A222023-06-20 00:00:00NULL
102Product B152023-06-01 00:00:00NULL

For the same product ID, the product name or the price is updated from time to time.

You can use a temporal join to fetch the latest product name and price from the products table and form a wider table. To further improve performance, you can create an index for table products, and join sales with the index instead.

SELECT transaction_id, product_id, quantity, sale_date, product_name, price
FROM sales
JOIN products FOR SYSTEM_TIME AS OF PROCTIME()
ON product_id = id WHERE process_time BETWEEN valid_from AND valid_to;
transaction_idproduct_idquantitysale_dateproduct_nameprice
110132023-06-18Product A25
210222023-06-19Product B15
310112023-06-20Product A22

Non-append-only process-time temporal join

Compared to the append-only temporal join, the non-append-only temporal join can accommodate non-append-only input for the left table. However, it introduces an internal state to materialize the lookup result for each left-hand side (LHS) insertion. This allows the temporal join operator to retract the join result it sends downstream when update or delete messages arrive.

Syntax

The non-append-only temporal join shares the same syntax as the append-only temporal join.

<table_expression> [ LEFT | INNER ] JOIN <table_expression> FOR SYSTEM_TIME AS OF PROCTIME() ON <join_conditions>;

Example

Now if you update the table sales:

UPDATE sales SET quantity = quantity + 1;

You will get these results:

transaction_idproduct_idquantitysale_dateproduct_nameprice
110142023-06-18Product A25
210232023-06-19Product B15
310122023-06-20Product A22

Every time you update the left-hand side table, it will look up the latest data from the right-hand side table.