Procedures

Flink 1.18 and later versions support Call Statements, which make it easier to manipulate data and metadata of Paimon tables by writing SQL instead of submitting Flink jobs.

In 1.18, the procedure only supports passing arguments by position. You must pass all arguments in order, and if you don't want to pass some arguments, you must use '' as a placeholder. For example, if you want to compact table default.t with parallelism 4, but you don't want to specify partitions and sort strategy, the call statement should be
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4').

In higher versions, the procedure supports passing arguments by name. You can pass arguments in any order and any optional argument can be omitted. For the above example, the call statement is
CALL sys.compact(`table` => 'default.t', options => 'sink.parallelism=4').

Specify partitions: we use a string to represent a partition filter. ',' means 'AND' and ';' means 'OR'. For example, to specify two partitions date=01 and date=02, write 'date=01;date=02'; to specify one partition with date=01 and day=01, write 'date=01,day=01'.
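
As a sketch, this filter string plugs into procedures such as sys.compact (the table name default.T and the date/day partition columns are illustrative):

```sql
-- 'OR' across partitions: compact partition date=01 as well as partition date=02
CALL sys.compact(`table` => 'default.T', partitions => 'date=01;date=02')

-- 'AND' within one partition spec: compact only the partition where date=01 and day=01
CALL sys.compact(`table` => 'default.T', partitions => 'date=01,day=01')
```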

Table options syntax: we use a string to represent table options. The format is 'key1=value1,key2=value2...'.
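
For instance, several dynamic options can be packed into one string following that format; a sketch (the table name is illustrative, the option keys are standard Paimon options):

```sql
-- two dynamic options in a single 'key1=value1,key2=value2' string
CALL sys.compact(`table` => 'default.T', options => 'sink.parallelism=4,snapshot.time-retained=1h')
```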

All available procedures are listed below.

compact

Usage:
CALL [catalog.]sys.compact('table')
CALL [catalog.]sys.compact('table', 'partitions')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time')

To compact a table. Arguments:
  • table (required): the target table identifier.
  • partitions (optional): partition filter.
  • order_strategy (optional): 'order', 'zorder', 'hilbert' or 'none'.
  • order_by (optional): the columns to be sorted. Leave empty if 'order_strategy' is 'none'.
  • options (optional): additional dynamic options of the table.
  • where (optional): partition predicate (cannot be used together with 'partitions'). Note: since where is a keyword, it must be wrapped in a pair of backticks: `where`.
  • partition_idle_time (optional): performs a full compaction only on partitions that have received no new data for 'partition_idle_time'. This argument cannot be used with order compaction.

Examples:
-- use partition filter
CALL sys.compact(`table` => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
-- use partition predicate
CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
compact_database

Usage:
CALL [catalog.]sys.compact_database()
CALL [catalog.]sys.compact_database('includingDatabases')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')

To compact databases. Arguments:
  • includingDatabases: to specify databases. You can use a regular expression.
  • mode: compact mode. 'divided': start a sink for each table; detecting a new table requires restarting the job. 'combined' (default): start a single combined sink for all tables; a new table will be detected automatically.
  • includingTables: to specify tables. You can use a regular expression.
  • excludingTables: to specify tables that are not to be compacted. You can use a regular expression.
  • tableOptions: additional dynamic options of the table.
  • partitionIdleTime: performs a full compaction only on partitions that have received no new data for 'partitionIdleTime'.

Example:
CALL sys.compact_database('db1|db2', 'combined', 'table.*', 'ignore', 'sink.parallelism=4')
create_tag

Usage:
-- based on the specified snapshot
CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)
-- based on the latest snapshot
CALL [catalog.]sys.create_tag('identifier', 'tagName')

To create a tag based on a given snapshot. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the new tag.
  • snapshotId (Long): id of the snapshot which the new tag is based on.
  • time_retained: the maximum time retained for newly created tags.

Example:
CALL sys.create_tag('default.T', 'my_tag', 10, '1 d')
create_tag_from_timestamp

Usage:
-- create a tag from the first snapshot whose commit time is greater than the specified timestamp
CALL [catalog.]sys.create_tag_from_timestamp('identifier', 'tagName', timestamp, time_retained)

To create a tag based on a given timestamp. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • timestamp (Long): find the first snapshot whose commit time is greater than this timestamp.
  • time_retained: the maximum time retained for newly created tags.

Examples:
-- for Flink 1.18
CALL sys.create_tag_from_timestamp('default.T', 'my_tag', 1724404318750, '1 d')
-- for Flink 1.19 and later
CALL sys.create_tag_from_timestamp(`table` => 'default.T', tag => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d')
create_tag_from_watermark

Usage:
-- create a tag from the first snapshot whose watermark is greater than the specified timestamp
CALL [catalog.]sys.create_tag_from_watermark('identifier', 'tagName', watermark, time_retained)

To create a tag based on a given watermark timestamp. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • watermark (Long): find the first snapshot whose watermark is greater than the specified watermark.
  • time_retained: the maximum time retained for newly created tags.

Examples:
-- for Flink 1.18
CALL sys.create_tag_from_watermark('default.T', 'my_tag', 1724404318750, '1 d')
-- for Flink 1.19 and later
CALL sys.create_tag_from_watermark(`table` => 'default.T', tag => 'my_tag', watermark => 1724404318750, time_retained => '1 d')
delete_tag

Usage:
CALL [catalog.]sys.delete_tag('identifier', 'tagName')

To delete a tag. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the tag to be deleted. To specify multiple tags, use ',' as the delimiter.

Example:
CALL sys.delete_tag('default.T', 'my_tag')
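
Since the tag-name argument accepts a ','-delimited list, several tags can be dropped in one call; a sketch with illustrative tag names:

```sql
-- delete tags tag1, tag2 and tag3 of default.T in a single call
CALL sys.delete_tag('default.T', 'tag1,tag2,tag3')
```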
merge_into

Usage:
CALL [catalog.]sys.merge_into('identifier', 'targetAlias',
    'sourceSqls', 'sourceTable', 'mergeCondition',
    'matchedUpsertCondition', 'matchedUpsertSetting',
    'notMatchedInsertCondition', 'notMatchedInsertValues',
    'matchedDeleteCondition')

To perform "MERGE INTO" syntax. See the merge_into action for details of the arguments.

Example:
-- for matched order rows, increase the price,
-- and if there is no match, insert the order from the source table
CALL sys.merge_into(
    target_table => 'default.T',
    source_table => 'default.S',
    merge_condition => 'T.id=S.order_id',
    matched_upsert_setting => 'price=T.price+20',
    not_matched_insert_values => '*')

remove_orphan_files

Usage:
CALL [catalog.]sys.remove_orphan_files('identifier')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')

To remove orphan data files and metadata files. Arguments:
  • identifier: the target table identifier. Cannot be empty. You can use database_name.* to clean the whole database.
  • olderThan: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument modifies that interval.
  • dryRun: when true, only list the orphan files without actually removing them. Default is false.

Examples:
CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00')
CALL sys.remove_orphan_files('default.*', '2023-10-31 12:00:00')
CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00', true)
reset_consumer

Usage:
-- reset the new next snapshot id in the consumer
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)
-- delete consumer
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId')

To reset or delete a consumer. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • consumerId: consumer to be reset or deleted.
  • nextSnapshotId (Long): the new next snapshot id of the consumer.

Example:
CALL sys.reset_consumer('default.T', 'myid', 10)
rollback_to

Usage:
-- rollback to a snapshot
CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)
-- rollback to a tag
CALL sys.rollback_to(`table` => 'identifier', tag => 'tagName')

To roll back to a specific version of the target table. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • snapshotId (Long): id of the snapshot to roll back to.
  • tagName: name of the tag to roll back to.

Example:
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
expire_snapshots

Usage:
-- for Flink 1.18
CALL sys.expire_snapshots(table, retain_max)
-- for Flink 1.19 and later
CALL sys.expire_snapshots(table, retain_max, retain_min, older_than, max_deletes)

To expire snapshots. Arguments:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • older_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.

Examples:
-- for Flink 1.18
CALL sys.expire_snapshots('default.T', 2)
-- for Flink 1.19 and later
CALL sys.expire_snapshots(`table` => 'default.T', retain_max => 2)
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00')
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10)
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)
expire_partitions

Usage:
CALL sys.expire_partitions(table, expiration_time, timestamp_formatter, timestamp_pattern, expire_strategy)

To expire partitions. Arguments:
  • table: the target table identifier. Cannot be empty.
  • expiration_time: the expiration interval of a partition. A partition expires when its lifetime exceeds this value. Partition time is extracted from the partition value.
  • timestamp_formatter: the formatter used to parse a timestamp from a string.
  • timestamp_pattern: the pattern used to extract a timestamp from partitions.
  • expire_strategy: the expiration strategy for partition expiration; possible values are 'values-time' and 'update-time'. Default is 'values-time'.

Examples:
-- for Flink 1.18
CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')
-- for Flink 1.19 and later
CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')
CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt $hm', expire_strategy => 'values-time')

repair

Usage:
-- repair all databases and tables in the catalog
CALL sys.repair()
-- repair all tables in a specific database
CALL sys.repair('databaseName')
-- repair a table
CALL sys.repair('databaseName.tableName')
-- repair multiple databases or tables in one string, using ',' as the delimiter
CALL sys.repair('databaseName01,database02.tableName01,database03')

Synchronize information from the file system to the metastore. Arguments:
  • empty: all databases and tables in the catalog.
  • databaseName: the target database name.
  • tableName: the target table identifier.

Example:
CALL sys.repair('test_db.T')
rewrite_file_index

Usage:
CALL sys.rewrite_file_index(<identifier> [, <partitions>])

Rewrite the file index for the table. Arguments:
  • identifier: <databaseName>.<tableName>.
  • partitions: specific partitions.

Examples:
-- rewrite the file index for the whole table
CALL sys.rewrite_file_index('test_db.T')
-- rewrite the file index for a specific partition
CALL sys.rewrite_file_index('test_db.T', 'pt=a')

create_branch

Usage:
-- based on the specified tag
CALL [catalog.]sys.create_branch('identifier', 'branchName', 'tagName')
-- create an empty branch
CALL [catalog.]sys.create_branch('identifier', 'branchName')

To create a branch based on a given tag, or just create an empty branch. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • branchName: name of the new branch.
  • tagName: name of the tag which the new branch is based on.

Examples:
CALL sys.create_branch('default.T', 'branch1', 'tag1')
CALL sys.create_branch('default.T', 'branch1')

delete_branch

Usage:
CALL [catalog.]sys.delete_branch('identifier', 'branchName')

To delete a branch. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • branchName: name of the branch to be deleted. To specify multiple branches, use ',' as the delimiter.

Example:
CALL sys.delete_branch('default.T', 'branch1')
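
As with tags, the branch-name argument accepts a ','-delimited list, so several branches can be deleted in one call; a sketch with illustrative branch names:

```sql
-- delete branches branch1 and branch2 of default.T in a single call
CALL sys.delete_branch('default.T', 'branch1,branch2')
```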
fast_forward

Usage:
CALL [catalog.]sys.fast_forward('identifier', 'branchName')

To fast-forward a branch to the main branch. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • branchName: name of the branch to be merged.

Example:
CALL sys.fast_forward('default.T', 'branch1')