General User-defined Functions

User-defined functions are an important feature because they significantly extend the expressiveness of Python Table API programs.

NOTE: Python UDF execution requires Python 3.6, 3.7, 3.8 or 3.9 with PyFlink installed, on both the client side and the cluster side.

Scalar Functions

Python scalar functions can be used in Python Table API programs. To define a Python scalar function, extend the base class ScalarFunction in pyflink.table.udf and implement an evaluation method named eval, which defines the behavior of the function. The evaluation method can accept variable arguments, such as eval(*args).
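As a rough illustration of the variable-argument form, the sketch below uses a plain Python class as a stand-in for a ScalarFunction subclass, so it runs without a Flink installation. The class name ConcatAll and its behavior are hypothetical; in a real job the class would extend ScalarFunction and be wrapped with udf(...).

```python
class ConcatAll:
    """Hypothetical stand-in for a ScalarFunction subclass."""

    def eval(self, *args):
        # Each column passed to the function arrives as one positional argument.
        # None values (SQL NULL) are skipped in this sketch.
        return "-".join(str(a) for a in args if a is not None)


concat_all = ConcatAll()
two_columns = concat_all.eval("a", 1)
three_columns = concat_all.eval("a", None, 2.5)
```

Because eval accepts *args, the same function can be called with any number of columns.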

The following example shows how to define your own Python hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:

    from pyflink.table.expressions import call, col
    from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
    from pyflink.table.udf import ScalarFunction, udf

    class HashCode(ScalarFunction):
        def __init__(self):
            self.factor = 12

        def eval(self, s):
            return hash(s) * self.factor

    settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(settings)

    my_table = ...  # type: Table, assumed schema: [string: String, bigint: BigInt]

    hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())

    # use the Python function in Python Table API
    my_table.select(col("string"), col("bigint"), hash_code(col("bigint")), call(hash_code, col("bigint")))

    # use the Python function in SQL API
    table_env.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
    table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
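The note about configuring a function through its constructor can be made concrete with a small sketch. It uses a plain Python class in place of the ScalarFunction subclass so that it runs without Flink; the factor parameter on the constructor is an assumption added for illustration.

```python
class HashCode:
    """Stand-in for the ScalarFunction subclass, with a configurable factor."""

    def __init__(self, factor=12):
        # Configuration is captured once, when the instance is created,
        # i.e. before the function is wrapped with udf(...) or registered.
        self.factor = factor

    def eval(self, s):
        return hash(s) * self.factor


default_hash = HashCode()
scaled_hash = HashCode(factor=3)
```

Note that Python randomizes hash() for str values per process unless PYTHONHASHSEED is fixed, so a function like this is only reproducible across workers for inputs such as integers.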

Java/Scala scalar functions can also be used in Python Table API programs.

    '''
    Java code:

    // The Java class must have a public no-argument constructor and can be found in the current Java classloader.
    public class HashCode extends ScalarFunction {
        private int factor = 12;

        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }
    '''
    from pyflink.table.expressions import call, col
    from pyflink.table import TableEnvironment, EnvironmentSettings

    settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(settings)

    # register the Java function
    table_env.create_java_temporary_function("hash_code", "my.java.function.HashCode")

    # use the Java function in Python Table API
    my_table.select(call('hash_code', col("string")))

    # use the Java function in SQL API
    table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")

There are many ways to define a Python scalar function besides extending the base class ScalarFunction. The following examples show the different ways to define a Python scalar function which takes two columns of bigint as the input parameters and returns the sum of them as the result.

    import functools

    from pyflink.table import DataTypes
    from pyflink.table.expressions import call, col
    from pyflink.table.udf import ScalarFunction, udf

    # option 1: extending the base class `ScalarFunction`
    class Add(ScalarFunction):
        def eval(self, i, j):
            return i + j

    add = udf(Add(), result_type=DataTypes.BIGINT())

    # option 2: Python function
    @udf(result_type=DataTypes.BIGINT())
    def add(i, j):
        return i + j

    # option 3: lambda function
    add = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())

    # option 4: callable function
    class CallableAdd(object):
        def __call__(self, i, j):
            return i + j

    add = udf(CallableAdd(), result_type=DataTypes.BIGINT())

    # option 5: partial function
    def partial_add(i, j, k):
        return i + j + k

    add = udf(functools.partial(partial_add, k=1), result_type=DataTypes.BIGINT())

    # register the Python function
    table_env.create_temporary_function("add", add)

    # use the function in Python Table API
    my_table.select(call('add', col('a'), col('b')))

    # You can also use the Python function in Python Table API directly
    my_table.select(add(col('a'), col('b')))

Table Functions

Similar to a Python user-defined scalar function, a user-defined table function (UDTF) takes zero, one, or multiple scalar values as input parameters. However, in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The return value of a Python UDTF can be an iterable, an iterator or a generator.

The following example shows how to define your own Python multi-emit function, register it in the TableEnvironment, and call it in a query.

    from pyflink.table.expressions import col
    from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
    from pyflink.table.udf import TableFunction, udtf

    class Split(TableFunction):
        def eval(self, string):
            for s in string.split(" "):
                yield s, len(s)

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    my_table = ...  # type: Table, table schema: [a: String]

    # register the Python Table Function
    split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])

    # use the Python Table Function in Python Table API
    my_table.join_lateral(split(col("a")).alias("word", "length"))
    my_table.left_outer_join_lateral(split(col("a")).alias("word", "length"))

    # use the Python Table function in SQL API
    table_env.create_temporary_function("split", udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()]))
    table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
    table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
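The difference between join_lateral and left_outer_join_lateral matters when the table function emits no rows for some input. The following plain-Python sketch models the two join semantics without Flink. The helper names and the split_non_empty variant (which skips empty tokens so that an empty string emits nothing) are hypothetical and only serve the illustration.

```python
def split_non_empty(string):
    # Variant of Split.eval that skips empty tokens,
    # so an empty input string emits no rows at all.
    for s in string.split(" "):
        if s:
            yield s, len(s)


def join_lateral(rows, table_function):
    # Inner semantics: an input row with no emitted rows is dropped.
    return [(a, word, length) for a in rows for word, length in table_function(a)]


def left_outer_join_lateral(rows, table_function):
    # Outer semantics: an input row with no emitted rows is kept, padded with None.
    result = []
    for a in rows:
        emitted = list(table_function(a))
        if emitted:
            result.extend((a, word, length) for word, length in emitted)
        else:
            result.append((a, None, None))
    return result


rows = ["hello world", ""]
inner = join_lateral(rows, split_non_empty)
outer = left_outer_join_lateral(rows, split_non_empty)
```

Here inner contains only the two rows derived from "hello world", while outer additionally keeps the empty input row with None for word and length.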

Java/Scala table functions can also be used in Python Table API programs.

    '''
    Java code:

    // The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
    // The Java class must have a public no-argument constructor and can be found in the current Java classloader.
    public class Split extends TableFunction<Tuple2<String, Integer>> {
        private String separator = " ";

        public void eval(String str) {
            for (String s : str.split(separator)) {
                // use collect(...) to emit a row
                collect(new Tuple2<String, Integer>(s, s.length()));
            }
        }
    }
    '''
    from pyflink.table.expressions import call, col
    from pyflink.table import TableEnvironment, EnvironmentSettings

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    my_table = ...  # type: Table, table schema: [a: String]

    # Register the Java function.
    table_env.create_java_temporary_function("split", "my.java.function.Split")

    # Use the table function in the Python Table API. "alias" specifies the field names of the table.
    my_table.join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length'))
    my_table.left_outer_join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length'))

    # Use the table function in SQL with LATERAL and TABLE keywords.
    # CROSS JOIN a table function (equivalent to "join_lateral" in Table API).
    table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
    # LEFT JOIN a table function (equivalent to "left_outer_join_lateral" in Table API).
    table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")

Like Python scalar functions, Python table functions can be defined in any of the five ways shown above.

Note: The only difference is that a Python table function must return an iterable, an iterator or a generator.

    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf

    # option 1: generator function
    @udtf(result_types=DataTypes.BIGINT())
    def generator_func(x):
        yield 1
        yield 2

    # option 2: return iterator
    @udtf(result_types=DataTypes.BIGINT())
    def iterator_func(x):
        # iter(...) turns the range (an iterable) into an actual iterator
        return iter(range(5))

    # option 3: return iterable
    @udtf(result_types=DataTypes.BIGINT())
    def iterable_func(x):
        result = [1, 2, 3]
        return result

Aggregate Functions

A user-defined aggregate function (UDAGG) maps scalar values of multiple rows to a new scalar value.

NOTE: Currently, the general user-defined aggregate function is only supported in GroupBy aggregation and Group Window aggregation in streaming mode. It is not yet supported in batch mode, where the Vectorized Aggregate Functions are recommended instead.

The behavior of an aggregate function is centered around the concept of an accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

For each set of rows that needs to be aggregated, the runtime creates an empty accumulator by calling create_accumulator(). Subsequently, the accumulate(...) method of the aggregate function is called for each input row to update the accumulator. Currently, after each row has been processed, the get_value(...) method of the aggregate function is called to compute the aggregated result.

The following example illustrates the aggregation process:

[Figure: UDAGG mechanism]

In the above example, we assume a table that contains data about beverages. The table consists of three columns (id, name, and price) and 5 rows. We would like to find the highest price of all beverages in the table, i.e., perform a max() aggregation.
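The max() aggregation described here can be traced step by step in plain Python. The sketch below is a stand-in that follows the create_accumulator / accumulate / get_value life cycle without depending on Flink. The class name MaxPrice and the beverage rows are illustrative assumptions.

```python
class MaxPrice:
    """Plain-Python stand-in for an AggregateFunction computing max(price)."""

    def create_accumulator(self):
        # A one-element list so that accumulate() can update it in place.
        return [None]

    def accumulate(self, accumulator, price):
        if price is not None and (accumulator[0] is None or price > accumulator[0]):
            accumulator[0] = price

    def get_value(self, accumulator):
        return accumulator[0]


# Illustrative (id, name, price) rows.
beverages = [(1, "Latte", 6), (2, "Milk", 3), (3, "Breve", 5), (4, "Mocha", 8), (5, "Tea", 4)]

agg = MaxPrice()
acc = agg.create_accumulator()
emitted = []
for _id, _name, price in beverages:
    agg.accumulate(acc, price)
    # In streaming mode an updated result is computed after each row.
    emitted.append(agg.get_value(acc))
```

The emitted list shows how the result evolves as rows arrive: it stays at the first price until a higher one is seen.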

To define an aggregate function, extend the base class AggregateFunction in pyflink.table and implement the evaluation method named accumulate(...). The result type and accumulator type of the aggregate function can be specified by one of the following two approaches:

  • Implement the methods named get_result_type() and get_accumulator_type().
  • Wrap the function instance with the decorator udaf in pyflink.table.udf and specify the parameters result_type and accumulator_type.

The following example shows how to define your own aggregate function and call it in a query.

    from pyflink.common import Row
    from pyflink.table import AggregateFunction, DataTypes, TableEnvironment, EnvironmentSettings
    from pyflink.table.expressions import call, col, lit
    from pyflink.table.udf import udaf
    from pyflink.table.window import Tumble

    class WeightedAvg(AggregateFunction):

        def create_accumulator(self):
            # Row(sum, count)
            return Row(0, 0)

        def get_value(self, accumulator):
            if accumulator[1] == 0:
                return None
            else:
                return accumulator[0] / accumulator[1]

        def accumulate(self, accumulator, value, weight):
            accumulator[0] += value * weight
            accumulator[1] += weight

        def retract(self, accumulator, value, weight):
            accumulator[0] -= value * weight
            accumulator[1] -= weight

        def get_result_type(self):
            return DataTypes.BIGINT()

        def get_accumulator_type(self):
            return DataTypes.ROW([
                DataTypes.FIELD("f0", DataTypes.BIGINT()),
                DataTypes.FIELD("f1", DataTypes.BIGINT())])

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    # the result type and accumulator type can also be specified in the udaf decorator:
    # weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), accumulator_type=...)
    weighted_avg = udaf(WeightedAvg())

    t = table_env.from_elements([(1, 2, "Lee"),
                                 (3, 4, "Jay"),
                                 (5, 6, "Jay"),
                                 (7, 8, "Lee")]).alias("value", "count", "name")

    # call function "inline" without registration in Table API
    result = t.group_by(col("name")).select(weighted_avg(col("value"), col("count")).alias("avg")).execute()
    result.print()

    # register function
    table_env.create_temporary_function("weighted_avg", WeightedAvg())

    # call registered function in Table API
    result = t.group_by(col("name")).select(call("weighted_avg", col("value"), col("count")).alias("avg")).execute()
    result.print()

    # register table
    table_env.create_temporary_view("source", t)

    # call registered function in SQL
    result = table_env.sql_query(
        "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").execute()
    result.print()

    # use the general Python aggregate function in GroupBy Window Aggregation
    # (this part assumes the table has a time attribute column named "rowtime")
    tumble_window = Tumble.over(lit(1).hours) \
        .on(col("rowtime")) \
        .alias("w")

    result = t.window(tumble_window) \
        .group_by(col('w'), col('name')) \
        .select(col('w').start, col('w').end, weighted_avg(col('value'), col('count'))) \
        .execute()
    result.print()

The accumulate(...) method of our WeightedAvg class takes three input arguments. The first one is the accumulator and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator needs to store the weighted sum and count of all the data that have already been accumulated. In our example, we use a Row object as the accumulator. Accumulators will be managed by Flink’s checkpointing mechanism and are restored in case of failover to ensure exactly-once semantics.

Mandatory and Optional Methods

The following methods are mandatory for each AggregateFunction:

  • create_accumulator()
  • accumulate(...)
  • get_value(...)

The following methods of AggregateFunction are required depending on the use case:

  • retract(...) is required when operations that can generate retraction messages run before the current aggregation operation, e.g. group aggregate, outer join.
    This method is optional, but implementing it is strongly recommended so that the UDAF can be used in any use case.
  • merge(...) is required for session window and hop window aggregations.
  • get_result_type() and get_accumulator_type() are required if the result type and accumulator type are not specified in the udaf decorator.
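The retract(...) and merge(...) methods listed above can be sketched in plain Python as follows. This is a stand-in for the WeightedAvg logic that uses a list accumulator so that it runs without Flink. The class name WeightedAvgSketch is hypothetical, and the merge(accumulator, accumulators) signature is assumed to receive the target accumulator plus an iterable of accumulators to fold into it.

```python
class WeightedAvgSketch:
    """Plain-Python stand-in for a weighted-average AggregateFunction."""

    def create_accumulator(self):
        return [0, 0]  # [weighted sum, total weight]

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator, value, weight):
        # Exact inverse of accumulate(): removes a previously added row.
        accumulator[0] -= value * weight
        accumulator[1] -= weight

    def merge(self, accumulator, accumulators):
        # Fold partial accumulators (e.g. from merged windows) into the target.
        for other in accumulators:
            accumulator[0] += other[0]
            accumulator[1] += other[1]

    def get_value(self, accumulator):
        return None if accumulator[1] == 0 else accumulator[0] / accumulator[1]


agg = WeightedAvgSketch()
acc = agg.create_accumulator()
agg.accumulate(acc, 1, 2)
agg.accumulate(acc, 7, 8)
before_retract = agg.get_value(acc)

agg.retract(acc, 7, 8)  # undo the second row
after_retract = agg.get_value(acc)

other = agg.create_accumulator()
agg.accumulate(other, 3, 4)
agg.merge(acc, [other])  # combine two partial results
```

The key property is that retract leaves the accumulator exactly as if the retracted row had never been accumulated, and merge produces the same state as accumulating all rows into a single accumulator.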

ListView and MapView

If an accumulator needs to store large amounts of data, pyflink.table.ListView and pyflink.table.MapView can be used instead of list and dict. These two data structures provide functionality similar to list and dict, but usually perform better because they leverage Flink’s state backend to eliminate unnecessary state access. To use them, declare DataTypes.LIST_VIEW(...) and DataTypes.MAP_VIEW(...) in the accumulator type, e.g.:

    from pyflink.common import Row
    from pyflink.table import AggregateFunction, DataTypes, ListView

    class ListViewConcatAggregateFunction(AggregateFunction):

        def get_value(self, accumulator):
            # the ListView is iterable
            return accumulator[1].join(accumulator[0])

        def create_accumulator(self):
            return Row(ListView(), '')

        def accumulate(self, accumulator, *args):
            # args[1] is the separator string, args[0] the value to append
            accumulator[1] = args[1]
            # the ListView supports add, clear and iterate operations.
            accumulator[0].add(args[0])

        def get_accumulator_type(self):
            return DataTypes.ROW([
                # declare the first column of the accumulator as a string ListView.
                DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
                # the second column holds the separator, so it is declared as a string.
                DataTypes.FIELD("f1", DataTypes.STRING())])

        def get_result_type(self):
            return DataTypes.STRING()

Currently, there are two limitations on the use of ListView and MapView:

  1. The accumulator must be a Row.
  2. The ListView and MapView must be the first level children of the Row accumulator.

Please refer to the documentation of the corresponding classes for more information about this advanced feature.

NOTE: To reduce the data transmission cost between the Python UDF worker and the Java process caused by accessing data in Flink state (e.g. accumulators and data views), there is a cache layer between the raw state handler and the Python state backend. You can adjust the following configuration options to tune the behavior of the cache layer for best performance: python.state.cache-size, python.map-state.read-cache-size, python.map-state.write-cache-size, python.map-state.iterate-response-batch-size. For more details, please refer to the Python Configuration Documentation.

Table Aggregate Functions

A user-defined table aggregate function (UDTAGG) maps scalar values of multiple rows to zero, one, or multiple rows (or structured types). The returned record may consist of one or more fields. If an output record consists of only a single field, the structured record can be omitted, and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.

NOTE: Currently the general user-defined table aggregate function is only supported in the GroupBy aggregation in streaming mode.

Similar to an aggregate function, the behavior of a table aggregate is centered around the concept of an accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling create_accumulator(). Subsequently, the accumulate(...) method of the function is called for each input row to update the accumulator. Once all rows have been processed, the emit_value(...) method of the function is called to compute and return the final result.

The following example illustrates the aggregation process:

[Figure: UDTAGG mechanism]

In the example, we assume a table that contains data about beverages. The table consists of three columns (id, name, and price) and 5 rows. We would like to find the 2 highest prices of all beverages in the table, i.e., perform a TOP2() table aggregation. We need to consider each of the 5 rows. The result is a table with the top 2 values.
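The TOP2() table aggregation described here can be traced in plain Python. The sketch below is a stand-in for a TableAggregateFunction that follows the create_accumulator / accumulate / emit_value life cycle without depending on Flink. The class name Top2Sketch and the price values are illustrative assumptions.

```python
class Top2Sketch:
    """Plain-Python stand-in for a TableAggregateFunction computing the top 2 values."""

    def create_accumulator(self):
        return [None, None]  # [highest, second highest]

    def accumulate(self, accumulator, price):
        if price is None:
            return
        if accumulator[0] is None or price > accumulator[0]:
            # New maximum: the old maximum becomes the runner-up.
            accumulator[1] = accumulator[0]
            accumulator[0] = price
        elif accumulator[1] is None or price > accumulator[1]:
            accumulator[1] = price

    def emit_value(self, accumulator):
        # A generator: each yielded value becomes one output row.
        for value in accumulator:
            if value is not None:
                yield value


prices = [6, 3, 5, 8, 4]  # illustrative prices of 5 beverages
agg = Top2Sketch()
acc = agg.create_accumulator()
for price in prices:
    agg.accumulate(acc, price)
top2 = list(agg.emit_value(acc))
```

Unlike get_value(...) of an aggregate function, emit_value(...) can produce zero, one, or several rows from a single accumulator.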

In order to define a table aggregate function, one has to extend the base class TableAggregateFunction in pyflink.table and implement one or more evaluation methods named accumulate(...).

The result type and accumulator type of the aggregate function can be specified by one of the following two approaches:

  • Implement the methods named get_result_type() and get_accumulator_type().
  • Wrap the function instance with the decorator udtaf in pyflink.table.udf and specify the parameters result_type and accumulator_type.

The following example shows how to define your own table aggregate function and call it in a query.

    from pyflink.common import Row
    from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
    from pyflink.table.expressions import col
    from pyflink.table.udf import udtaf, TableAggregateFunction

    class Top2(TableAggregateFunction):

        def emit_value(self, accumulator):
            yield Row(accumulator[0])
            yield Row(accumulator[1])

        def create_accumulator(self):
            return [None, None]

        def accumulate(self, accumulator, row):
            if row[0] is not None:
                if accumulator[0] is None or row[0] > accumulator[0]:
                    accumulator[1] = accumulator[0]
                    accumulator[0] = row[0]
                elif accumulator[1] is None or row[0] > accumulator[1]:
                    accumulator[1] = row[0]

        def get_accumulator_type(self):
            return DataTypes.ARRAY(DataTypes.BIGINT())

        def get_result_type(self):
            return DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.BIGINT())])

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    # the result type and accumulator type can also be specified in the udtaf decorator:
    # top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
    top2 = udtaf(Top2())

    t = table_env.from_elements([(1, 'Hi', 'Hello'),
                                 (3, 'Hi', 'hi'),
                                 (5, 'Hi2', 'hi'),
                                 (7, 'Hi', 'Hello'),
                                 (2, 'Hi', 'Hello')],
                                ['a', 'b', 'c'])

    # call function "inline" without registration in Table API
    t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()

    # the result is:
    #      b    a
    # 0  Hi2  5.0
    # 1  Hi2  NaN
    # 2   Hi  7.0
    # 3   Hi  3.0

The accumulate(...) method of our Top2 class takes two inputs. The first one is the accumulator and the second one is the user-defined input. In order to calculate a result, the accumulator needs to store the 2 highest values of all the data that has been accumulated. Accumulators are automatically managed by Flink’s checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics. Note that the Top2 example above emits only the two values; a ranking index could be emitted with each value by adding a second field to the result type.
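If a ranking index is wanted alongside each value, the emit logic could yield (value, rank) pairs. The plain-Python sketch below shows only that emit logic; the function name emit_with_rank is hypothetical. In an actual TableAggregateFunction this would presumably be the body of emit_value yielding Row(value, rank), with a result type declaring two fields.

```python
def emit_with_rank(accumulator):
    # accumulator is [highest, second highest]; None marks a missing value.
    rank = 1
    for value in accumulator:
        if value is not None:
            yield value, rank
            rank += 1


ranked = list(emit_with_rank([8, 6]))
single = list(emit_with_rank([5, None]))
```

Skipping None entries also avoids emitting an empty row for groups that contain fewer than two values.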

Mandatory and Optional Methods

The following methods are mandatory for each TableAggregateFunction:

  • create_accumulator()
  • accumulate(...)
  • emit_value(...)

The following methods of TableAggregateFunction are required depending on the use case:

  • retract(...) is required when operations that can generate retraction messages run before the current aggregation operation, e.g. group aggregate, outer join.
    This method is optional, but implementing it is strongly recommended so that the UDTAF can be used in any use case.
  • get_result_type() and get_accumulator_type() are required if the result type and accumulator type are not specified in the udtaf decorator.

ListView and MapView

As with aggregate functions, ListView and MapView can also be used in table aggregate functions.

    from pyflink.common import Row
    from pyflink.table import ListView
    from pyflink.table.types import DataTypes
    from pyflink.table.udf import TableAggregateFunction

    class ListViewConcatTableAggregateFunction(TableAggregateFunction):

        def emit_value(self, accumulator):
            result = accumulator[1].join(accumulator[0])
            yield Row(result)
            yield Row(result)

        def create_accumulator(self):
            return Row(ListView(), '')

        def accumulate(self, accumulator, *args):
            # args[1] is the separator string, args[0] the value to append
            accumulator[1] = args[1]
            accumulator[0].add(args[0])

        def get_accumulator_type(self):
            return DataTypes.ROW([
                DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
                # the second column holds the separator, so it is declared as a string.
                DataTypes.FIELD("f1", DataTypes.STRING())])

        def get_result_type(self):
            return DataTypes.ROW([DataTypes.FIELD("a", DataTypes.STRING())])