Procedures
Flink 1.18 and later versions support Call Statements, which make it easier to manipulate the data and metadata of Paimon tables by writing SQL instead of submitting Flink jobs.
In 1.18, procedures only support passing arguments by position. You must pass all arguments in order, and if you don't want to pass an argument, you must use '' as a placeholder. For example, to compact table default.t with parallelism 4 without specifying partitions or a sort strategy, the call statement should be CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4').
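The placeholder rule can be sketched with a small Python helper that assembles a positional CALL statement as a string. This is only an illustration: `positional_call`, `sql_literal`, and `COMPACT_PARAMS` are hypothetical names, not part of Paimon or Flink, and the parameter order is taken from the five-argument example above. It also assumes every skipped argument accepts '' as a placeholder.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal (strings are single-quoted)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    # Escape embedded single quotes by doubling them, as in standard SQL.
    return "'" + str(value).replace("'", "''") + "'"


def positional_call(procedure, param_order, given):
    """Build a positional CALL statement (hypothetical helper).

    param_order: the full ordered list of parameter names.
    given: dict holding only the arguments the caller wants to set.
    Every skipped parameter before the last supplied one becomes ''.
    """
    unknown = set(given) - set(param_order)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    # Position of the last supplied parameter; -1 when nothing is supplied.
    last = max((param_order.index(name) for name in given), default=-1)
    args = [sql_literal(given.get(name, "")) for name in param_order[: last + 1]]
    return f"CALL {procedure}({', '.join(args)})"


# Parameter order assumed from the compact example in the text.
COMPACT_PARAMS = ["table", "partitions", "order_strategy", "order_by", "options"]

stmt = positional_call(
    "sys.compact",
    COMPACT_PARAMS,
    {"table": "default.t", "options": "sink.parallelism=4"},
)
print(stmt)
```

The helper pads only up to the last supplied argument, so trailing optional arguments are simply left off.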
In higher versions, procedures support passing arguments by name. You can pass arguments in any order, and any optional argument can be omitted. For the above example, the call statement is CALL sys.compact(`table` => 'default.t', options => 'sink.parallelism=4').
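For comparison, a named-argument statement can be assembled in the same way. The sketch below is a hypothetical Python helper (`named_call` and `RESERVED` are illustrative names, not a Paimon or Flink API). It back-quotes the argument name `table` as in the example above; which other names need quoting is an assumption left to the caller.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal (strings are single-quoted)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    # Escape embedded single quotes by doubling them, as in standard SQL.
    return "'" + str(value).replace("'", "''") + "'"


# Argument names that are back-quoted. Only `table` comes from the example
# in the text; extend this set if other names clash with SQL keywords.
RESERVED = {"table"}


def named_call(procedure, **given):
    """Build a CALL statement with name => value arguments (hypothetical helper)."""
    parts = []
    for name, value in given.items():
        key = f"`{name}`" if name in RESERVED else name
        parts.append(f"{key} => {sql_literal(value)}")
    return f"CALL {procedure}({', '.join(parts)})"


stmt = named_call("sys.compact", table="default.t", options="sink.parallelism=4")
print(stmt)
```

Because arguments are matched by name, omitted optional arguments need no placeholder here.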
Specify partitions: a string represents the partition filter. "," means "AND" and ";" means "OR". For example, to specify two partitions date=01 and date=02, write 'date=01;date=02'; to specify one partition with date=01 and day=01, write 'date=01,day=01'.
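The AND/OR convention can be made concrete with a short Python sketch. The two functions below are hypothetical helpers written for illustration, not part of Paimon: one builds the filter string from a list of partition specs, the other splits it back apart.

```python
def partition_filter(specs):
    """Build a partition filter string.

    specs: list of dicts. Keys inside one dict are joined with ',' (AND);
    the dicts themselves are joined with ';' (OR).
    """
    return ";".join(
        ",".join(f"{key}={value}" for key, value in spec.items())
        for spec in specs
    )


def parse_partition_filter(text):
    """Split a filter string back into a list of partition specs."""
    specs = []
    for group in text.split(";"):        # each group is one OR branch
        spec = {}
        for pair in group.split(","):    # each pair is one AND condition
            key, _, value = pair.partition("=")
            spec[key.strip()] = value.strip()
        specs.append(spec)
    return specs


two_partitions = partition_filter([{"date": "01"}, {"date": "02"}])
one_partition = partition_filter([{"date": "01", "day": "01"}])
print(two_partitions)
print(one_partition)
```

Values are kept as strings so that leading zeros such as `01` survive the round trip.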
Table options syntax: a string represents the table options. The format is 'key1=value1,key2=value2...'.
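As a minimal sketch of this format, the hypothetical Python helper below joins a dict into an options string. The text does not say how a comma or equals sign inside a key or value would be escaped, so this helper simply rejects such input; that behavior is an assumption of the sketch, not documented Paimon behavior.

```python
def table_options(options):
    """Join a dict into 'key1=value1,key2=value2' (hypothetical helper)."""
    parts = []
    for key, value in options.items():
        key, value = str(key), str(value)
        # No escaping rule is given in the text, so ambiguous input is refused.
        if "," in key or "=" in key or "," in value:
            raise ValueError(f"cannot encode option {key!r}={value!r}")
        parts.append(f"{key}={value}")
    return ",".join(parts)


single = table_options({"sink.parallelism": 4})
several = table_options({"key1": "value1", "key2": "value2"})
print(single)
print(several)
```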
All available procedures are listed below.
Procedure Name | Usage | Explanation | Example |
---|---|---|---|
compact | CALL [catalog.]sys.compact('table') CALL [catalog.]sys.compact('table', 'partitions') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where') CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time') | To compact a table. Arguments: | -- use partition filter CALL sys.compact(table => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4') -- use partition predicate CALL sys.compact(table => 'default.T', where => 'dt>10 and h<20', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4') |
compact_database | CALL [catalog.]sys.compact_database() CALL [catalog.]sys.compact_database('includingDatabases') CALL [catalog.]sys.compact_database('includingDatabases', 'mode') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime') | To compact databases. Arguments: | CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4') |
create_tag | -- based on the specified snapshot CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId) -- based on the latest snapshot CALL [catalog.]sys.create_tag('identifier', 'tagName') | To create a tag based on a given snapshot. Arguments: | CALL sys.create_tag('default.T', 'my_tag', 10, '1 d') |
create_tag_from_timestamp | -- create a tag from the first snapshot whose commit time is greater than the specified timestamp CALL [catalog.]sys.create_tag_from_timestamp('identifier', 'tagName', timestamp, time_retained) | To create a tag based on a given timestamp. Arguments: | -- for Flink 1.18 CALL sys.create_tag_from_timestamp('default.T', 'my_tag', 1724404318750, '1 d') -- for Flink 1.19 and later CALL sys.create_tag_from_timestamp(table => 'default.T', tag => 'my_tag', timestamp => 1724404318750, time_retained => '1 d') |
create_tag_from_watermark | -- create a tag from the first snapshot whose watermark is greater than the specified timestamp CALL [catalog.]sys.create_tag_from_watermark('identifier', 'tagName', watermark, time_retained) | To create a tag based on a given watermark timestamp. Arguments: | -- for Flink 1.18 CALL sys.create_tag_from_watermark('default.T', 'my_tag', 1724404318750, '1 d') -- for Flink 1.19 and later CALL sys.create_tag_from_watermark(table => 'default.T', tag => 'my_tag', watermark => 1724404318750, time_retained => '1 d') |
delete_tag | CALL [catalog.]sys.delete_tag('identifier', 'tagName') | To delete a tag. Arguments: | CALL sys.delete_tag('default.T', 'my_tag') |
merge_into | CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedUpsertCondition', 'matchedUpsertSetting', 'notMatchedInsertCondition', 'notMatchedInsertValues', 'matchedDeleteCondition') | To perform "MERGE INTO" syntax. See merge_into action for details of arguments. | -- for matched order rows, increase the price, and if there is no match, insert the order from the source table CALL sys.merge_into(target_table => 'default.T', source_table => 'default.S', merge_condition => 'T.id=S.order_id', matched_upsert_setting => 'price=T.price+20', not_matched_insert_values => '*') |
remove_orphan_files | CALL [catalog.]sys.remove_orphan_files('identifier') CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan') CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun') | To remove the orphan data files and metadata files. Arguments: | CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00') CALL sys.remove_orphan_files('default.*', '2023-10-31 12:00:00') CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00', true) |
reset_consumer | -- reset the next snapshot id of the consumer CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId) -- delete the consumer CALL [catalog.]sys.reset_consumer('identifier', 'consumerId') | To reset or delete a consumer. Arguments: | CALL sys.reset_consumer('default.T', 'myid', 10) |
rollback_to | -- rollback to a snapshot CALL sys.rollback_to(table => 'identifier', snapshot_id => snapshotId) -- rollback to a tag CALL sys.rollback_to(table => 'identifier', tag => 'tagName') | To roll back to a specific version of the target table. Argument: | CALL sys.rollback_to(table => 'default.T', snapshot_id => 10) |
expire_snapshots | -- for Flink 1.18 CALL sys.expire_snapshots(table, retain_max) -- for Flink 1.19 and later CALL sys.expire_snapshots(table, retain_max, retain_min, older_than, max_deletes) | To expire snapshots. Argument: | -- for Flink 1.18 CALL sys.expire_snapshots('default.T', 2) -- for Flink 1.19 and later CALL sys.expire_snapshots(table => 'default.T', retain_max => 2) CALL sys.expire_snapshots(table => 'default.T', older_than => '2024-01-01 12:00:00') CALL sys.expire_snapshots(table => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10) CALL sys.expire_snapshots(table => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10) |
expire_partitions | CALL sys.expire_partitions(table, expiration_time, timestamp_formatter, expire_strategy) | To expire partitions. Argument: | -- for Flink 1.18 CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time') -- for Flink 1.19 and later CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time') CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt $hm', expire_strategy => 'values-time') |
repair | -- repair all databases and tables in catalog CALL sys.repair() -- repair all tables in a specific database CALL sys.repair('databaseName') -- repair a table CALL sys.repair('databaseName.tableName') -- repair multiple databases and tables in one string, delimited by ',' CALL sys.repair('databaseName01,database02.tableName01,database03') | Synchronize information from the file system to Metastore. Argument: | CALL sys.repair('test_db.T') |
rewrite_file_index | CALL sys.rewrite_file_index(<identifier> [, <partitions>]) | Rewrite the file index for the table. Argument: | -- rewrite the file index for the whole table CALL sys.rewrite_file_index('test_db.T') -- rewrite the file index for a specific partition CALL sys.rewrite_file_index('test_db.T', 'pt=a') |
create_branch | -- based on the specified tag CALL [catalog.]sys.create_branch('identifier', 'branchName', 'tagName') -- create an empty branch CALL [catalog.]sys.create_branch('identifier', 'branchName') | To create a branch based on a given tag, or to create an empty branch. Arguments: | CALL sys.create_branch('default.T', 'branch1', 'tag1') CALL sys.create_branch('default.T', 'branch1') |
delete_branch | CALL [catalog.]sys.delete_branch('identifier', 'branchName') | To delete a branch. Arguments: | CALL sys.delete_branch('default.T', 'branch1') |
fast_forward | CALL [catalog.]sys.fast_forward('identifier', 'branchName') | To fast_forward a branch to the main branch. Arguments: | CALL sys.fast_forward('default.T', 'branch1') |