
Understanding the fast-moving market is pivotal to making informed trading decisions. The market is constantly influenced by many external factors that are not reflected in the raw market data, such as geopolitical events, economic indicators, and indisutry-specific news. These factors often create rapid fluctuations that are not immediately apparent in raw market data. To make sense of these shifts, traders need external data to better understand the context behinds these price movements.

Compiling and analyzing this information in real-time is key to understanding the market and making better trading decisions. With real-time data analysis, traders can process and join raw market data and external signals as they happen. By combining these data streams, traders gain a comprehensive, up-to-the-second view of market conditions. This allows them to act quickly and confidently, making quick adjustments to maximize profits and mitigate risks.

In this tutorial, you will learn how to join market data with external data to gain a holistic view of each asset.


  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment. For detailed instructions, see Download PostgreSQL.
  • Install and run RisingWave. For detailed instructions on how to quickly get started, see the Quick start guide.
  • Ensure that a Python environment is set up and install the psycopg2 library.

Step 1: Set up the data source tables

Once RisingWave is installed and deployed, run the two SQL queries below to set up the tables. You will insert data into these tables to simulate live data streams.

  1. The table raw_market_data tracks raw market data of each asset, such as the price, volume, and bid-ask price.

    CREATE TABLE raw_market_data (
        asset_id INT,
        timestamp TIMESTAMPTZ,
        price NUMERIC,
        volume INT,
        bid_price NUMERIC,
        ask_price NUMERIC
  2. The table enrichment_data contains external data that adds context to the raw market data. It includes additional metrics such as historical volatility, sector performance, and sentiment scores.

    CREATE TABLE enrichment_data (
        asset_id INT,
        sector VARCHAR,
        historical_volatility NUMERIC,
        sector_performance NUMERIC,
        sentiment_score NUMERIC,
        timestamp TIMESTAMPTZ

Step 2: Run the data generator

To keep this demo simple, a Python script is used to generate and insert data into the tables created above.

Clone the awesome-stream-processing repository.

git clone https://github.com/risingwavelabs/awesome-stream-processing.git

Navigate to the market_data_enrichment folder.

cd awesome-stream-processing/tree/main/02-simple-demos/capital_markets/market_data_enrichment

Run the data_generator.py file. This Python script utilizes the psycopg2 library to establish a connection with RisingWave so you can generate and insert synthetic data into the tables positions and market_data.

If you are not running RisingWave locally or using default credentials, update the connection parameters accordingly:

default_params = {
    "dbname": "dev",
    "user": "root",
    "password": "",
    "host": "localhost",
    "port": "4566"

Step 3: Create materialized views

In this demo, you will create three materialized views to better understand the market.

Materialized views contain the results of a view expression and are stored in the RisingWave database. The results of a materialized view are computed incrementally and updated whenever new events arrive and do not require to be refreshed. When you query from a materialized view, it will return the most up-to-date computation results.

Calculate bid-ask spread

The avg_price_bid_ask_spread materialized view calculates the average price and average bid-ask spread for each asset in five-minute time windows by using TUMBLE() and grouping by the assed_id and the time window.

To learn more about TUMBLE(), see Time windows.

CREATE MATERIALIZED VIEW avg_price_bid_ask_spread AS
    ROUND(AVG(price), 2) AS average_price,
    ROUND(AVG(ask_price - bid_price), 2) AS bid_ask_spread,
    TUMBLE(raw_market_data, timestamp, '5 minutes')
GROUP BY asset_id, window_end;

You can query from avg_price_bid_ask_spread to see the results.

SELECT * FROM avg_price_bid_ask_spread LIMIT 5;
 asset_id | average_price | bid_ask_spread |     window_end      
        2 |        106.55 |           0.58 | 2024-11-19 16:20:00
        5 |         98.08 |           0.60 | 2024-11-19 16:25:00
        1 |         93.39 |           0.61 | 2024-11-19 16:20:00
        3 |        100.96 |           0.60 | 2024-11-19 16:25:00
        4 |         99.56 |           0.64 | 2024-11-19 16:20:00

Calculate rolling price volatility

The rolling_volatility materialized view uses the stddev_samp function to calculate the rolling price volatility within five-minute time windows by using TUMBLE() and grouping by the assed_id and the time window.

    ROUND(stddev_samp(price), 2) AS rolling_volatility,
    TUMBLE(raw_market_data, timestamp, '5 minutes')
    GROUP BY asset_id, window_end;

You can query from rolling_volatility to see the results.

SELECT * FROM rolling_volatility LIMIT 5;
 asset_id | rolling_volatility |     window_end      
        1 |              27.98 | 2024-11-19 16:35:00
        4 |              29.55 | 2024-11-19 16:35:00
        5 |              28.78 | 2024-11-19 16:30:00
        2 |              28.76 | 2024-11-19 16:20:00
        5 |              27.60 | 2024-11-19 16:25:00

Obtain a comprehensive view of each asset

The enriched_market_data materialized view combines the transformed market data with the enrichment data. TUMBLE() is used to group the data from enrichment_data into five-minute time windows for each asset. Then it is joined with the volatility and bid-ask spread data.

By combining these data sources, you can obtain a more holistic view of each asset, empowering you to make more informed market decisions.

CREATE MATERIALIZED VIEW enriched_market_data AS
    avg_price_bid_ask_spread AS bas
    rolling_volatility AS rv
    bas.asset_id = rv.asset_id AND
    bas.window_end = rv.window_end
    SELECT asset_id,
        AVG(historical_volatility) AS avg_historical_volatility,
        AVG(sector_performance) AS avg_sector_performance,
        AVG(sentiment_score) AS avg_sentiment_score
    FROM TUMBLE(enrichment_data, timestamp, '5 minutes')
    GROUP BY asset_id, sector, window_end
) AS ed
ON bas.asset_id = ed.asset_id AND
   bas.window_end = ed.window_end;

You can query from enriched_market_data to see the results.

SELECT * FROM enriched_market_data LIMIT 5;
 asset_id |   sector   | average_price | bid_ask_spread | rolling_volatility |   avg_historical_volatility    |     avg_sector_performance      |       avg_sentiment_score       |     window_end      
        1 | Energy     |         99.75 |           0.61 |              27.83 |                      0.2940625 |                        -0.00375 |                       0.0940625 | 2024-11-19 16:30:00
        4 | Technology |        100.62 |           0.60 |              30.52 | 0.3102702702702702702702702703 |  0.0045945945945945945945945946 | -0.0683783783783783783783783784 | 2024-11-19 16:30:00
        5 | Energy     |        100.24 |           0.60 |              28.80 | 0.2890697674418604651162790698 |   0.004186046511627906976744186 |  0.1609302325581395348837209302 | 2024-11-19 16:35:00
        2 | Energy     |        106.55 |           0.58 |              28.76 | 0.2922222222222222222222222222 |                           -0.01 | -0.2955555555555555555555555556 | 2024-11-19 16:20:00
        3 | Energy     |         98.77 |           0.64 |              29.45 | 0.2894594594594594594594594595 |  0.0035135135135135135135135135 |                           -0.10 | 2024-11-19 16:30:00

When finished, press Ctrl+C to close the connection between RisingWave and psycopg2.


In this tutorial, you learn:

  • How to get time-windowed aggregate results by using the tumble time window function.
  • How to join data sources with materialized views.