Versioned Tables

Flink SQL operates over dynamic tables that evolve over time and may be either append-only or updating. Versioned tables are a special type of updating table that remembers the past values for each key.

Concept

Dynamic tables define relations over time. Often, particularly when working with metadata, a key’s old value does not become irrelevant when it changes.

Flink SQL can define versioned tables over any dynamic table with a PRIMARY KEY constraint and time attribute.

A primary key constraint in Flink means that a column or set of columns of a table or view is unique and non-null. The primary key semantics on an upserting table mean that the materialized changes for a particular key (INSERT/UPDATE/DELETE) represent the changes to a single row over time. The time attribute on an upserting table defines when each change occurred.

Taken together, the primary key and the time attribute let Flink track the changes to a row over time and maintain the period for which each value was valid for that key.

Suppose a table tracks the prices of different products in a store.

  (changelog kind)  update_time  product_id  product_name  price
  ================  ===========  ==========  ============  =====
  +(INSERT)         00:01:00     p_001       scooter       11.11
  +(INSERT)         00:02:00     p_002       basketball    23.11
  -(UPDATE_BEFORE)  12:00:00     p_001       scooter       11.11
  +(UPDATE_AFTER)   12:00:00     p_001       scooter       12.99
  -(UPDATE_BEFORE)  12:00:00     p_002       basketball    23.11
  +(UPDATE_AFTER)   12:00:00     p_002       basketball    19.99
  -(DELETE)         18:00:00     p_001       scooter       12.99

Given this set of changes, we can track how the price of a scooter changes over time. It is initially $11.11 at 00:01:00, when the scooter is added to the catalog. The price then rises to $12.99 at 12:00:00, before the scooter is deleted from the catalog at 18:00:00.

If we queried the table for various products’ prices at different times, we would retrieve different results. At 10:00:00 the table would show one set of prices.

  update_time  product_id  product_name  price
  ===========  ==========  ============  =====
  00:01:00     p_001       scooter       11.11
  00:02:00     p_002       basketball    23.11

At 13:00:00, we would find another set of prices.

  update_time  product_id  product_name  price
  ===========  ==========  ============  =====
  12:00:00     p_001       scooter       12.99
  12:00:00     p_002       basketball    19.99

Versioned Table Sources

Versioned tables are defined implicitly for any table whose underlying source or format directly defines a changelog. Examples include the upsert Kafka source as well as database changelog formats such as Debezium and Canal. As discussed above, the only additional requirement is that the CREATE TABLE statement must contain a PRIMARY KEY and an event-time attribute.

  CREATE TABLE products (
      product_id STRING,
      product_name STRING,
      price DECIMAL(32, 2),
      update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
      PRIMARY KEY (product_id) NOT ENFORCED,
      WATERMARK FOR update_time AS update_time
  ) WITH (...);
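
The WITH clause above is left elided. As a rough sketch only, one way such a table could be declared is over a Kafka topic that carries Debezium JSON change events, which is the setup in which the 'value.source.timestamp' metadata key is available. The table name products_cdc, the topic name, the broker address, and the startup mode below are illustrative assumptions, not values taken from this document.

```sql
-- Sketch under assumptions: table name, topic, and broker address are placeholders.
CREATE TABLE products_cdc (
    product_id STRING,
    product_name STRING,
    price DECIMAL(32, 2),
    -- event time of the change, read from the Debezium payload
    update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    -- primary key: identifies the row whose versions are tracked
    PRIMARY KEY (product_id) NOT ENFORCED,
    -- event-time attribute: orders the versions of each row
    WATERMARK FOR update_time AS update_time
) WITH (
    'connector' = 'kafka',
    'topic' = 'products',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'debezium-json'
);
```

The primary key and the watermark declaration are the two parts that make the table versioned. The connector options only determine where the changelog comes from.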

Versioned Table Views

Flink also supports defining versioned views if the underlying query contains a unique key constraint and event-time attribute. Imagine an append-only table of currency rates.

  CREATE TABLE currency_rates (
      currency STRING,
      rate DECIMAL(32, 10),
      update_time TIMESTAMP(3),
      WATERMARK FOR update_time AS update_time
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'rates',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json'
  );

The table currency_rates contains a row for each currency (with respect to USD) and receives a new row each time the rate changes. The JSON format does not support native changelog semantics, so Flink can only read this table as append-only.

  (changelog kind)  update_time  currency  rate
  ================  ===========  ========  ====
  +(INSERT)         09:00:00     Yen       102
  +(INSERT)         09:00:00     Euro      114
  +(INSERT)         09:00:00     USD       1
  +(INSERT)         10:45:00     Euro      116
  +(INSERT)         11:15:00     Euro      119
  +(INSERT)         11:49:00     Pounds    108

Flink interprets each row as an INSERT to the table, meaning we cannot define a PRIMARY KEY over currency. However, it is clear to us (the query developer) that this table has all the necessary information to define a versioned table. Flink can reinterpret this table as a versioned table by defining a deduplication query which produces an ordered changelog stream with an inferred primary key (currency) and event time (update_time).

  -- Define a versioned view
  CREATE VIEW versioned_rates AS
  SELECT currency, rate, update_time             -- (1) `update_time` keeps the event time
  FROM (
      SELECT *,
          ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key `currency` can be a primary key
              ORDER BY update_time DESC) AS rownum
      FROM currency_rates)
  WHERE rownum = 1;

  -- The view `versioned_rates` produces the following changelog:
  --
  -- (changelog kind)  update_time  currency  rate
  -- ================  ===========  ========  ====
  -- +(INSERT)         09:00:00     Yen       102
  -- +(INSERT)         09:00:00     Euro      114
  -- +(INSERT)         09:00:00     USD       1
  -- +(UPDATE_AFTER)   10:45:00     Euro      116
  -- +(UPDATE_AFTER)   11:15:00     Euro      119
  -- +(INSERT)         11:49:00     Pounds    108

Flink has a special optimization step that efficiently transforms this query into a versioned table usable in subsequent queries. In general, the result of a query with the following format is a versioned table:

  SELECT [column_list]
  FROM (
      SELECT [column_list],
          ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
              ORDER BY time_attr DESC) AS rownum
      FROM table_name)
  WHERE rownum = 1

Parameter Specification:

  • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
  • PARTITION BY col1[, col2...]: Specifies the partition columns, i.e. the deduplication key. These columns form the primary key of the resulting versioned table.
  • ORDER BY time_attr DESC: Specifies the ordering column, which must be a time attribute.
  • WHERE rownum = 1: The condition rownum = 1 is required for Flink to recognize that this query generates a versioned table.
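
As an illustration of how a versioned view might be used in a subsequent query, the sketch below joins a stream of orders against versioned_rates with an event-time temporal join (FOR SYSTEM_TIME AS OF), so that each order is matched with the rate that was valid at its order time. The orders table, its columns, its topic name, and its connector options are hypothetical and exist only for this example. The view versioned_rates is the one defined earlier in this section.

```sql
-- Hypothetical append-only table of orders (not part of the original example).
CREATE TABLE orders (
    order_id STRING,
    price DECIMAL(32, 2),
    currency STRING,
    order_time TIMESTAMP(3),
    -- the probe side of an event-time temporal join also needs an event-time attribute
    WATERMARK FOR order_time AS order_time
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- For each order, look up the version of the rate that was valid at o.order_time.
SELECT
    o.order_id,
    o.order_time,
    o.price,
    o.currency,
    r.rate,
    r.update_time AS rate_valid_from
FROM orders AS o
LEFT JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;
```

The join condition uses the view's inferred primary key (currency), and the version is selected by comparing o.order_time with the view's event time (update_time). With a LEFT JOIN, an order whose currency has no rate version at its order time is still emitted, with NULL rate columns.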