User-defined Functions

PyFlink Table API empowers users to do data transformations with Python user-defined functions.

Currently, it supports two kinds of Python user-defined functions: general Python user-defined functions, which process data one row at a time, and vectorized Python user-defined functions, which process data one batch at a time.

Bundling UDFs

To run Python UDFs (as well as Pandas UDFs) in any non-local mode, it is strongly recommended to bundle your Python UDF definitions using the config option python-files if your Python UDFs live outside the file where the main() function is defined. Otherwise, if you define Python UDFs in a file called my_udf.py, you may run into ModuleNotFoundError: No module named 'my_udf'.
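As a minimal sketch of the recommendation above, the helper below sets the option on a table environment. It assumes that the option's key is python.files, that multiple paths are comma-separated, and that the environment exposes get_config().set(key, value), as TableConfig does in recent PyFlink releases. The helper name bundle_udf_files and the file paths are hypothetical.

```python
def bundle_udf_files(table_env, paths):
    """Attach UDF source files to the job so remote workers can import them.

    Assumption: the option key is "python.files" and it accepts a
    comma-separated list of paths.
    """
    table_env.get_config().set("python.files", ",".join(paths))


# Usage with a real environment (requires PyFlink; the path is hypothetical):
#   from pyflink.table import EnvironmentSettings, TableEnvironment
#   t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
#   bundle_udf_files(t_env, ["/path/to/my_udf.py"])
```

Keeping the call in a small helper makes it easy to check with a fake environment object, without starting a Flink runtime.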

Loading resources in UDFs

In some scenarios you want to load resources in a UDF once and then run the computation (i.e., eval) over and over again without reloading them. For example, you may want to load a large deep learning model only once, then run batch prediction against the model multiple times.

Overriding the open method of UserDefinedFunction is exactly what you need.

    from pyflink.table import DataTypes
    from pyflink.table.udf import ScalarFunction, udf

    class Predict(ScalarFunction):
        def open(self, function_context):
            # Called once per function instance, before any call to eval
            import pickle
            with open("resources.zip/resources/model.pkl", "rb") as f:
                self.model = pickle.load(f)

        def eval(self, x):
            return self.model.predict(x)

    predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
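To illustrate the call order this pattern relies on, here is a plain-Python sketch that runs without PyFlink. PredictSketch and run_like_runtime are hypothetical stand-ins, not PyFlink APIs, and the "model" is just a pickled dict. The sketch only shows that the resource is read once in open while eval runs for every input.

```python
import os
import pickle
import tempfile


class PredictSketch:
    """Hypothetical stand-in with the same open/eval shape as Predict above."""

    def __init__(self, model_path):
        self.model_path = model_path
        self.model = None
        self.load_count = 0  # how many times the resource was read

    def open(self, function_context):
        # `open` below is the builtin: method names are not in scope
        # inside a method body.
        with open(self.model_path, "rb") as f:
            self.model = pickle.load(f)
        self.load_count += 1

    def eval(self, x):
        # Toy "model": a pickled dict holding a single weight.
        return self.model["weight"] * x


def run_like_runtime(func, rows):
    """Mimic the assumed call order: open once, then eval for every input."""
    func.open(None)
    return [func.eval(row) for row in rows]


# Write a toy model file, then drive the function the way a runtime would.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "model.pkl")
    with open(path, "wb") as f:
        pickle.dump({"weight": 2.0}, f)
    sketch = PredictSketch(path)
    results = run_like_runtime(sketch, [1.0, 2.0, 3.0])
```

After the run, sketch.load_count shows how often the file was read, which is the property worth checking when a resource is expensive to load.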

Testing User-Defined Functions

Suppose you have defined a Python user-defined function as follows:

    add = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())

To unit test it, extract the original Python function through the ._func attribute and call it directly:

    f = add._func
    assert f(1, 2) == 3
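The same check can be organized as a small test suite. The sketch below uses the standard unittest module. add_impl is a hypothetical stand-in for the function that add._func would return, so the example runs without PyFlink installed; run_tests is likewise an illustrative helper.

```python
import unittest


def add_impl(i, j):
    # Plain function under test. In a PyFlink job it would be wrapped as
    #   add = udf(add_impl, result_type=DataTypes.BIGINT())
    # and recovered in tests through add._func.
    return i + j


class AddUdfTest(unittest.TestCase):
    def test_adds_two_positive_integers(self):
        self.assertEqual(add_impl(1, 2), 3)

    def test_adds_negative_integer(self):
        self.assertEqual(add_impl(-5, 2), -3)


def run_tests():
    """Run the suite programmatically and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(AddUdfTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Defining the logic as a named function first, and wrapping it with udf afterwards, keeps the tests independent of the Flink runtime.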