Replicate Data to Storage Services

Starting from TiDB v6.5.0, TiCDC supports saving row change events to storage services, including Amazon S3, GCS, Azure Blob Storage, and NFS. This document describes how to create a changefeed that replicates incremental data to such storage services using TiCDC, and how the data is stored.

Replicate change data to storage services

Run the following command to create a changefeed task:

  cdc cli changefeed create \
      --server=http://10.0.10.25:8300 \
      --sink-uri="s3://logbucket/storage_test?protocol=canal-json" \
      --changefeed-id="simple-replication-task"

The output is as follows:

  Info: {"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2024-04-26T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.1.5"}

  • --server: The address of any TiCDC server in the TiCDC cluster.
  • --changefeed-id: The ID of the changefeed. The format must match the ^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$ regular expression. If this ID is not specified, TiCDC automatically generates a UUID (the version 4 format) as the ID.
  • --sink-uri: The downstream address of the changefeed. For details, see Configure sink URI.
  • --start-ts: The starting TSO of the changefeed. TiCDC starts pulling data from this TSO. The default value is the current time.
  • --target-ts: The ending TSO of the changefeed. TiCDC stops pulling data once it reaches this TSO. The default value is empty, which means that TiCDC does not automatically stop pulling data.
  • --config: The configuration file of the changefeed. For details, see TiCDC changefeed configuration parameters.
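
The format rule for --changefeed-id can be checked before calling the CLI. The following is a minimal sketch that only applies the regular expression quoted above; the function name is hypothetical, and any other checks that TiCDC might perform on the ID are not covered.

```python
import re

# Pattern copied from the --changefeed-id description above.
CHANGEFEED_ID_PATTERN = re.compile(r"^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$")


def is_valid_changefeed_id(changefeed_id: str) -> bool:
    """Return True if the ID matches the documented pattern (hypothetical helper)."""
    # fullmatch also rejects a trailing newline, which "$" alone would tolerate.
    return CHANGEFEED_ID_PATTERN.fullmatch(changefeed_id) is not None


if __name__ == "__main__":
    for candidate in ("simple-replication-task", "bad_id", "-leading", "a--b"):
        print(candidate, is_valid_changefeed_id(candidate))
```

Underscores, leading or trailing hyphens, and consecutive hyphens are all rejected by this pattern.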

Configure sink URI

This section describes how to configure the sink URI for storage services, including Amazon S3, GCS, Azure Blob Storage, and NFS. The sink URI specifies the connection information of the TiCDC target system. The format is as follows:

  [scheme]://[host]/[path]?[query_parameters]

For [query_parameters] in the URI, the following parameters can be configured:

| Parameter | Description | Default value | Value range |
|---|---|---|---|
| worker-count | Concurrency for saving data changes to cloud storage in the downstream. | 16 | [1, 512] |
| flush-interval | Interval for saving data changes to cloud storage in the downstream. | 5s | [2s, 10m] |
| file-size | A data change file is stored to cloud storage if the number of bytes exceeds the value of this parameter. | 67108864 | [1048576, 536870912] |
| protocol | The protocol format of the messages sent to the downstream. | N/A | canal-json and csv |
| enable-tidb-extension | When protocol is set to canal-json and enable-tidb-extension is set to true, TiCDC sends WATERMARK events and adds the TiDB extension field to Canal-JSON messages. | false | false and true |

Note

A data change file is saved to the downstream as soon as either condition is met: the flush-interval has elapsed, or the file size exceeds file-size. The protocol parameter is mandatory. If TiCDC does not receive this parameter when creating a changefeed, the CDC:ErrSinkUnknownProtocol error is returned.
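
The parameter rules above can be checked on the client side before a changefeed is created. The following is a minimal sketch, not part of TiCDC: build_sink_uri and its arguments are hypothetical names, the allowed values are copied from the table above, and flush-interval is passed through without range validation.

```python
from urllib.parse import urlencode

# Allowed values copied from the parameter table above.
SUPPORTED_PROTOCOLS = ("canal-json", "csv")
WORKER_COUNT_RANGE = (1, 512)
FILE_SIZE_RANGE = (1048576, 536870912)


def _check_range(name, value, bounds):
    low, high = bounds
    if not low <= value <= high:
        raise ValueError(f"{name} must be in [{low}, {high}], got {value}")


def build_sink_uri(scheme, location, protocol,
                   worker_count=None, file_size=None, flush_interval=None):
    """Assemble [scheme]://[host]/[path]?[query_parameters] (hypothetical helper)."""
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError(f"protocol must be one of {SUPPORTED_PROTOCOLS}")
    params = {"protocol": protocol}
    if worker_count is not None:
        _check_range("worker-count", worker_count, WORKER_COUNT_RANGE)
        params["worker-count"] = worker_count
    if file_size is not None:
        _check_range("file-size", file_size, FILE_SIZE_RANGE)
        params["file-size"] = file_size
    if flush_interval is not None:
        # Passed through as-is, for example "5s"; the [2s, 10m] range is not checked here.
        params["flush-interval"] = flush_interval
    return f"{scheme}://{location}?{urlencode(params)}"


if __name__ == "__main__":
    print(build_sink_uri("s3", "logbucket/storage_test", "canal-json", worker_count=32))
```

Rejecting a missing or unsupported protocol early mirrors the mandatory-parameter rule in the note above, so the mistake surfaces before the changefeed is submitted.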

Configure sink URI for external storage

The following is an example configuration for Amazon S3:

  --sink-uri="s3://bucket/prefix?protocol=canal-json"

The following is an example configuration for GCS:

  --sink-uri="gcs://bucket/prefix?protocol=canal-json"

The following is an example configuration for Azure Blob Storage:

  --sink-uri="azure://bucket/prefix?protocol=canal-json"

Tip

For more information about the URI parameters of Amazon S3, GCS, and Azure Blob Storage in TiCDC, see URI Formats of External Storage Services.

Configure sink URI for NFS

The following is an example configuration for NFS:

  --sink-uri="file:///my-directory/prefix?protocol=canal-json"

Storage path structure

This section describes the storage path structure of data change records, metadata, and DDL events.

Data change records

Data change records are saved to the following path:

  {scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC{num}.{extension}

  • scheme: specifies the storage type, for example, s3, gcs, azure, or file.
  • prefix: specifies the user-defined parent directory, for example, s3://**bucket/bbb/ccc**.
  • schema: specifies the schema name, for example, s3://bucket/bbb/ccc/**test**.
  • table: specifies the table name, for example, s3://bucket/bbb/ccc/test/**table1**.
  • table-version-separator: specifies the separator that separates the path by the table version, for example, s3://bucket/bbb/ccc/test/table1/**9999**.
  • partition-separator: specifies the separator that separates the path by the table partition, for example, s3://bucket/bbb/ccc/test/table1/9999/**20**.
  • date-separator: classifies the files by the transaction commit date. The default value is day. Value options are:
    • none: no date-separator. For example, all files of test.table1 whose table version is 9999 are saved to s3://bucket/bbb/ccc/test/table1/9999.
    • year: the separator is the year of the transaction commit date, for example, s3://bucket/bbb/ccc/test/table1/9999/**2022**.
    • month: the separator is the year and month of the transaction commit date, for example, s3://bucket/bbb/ccc/test/table1/9999/**2022-01**.
    • day: the separator is the year, month, and day of the transaction commit date, for example, s3://bucket/bbb/ccc/test/table1/9999/**2022-01-02**.
  • num: the serial number of the data change file, for example, s3://bucket/bbb/ccc/test/table1/9999/2022-01-02/CDC**000005**.csv.
  • extension: specifies the extension of the file. TiDB v6.5.0 supports the CSV and Canal-JSON formats.
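
The path layout above can be sketched as a small path builder. This is an illustration only: the function names are hypothetical, and the six-digit zero padding of num is inferred from the CDC000005.csv example rather than stated as a rule.

```python
from datetime import date

# strftime patterns for each date-separator option listed above.
DATE_FORMATS = {"none": None, "year": "%Y", "month": "%Y-%m", "day": "%Y-%m-%d"}


def date_segment(commit_date, date_separator):
    """Return the date path segment, or None when date-separator is none."""
    fmt = DATE_FORMATS[date_separator]
    return commit_date.strftime(fmt) if fmt else None


def data_file_path(scheme, prefix, schema, table, table_version, num,
                   extension, partition=None, date_part=None):
    """Join the path components in the documented order (hypothetical helper)."""
    parts = [prefix, schema, table, str(table_version)]
    if partition is not None:
        parts.append(str(partition))   # optional partition-separator
    if date_part:
        parts.append(date_part)        # optional date-separator
    # Assumption: six-digit padding, as in the CDC000005.csv example.
    parts.append(f"CDC{num:06d}.{extension}")
    return f"{scheme}://" + "/".join(parts)


if __name__ == "__main__":
    segment = date_segment(date(2022, 1, 2), "day")
    print(data_file_path("s3", "bucket/bbb/ccc", "test", "table1", 9999, 5,
                         "csv", date_part=segment))
```

With the values from the examples above, the call in the main block reproduces the path shown for num.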

Note

The table version changes only after a DDL operation is performed on the upstream table, and the new table version is the TSO at which the upstream TiDB completes the execution of the DDL. However, a change of the table version does not necessarily mean that the table schema has changed. For example, adding a comment to a column does not cause the schema file content to change.

Index files

An index file is used to prevent written data from being overwritten by mistake. It is stored in the meta subdirectory of the path that holds the data change records.

  {scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/meta/CDC.index

The index file records the largest file name used in the current directory. For example:

  CDC000005.csv

In this example, the files CDC000001.csv through CDC000004.csv in this directory are occupied. When a table is rescheduled or a node restarts in the TiCDC cluster, the new node reads the index file and checks whether CDC000005.csv is occupied. If it is not occupied, the new node starts writing from CDC000005.csv. If it is occupied, the new node starts writing from CDC000006.csv, which prevents it from overwriting data written by other nodes.
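
The decision described above can be sketched as follows. This is not TiCDC's implementation: next_file_name is a hypothetical helper, and existing_files stands in for a lookup against the storage service.

```python
import re

# Matches names such as CDC000005.csv: digits, then the file extension.
FILE_NAME = re.compile(r"CDC(\d+)\.(\w+)")


def next_file_name(index_content, existing_files):
    """Pick the first file name a new node may write (hypothetical helper).

    index_content is the text of CDC.index; existing_files is a set of file
    names already present in the directory.
    """
    recorded = index_content.strip()
    match = FILE_NAME.fullmatch(recorded)
    if match is None:
        raise ValueError(f"unexpected index content: {recorded!r}")
    digits, extension = match.groups()
    if recorded not in existing_files:
        return recorded                    # not occupied: reuse the recorded name
    width = len(digits)                    # keep the same zero padding
    return f"CDC{int(digits) + 1:0{width}d}.{extension}"


if __name__ == "__main__":
    occupied = {"CDC000001.csv", "CDC000002.csv", "CDC000003.csv", "CDC000004.csv"}
    print(next_file_name("CDC000005.csv\n", occupied))
    print(next_file_name("CDC000005.csv\n", occupied | {"CDC000005.csv"}))
```

The first call keeps CDC000005.csv because no file with that name exists yet; the second moves on to CDC000006.csv.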

Metadata

Metadata is saved in the following path:

  {scheme}://{prefix}/metadata

Metadata is a JSON-formatted file, for example:

  {
      "checkpoint-ts":433305438660591626
  }

  • checkpoint-ts: Transactions with commit-ts smaller than checkpoint-ts have been written to the target storage in the downstream.
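
A downstream consumer can use checkpoint-ts to decide which transactions are safe to process. The following is a minimal sketch under that reading of the rule above; both function names are hypothetical.

```python
import json


def load_checkpoint_ts(metadata_text):
    """Extract checkpoint-ts from the content of the metadata file."""
    return int(json.loads(metadata_text)["checkpoint-ts"])


def is_fully_written(commit_ts, checkpoint_ts):
    """True if a transaction with this commit-ts is already in the target storage."""
    # The rule above uses a strict comparison: commit-ts smaller than checkpoint-ts.
    return commit_ts < checkpoint_ts


if __name__ == "__main__":
    checkpoint = load_checkpoint_ts('{"checkpoint-ts":433305438660591626}')
    print(is_fully_written(433305438660591625, checkpoint))
    print(is_fully_written(433305438660591626, checkpoint))
```

A transaction whose commit-ts equals checkpoint-ts is treated as not yet guaranteed, because the documented condition is strictly "smaller than".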

DDL events

DDL events at the table level

When a DDL event of an upstream table causes a table version change, TiCDC automatically does the following:

  • Switches to a new path to write data change records. For example, when the version of test.table1 changes to 441349361156227074, TiCDC starts writing data change records to the s3://bucket/bbb/ccc/test/table1/441349361156227074/2022-01-02/ path.

  • Generates a schema file in the following path to store the table schema information:

    {scheme}://{prefix}/{schema}/{table}/meta/schema_{table-version}_{hash}.json

Taking the schema_441349361156227074_3131721815.json schema file as an example, the table schema information in this file is as follows:

  {
      "Table":"table1",
      "Schema":"test",
      "Version":1,
      "TableVersion":441349361156227074,
      "Query":"ALTER TABLE test.table1 ADD OfficeLocation blob(20)",
      "Type":5,
      "TableColumns":[
          {
              "ColumnName":"Id",
              "ColumnType":"INT",
              "ColumnNullable":"false",
              "ColumnIsPk":"true"
          },
          {
              "ColumnName":"LastName",
              "ColumnType":"CHAR",
              "ColumnLength":"20"
          },
          {
              "ColumnName":"FirstName",
              "ColumnType":"VARCHAR",
              "ColumnLength":"30"
          },
          {
              "ColumnName":"HireDate",
              "ColumnType":"DATETIME"
          },
          {
              "ColumnName":"OfficeLocation",
              "ColumnType":"BLOB",
              "ColumnLength":"20"
          }
      ],
      "TableColumnsTotal":"5"
  }

  • Table: Table name.
  • Schema: Schema name.
  • Version: Protocol version of the storage sink.
  • TableVersion: Table version.
  • Query: DDL statement.
  • Type: DDL type.
  • TableColumns: An array of one or more maps, each of which describes a column in the source table.
    • ColumnName: Column name.
    • ColumnType: Column type. For details, see Data type.
    • ColumnLength: Column length. For details, see Data type.
    • ColumnPrecision: Column precision. For details, see Data type.
    • ColumnScale: The number of digits following the decimal point (the scale). For details, see Data type.
    • ColumnNullable: The column can be NULL when the value of this option is true.
    • ColumnIsPk: The column is part of the primary key when the value of this option is true.
  • TableColumnsTotal: The size of the TableColumns array.
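
To show how these fields might be consumed, here is a minimal sketch that reads a schema file. The helper names are hypothetical, the sample is trimmed to two columns, and the assumption that {hash} is numeric comes only from the example file name above.

```python
import json
import re

# Assumption: both {table-version} and {hash} are decimal numbers, as in the example.
SCHEMA_FILE_NAME = re.compile(r"schema_(\d+)_(\d+)\.json")


def parse_schema_file_name(name):
    """Return (table_version, hash) from a schema file name."""
    match = SCHEMA_FILE_NAME.fullmatch(name)
    if match is None:
        raise ValueError(f"not a schema file name: {name!r}")
    return int(match.group(1)), int(match.group(2))


def primary_key_columns(schema):
    """List columns whose ColumnIsPk is the string "true"."""
    columns = schema.get("TableColumns") or []   # null for database-level DDL
    return [c["ColumnName"] for c in columns if c.get("ColumnIsPk") == "true"]


def column_count_matches(schema):
    """Check TableColumnsTotal against the size of the TableColumns array."""
    columns = schema.get("TableColumns") or []
    return int(schema["TableColumnsTotal"]) == len(columns)


SAMPLE = """{
  "Table": "table1", "Schema": "test", "Version": 1,
  "TableVersion": 441349361156227074,
  "TableColumns": [
    {"ColumnName": "Id", "ColumnType": "INT", "ColumnNullable": "false", "ColumnIsPk": "true"},
    {"ColumnName": "LastName", "ColumnType": "CHAR", "ColumnLength": "20"}
  ],
  "TableColumnsTotal": "2"
}"""

if __name__ == "__main__":
    schema = json.loads(SAMPLE)
    print(parse_schema_file_name("schema_441349361156227074_3131721815.json"))
    print(primary_key_columns(schema), column_count_matches(schema))
```

The helpers treat a null TableColumns value as an empty list and convert TableColumnsTotal with int(), so the same code accepts both the quoted and unquoted forms that appear in the examples in this document.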

DDL events at the database level

When a database-level DDL event is performed in the upstream database, TiCDC automatically generates a schema file in the following path to store the database schema information:

  {scheme}://{prefix}/{schema}/meta/schema_{table-version}_{hash}.json

Taking the schema_441349361156227000_3131721815.json schema file as an example, the database schema information in this file is as follows:

  {
      "Table": "",
      "Schema": "schema1",
      "Version": 1,
      "TableVersion": 441349361156227000,
      "Query": "CREATE DATABASE `schema1`",
      "Type": 1,
      "TableColumns": null,
      "TableColumnsTotal": 0
  }

Data type

This section describes the data types used in the schema_{table-version}_{hash}.json file (hereafter referred to as the "schema file"). The data types are defined as T(M[, D]). For details, see Data Types.

Integer types

Integer types in TiDB are defined as IT[(M)] [UNSIGNED], where

  • IT is the integer type, which can be TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT, or BIT.
  • M is the display width of the type.

Integer types are defined as follows in the schema file:

  {
      "ColumnName":"COL1",
      "ColumnType":"{IT} [UNSIGNED]",
      "ColumnPrecision":"{M}"
  }

Decimal types

Decimal types in TiDB are defined as DT[(M,D)][UNSIGNED], where

  • DT is the decimal type (floating-point or fixed-point), which can be FLOAT, DOUBLE, DECIMAL, or NUMERIC.
  • M is the precision of the data type, or the total number of digits.
  • D is the number of digits following the decimal point.

Decimal types are defined as follows in the schema file:

  {
      "ColumnName":"COL1",
      "ColumnType":"{DT} [UNSIGNED]",
      "ColumnPrecision":"{M}",
      "ColumnScale":"{D}"
  }

Date and time types

Date types in TiDB are defined as DT, where

  • DT is the date type, which can be DATE or YEAR.

The date types are defined as follows in the schema file:

  {
      "ColumnName":"COL1",
      "ColumnType":"{DT}"
  }

The time types in TiDB are defined as TT[(M)], where

  • TT is the time type, which can be TIME, DATETIME, or TIMESTAMP.
  • M is the fractional seconds precision, in the range from 0 to 6.

The time types are defined as follows in the schema file:

  {
      "ColumnName":"COL1",
      "ColumnType":"{TT}",
      "ColumnScale":"{M}"
  }

String types

The string types in TiDB are defined as ST[(M)], where

  • ST is the string type, which can be CHAR, VARCHAR, TEXT, BINARY, BLOB, or JSON.
  • M is the maximum length of the string.

The string types are defined as follows in the schema file:

  {
      "ColumnName":"COL1",
      "ColumnType":"{ST}",
      "ColumnLength":"{M}"
  }

Enum and Set types

The Enum and Set types are defined as follows in the schema file:

  {
      "ColumnName":"COL1",
      "ColumnType":"{ENUM/SET}"
  }
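
The mappings in this section can be reversed to rebuild a readable type string from a column entry in the schema file. The following is a minimal sketch based only on the field layouts shown above; column_type_sql is a hypothetical helper, and the elements of ENUM and SET columns are not reconstructed because the example above does not include them.

```python
def column_type_sql(column):
    """Rebuild a T(M[, D]) style type string from a schema file column entry."""
    # "INT UNSIGNED" splits into base "INT" and modifier "UNSIGNED".
    base, _, modifier = column["ColumnType"].partition(" ")
    precision = column.get("ColumnPrecision")
    scale = column.get("ColumnScale")
    length = column.get("ColumnLength")

    if precision is not None and scale is not None:
        args = f"({precision},{scale})"    # decimal types: M and D
    elif precision is not None:
        args = f"({precision})"            # integer types: display width M
    elif scale is not None:
        args = f"({scale})"                # time types keep M in ColumnScale
    elif length is not None:
        args = f"({length})"               # string types: maximum length M
    else:
        args = ""                          # DATE, YEAR, ENUM, SET, and so on

    suffix = f" {modifier}" if modifier else ""
    return f"{base}{args}{suffix}"


if __name__ == "__main__":
    examples = [
        {"ColumnName": "COL1", "ColumnType": "INT UNSIGNED", "ColumnPrecision": "11"},
        {"ColumnName": "COL2", "ColumnType": "DECIMAL", "ColumnPrecision": "10", "ColumnScale": "2"},
        {"ColumnName": "COL3", "ColumnType": "DATETIME", "ColumnScale": "6"},
        {"ColumnName": "COL4", "ColumnType": "VARCHAR", "ColumnLength": "30"},
    ]
    for entry in examples:
        print(entry["ColumnName"], column_type_sql(entry))
```

The order of the checks matters: a column that carries both ColumnPrecision and ColumnScale is treated as a decimal type, while a column with only ColumnScale is treated as a time type.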