Temporal Table Function

A temporal table function provides access to the version of a temporal table at a specific point in time. To access the data in a temporal table, you must pass a time attribute that determines which version of the table is returned. Flink expresses this using the SQL syntax for table functions.

Unlike versioned tables, temporal table functions can only be defined on top of append-only streams; they do not support changelog inputs. Additionally, a temporal table function cannot be defined in pure SQL DDL.

Defining a Temporal Table Function

Temporal table functions can be defined on top of append-only streams using the Table API. The table is registered with one or more key columns, and a time attribute used for versioning.

Suppose we have an append-only table of currency rates that we would like to register as a temporal table function.

  SELECT * FROM currency_rates;

  update_time   currency   rate
  ===========   ========   ====
  09:00:00      Yen        102
  09:00:00      Euro       114
  09:00:00      USD        1
  11:15:00      Euro       119
  11:49:00      Pounds     108

Using the Table API, we can register this stream as a temporal table function, with currency as the key and update_time as the versioning time attribute.

Java

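  import org.apache.flink.table.functions.TemporalTableFunction;

  // Version the table by update_time (time attribute), keyed by currency.
  TemporalTableFunction rates = tEnv
      .from("currency_rates")
      .createTemporalTableFunction("update_time", "currency");

  // Register the function under the name "rates" so queries can call it.
  tEnv.createTemporarySystemFunction("rates", rates);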

Scala

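  // Version the table by update_time (time attribute), keyed by currency.
  val rates = tEnv
      .from("currency_rates")
      .createTemporalTableFunction("update_time", "currency")

  // Register the function under the name "rates" so queries can call it.
  tEnv.createTemporarySystemFunction("rates", rates)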

Python

  Still not supported in the Python Table API.
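
Conceptually, a temporal table function maps a point in time to the latest row per key whose time attribute is at or before that time. The following is a minimal plain-Java sketch of that lookup on the currency_rates data. It is not Flink code and says nothing about how Flink implements the function; the class RateVersions and its methods update, rateAt, and example are hypothetical names used only for illustration.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical illustration only: an in-memory model of versioned rates, not a Flink API.
final class RateVersions {
    // currency (key) -> version history ordered by update_time (time attribute)
    private final Map<String, TreeMap<LocalTime, Integer>> history = new HashMap<>();

    // Record a new version of the rate for a currency.
    void update(LocalTime updateTime, String currency, int rate) {
        history.computeIfAbsent(currency, k -> new TreeMap<>()).put(updateTime, rate);
    }

    // Return the rate valid at the given time: the latest version at or before it.
    OptionalInt rateAt(String currency, LocalTime time) {
        TreeMap<LocalTime, Integer> versions = history.get(currency);
        if (versions == null) {
            return OptionalInt.empty(); // unknown currency
        }
        Map.Entry<LocalTime, Integer> entry = versions.floorEntry(time);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }

    // Load the rows of the currency_rates example table.
    static RateVersions example() {
        RateVersions rates = new RateVersions();
        rates.update(LocalTime.parse("09:00:00"), "Yen", 102);
        rates.update(LocalTime.parse("09:00:00"), "Euro", 114);
        rates.update(LocalTime.parse("09:00:00"), "USD", 1);
        rates.update(LocalTime.parse("11:15:00"), "Euro", 119);
        rates.update(LocalTime.parse("11:49:00"), "Pounds", 108);
        return rates;
    }

    public static void main(String[] args) {
        RateVersions rates = example();
        // Before the 11:15:00 update, the Euro version from 09:00:00 is still valid.
        System.out.println(rates.rateAt("Euro", LocalTime.parse("10:15:00")));   // OptionalInt[114]
        System.out.println(rates.rateAt("Euro", LocalTime.parse("11:15:00")));   // OptionalInt[119]
        // Pounds has no version before 11:49:00, so nothing is returned.
        System.out.println(rates.rateAt("Pounds", LocalTime.parse("10:00:00"))); // OptionalInt.empty
    }
}
```

In this model, the key column selects a version history and the time attribute selects one version within it, which mirrors the two arguments passed to createTemporalTableFunction.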

Temporal Table Function Join

Once defined, a temporal table function is used as a standard table function. Append-only tables (left input/probe side) can join with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes, to retrieve the value for a key as it was at a particular point in time.

Consider an append-only table orders that tracks customers’ orders in different currencies.

  SELECT * FROM orders;

  order_time   amount   currency
  ==========   ======   ========
  10:15        2        Euro
  10:30        1        USD
  10:32        50       Yen
  10:52        3        Euro
  11:04        5        USD

Given these tables, we would like to convert orders to a common currency — USD.

SQL

  SELECT
    SUM(amount * rate) AS amount
  FROM
    orders,
    LATERAL TABLE (rates(order_time))
  WHERE
    rates.currency = orders.currency

Java

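  import static org.apache.flink.table.api.Expressions.*;

  // Assumes columns carry o_/r_ prefixes so that field names are distinct across both inputs.
  Table result = orders
      .joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")))
      .select($("o_amount").times($("r_rate")).sum().as("amount"));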

Scala

  // Assumes columns carry o_/r_ prefixes so that field names are distinct across both inputs.
  val result = orders
      .joinLateral(call("rates", $"o_proctime"), $"o_currency" === $"r_currency")
      .select(($"o_amount" * $"r_rate").sum as "amount")

Python

  Still not supported in the Python Table API.
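
To make the expected result concrete, the SQL query can be traced by hand on the sample data, assuming order_time is used as the versioning time: each order picks the rate with the latest update_time at or before its order_time, which gives 2 * 114 + 1 * 1 + 50 * 102 + 3 * 114 + 5 * 1 = 5676. The plain-Java sketch below reproduces that calculation. It only models the join semantics in memory and is not how Flink executes the join; TemporalJoinSketch, Rate, Order, rateAt, and totalInUsd are hypothetical names.

```java
import java.time.LocalTime;
import java.util.List;

// Hypothetical illustration only: models the temporal join semantics, not a Flink API.
final class TemporalJoinSketch {
    static final class Rate {
        final LocalTime updateTime;
        final String currency;
        final long rate;

        Rate(String updateTime, String currency, long rate) {
            this.updateTime = LocalTime.parse(updateTime);
            this.currency = currency;
            this.rate = rate;
        }
    }

    static final class Order {
        final LocalTime orderTime;
        final long amount;
        final String currency;

        Order(String orderTime, long amount, String currency) {
            this.orderTime = LocalTime.parse(orderTime);
            this.amount = amount;
            this.currency = currency;
        }
    }

    // Rows of the currency_rates example table (build side).
    static final List<Rate> RATES = List.of(
            new Rate("09:00:00", "Yen", 102),
            new Rate("09:00:00", "Euro", 114),
            new Rate("09:00:00", "USD", 1),
            new Rate("11:15:00", "Euro", 119),
            new Rate("11:49:00", "Pounds", 108));

    // Rows of the orders example table (probe side).
    static final List<Order> ORDERS = List.of(
            new Order("10:15", 2, "Euro"),
            new Order("10:30", 1, "USD"),
            new Order("10:32", 50, "Yen"),
            new Order("10:52", 3, "Euro"),
            new Order("11:04", 5, "USD"));

    // Latest rate for the currency with update_time <= time, or -1 if no version exists yet.
    static long rateAt(String currency, LocalTime time) {
        Rate best = null;
        for (Rate r : RATES) {
            boolean visible = r.currency.equals(currency) && !r.updateTime.isAfter(time);
            if (visible && (best == null || r.updateTime.isAfter(best.updateTime))) {
                best = r;
            }
        }
        return best == null ? -1 : best.rate;
    }

    // SUM(amount * rate) over all orders that find a matching rate version.
    static long totalInUsd() {
        long total = 0;
        for (Order o : ORDERS) {
            long rate = rateAt(o.currency, o.orderTime);
            if (rate >= 0) { // inner join: orders without a matching version are dropped
                total += o.amount * rate;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalInUsd()); // 5676
    }
}
```

Both Euro orders use the rate 114 because the update to 119 only arrives at 11:15:00, after the later order at 10:52.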