Guide for Developing a Storage Sink Consumer

This document describes how to design and implement a TiDB data change consumer.


Note

The storage sink cannot handle the DROP DATABASE DDL. Therefore, avoid executing this DDL. If you do need to execute this DDL, execute it manually in the downstream MySQL.

TiCDC does not provide a standard way to implement a consumer. This document provides an example consumer program written in Golang that reads data from the storage service and writes it to a MySQL-compatible database. You can refer to the data format and instructions in this example to implement your own consumer.

Consumer program written in Golang

Design a consumer

The following diagram shows the overall consumption process of the consumer:

[Figure: TiCDC storage consumer overview]

The components of the consumer and their features are described as follows:

    // Design sketch: method bodies are intentionally left as stubs.
    type StorageReader struct {
    }

    // Read the files from storage.
    // Add new files and delete files that do not exist in storage.
    func (c *StorageReader) ReadFiles() {}

    // Query newly added files and the latest checkpoint from storage. One file can only be returned once.
    func (c *StorageReader) ExposeNewFiles() (int64, []string) { return 0, nil }

    // ConsumerManager is responsible for assigning tasks to TableConsumer.
    // Different consumers can consume data concurrently,
    // but data of one table must be processed by the same TableConsumer.
    type ConsumerManager struct {
        // StorageCheckpoint is recorded in the metadata file, and it can be fetched by calling `StorageReader.ExposeNewFiles()`.
        // This checkpoint indicates that the data whose transaction commit time is less than this checkpoint has been stored in storage.
        StorageCheckpoint int64
        // This checkpoint indicates where the consumer has consumed.
        // ConsumerManager periodically collects TableConsumer.Checkpoint,
        // then Checkpoint is updated to the minimum value of all TableConsumer.Checkpoint.
        Checkpoint int64

        // Indexed by schema name, then by table name.
        tableFiles map[string]map[string]*TableConsumer
    }

    // Query newly added files from StorageReader.
    // For a newly created table, create a TableConsumer for it.
    // If there are new files, send them to the corresponding TableConsumer.
    func (c *ConsumerManager) Dispatch() {}

    type TableConsumer struct {
        // This checkpoint indicates where this TableConsumer has consumed.
        // Its initial value is ConsumerManager.Checkpoint.
        // TableConsumer.Checkpoint is equal to TableVersionConsumer.Checkpoint.
        Checkpoint int64

        schema, table string
        // Must be consumed sequentially according to the order of table versions.
        // Keyed by table version.
        verConsumers map[int64]*TableVersionConsumer
        currentVer, previousVer int64
    }

    // Send newly added files to the corresponding TableVersionConsumer.
    // For any DDL, assign a TableVersionConsumer for the new table version.
    func (tc *TableConsumer) Dispatch() {}

    // If DDL query is empty or its tableVersion is less than TableConsumer.Checkpoint,
    // - ignore this DDL, and consume the data under the table version.
    // Otherwise,
    // - execute the DDL first, and then consume the data under the table version.
    // - For tables that are dropped, auto-recycling is performed after the drop table DDL is executed.
    func (tc *TableConsumer) ExecuteDDL() {}

    type TableVersionConsumer struct {
        // This checkpoint indicates where the TableVersionConsumer has consumed.
        // Its initial value is TableConsumer.Checkpoint.
        Checkpoint int64

        schema, table, version string
        // For the same table version, data in different partitions can be consumed concurrently.
        partitionNum int64
        // Must be consumed sequentially according to the data file number.
        // Keyed by file name.
        fileSet map[string]*TableVersionConsumer
        // The original omits the type of this field; int64 is assumed here to match currentVer.
        currentVersion int64
    }

    // If data commit ts is less than TableConsumer.Checkpoint
    // or bigger than ConsumerManager.StorageCheckpoint,
    // - ignore this data.
    // Otherwise,
    // - process this data and write it to MySQL.
    func (tc *TableVersionConsumer) ExecuteDML() {}
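
The comments on ConsumerManager.Checkpoint say that it is updated to the minimum of all TableConsumer.Checkpoint values. A minimal sketch of that aggregation step might look as follows. The function name `minCheckpoint` and the `fallback` parameter are hypothetical and are not part of the example program; returning the fallback when no table consumer exists yet is an assumption of this sketch.

```go
package main

import "fmt"

// minCheckpoint returns the smallest value in tableCheckpoints.
// If no table consumer exists yet, it returns fallback unchanged
// (an assumption made for this sketch, not documented behavior).
func minCheckpoint(tableCheckpoints []int64, fallback int64) int64 {
	if len(tableCheckpoints) == 0 {
		return fallback
	}
	lowest := tableCheckpoints[0]
	for _, cp := range tableCheckpoints[1:] {
		if cp < lowest {
			lowest = cp
		}
	}
	return lowest
}

func main() {
	// Three hypothetical TableConsumer checkpoints.
	fmt.Println(minCheckpoint([]int64{300, 120, 250}, 0))
}
```

Taking the minimum means the global checkpoint never moves past a table that is still behind, so a restart from that checkpoint does not skip unconsumed data for any table.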

Process DDL events

When the consumer traverses the directory for the first time, it finds a layout like the following:

    ├── metadata
    └── test
        ├── tbl_1
        │   └── 437752935075545091
        │       ├── CDC000001.json
        │       └── schema.json

The consumer parses the table schema in the schema.json file and obtains the DDL Query statement:

  • If the Query statement is empty or TableVersion is less than the consumer checkpoint, the consumer skips this statement.
  • If the Query statement is not empty and TableVersion is equal to or greater than the consumer checkpoint, the consumer executes the DDL statement in the downstream MySQL.
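
The decision rule in the list above can be sketched as a small predicate. This is only an illustration under the rule as stated here; the function name `shouldExecuteDDL` and its parameters are hypothetical and do not come from the example program.

```go
package main

import "fmt"

// shouldExecuteDDL reports whether the DDL from schema.json should be run
// downstream. It is skipped when the Query is empty or when the table
// version is older than the consumer checkpoint.
func shouldExecuteDDL(query string, tableVersion, checkpoint int64) bool {
	if query == "" {
		return false
	}
	return tableVersion >= checkpoint
}

func main() {
	const checkpoint = 437752935075545000 // hypothetical consumer checkpoint

	fmt.Println(shouldExecuteDDL("", 437752935075545091, checkpoint))
	fmt.Println(shouldExecuteDDL("create table tbl_1 (Id int primary key)", 437752935075545091, checkpoint))
}
```

In both cases the consumer goes on to replicate the data files under that table version; the predicate only decides whether the DDL is executed first.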

Then the consumer starts replicating the CDC000001.json file.

In the following example, the DDL Query statement in the test/tbl_1/437752935075545091/schema.json file is not empty:

    {
        "Table":"tbl_1",
        "Schema":"test",
        "Version": 1,
        "TableVersion":437752935075545091,
        "Query": "create table tbl_1 (Id int primary key, LastName char(20), FirstName varchar(30), HireDate datetime, OfficeLocation Blob(20))",
        "TableColumns":[
            {
                "ColumnName":"Id",
                "ColumnType":"INT",
                "ColumnNullable":"false",
                "ColumnIsPk":"true"
            },
            {
                "ColumnName":"LastName",
                "ColumnType":"CHAR",
                "ColumnLength":"20"
            },
            {
                "ColumnName":"FirstName",
                "ColumnType":"VARCHAR",
                "ColumnLength":"30"
            },
            {
                "ColumnName":"HireDate",
                "ColumnType":"DATETIME"
            },
            {
                "ColumnName":"OfficeLocation",
                "ColumnType":"BLOB",
                "ColumnLength":"20"
            }
        ],
        "TableColumnsTotal":"5"
    }
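
A consumer needs to decode this file before it can decide what to do with the DDL. The following is a minimal sketch of how that could be done with Go's standard `encoding/json` package. The struct and function names (`TableSchema`, `TableColumn`, `parseSchema`) are hypothetical; the field names and value types are taken only from the example above, so a real schema.json might contain fields that this sketch does not model.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TableColumn mirrors one entry of "TableColumns" in the example.
// The example encodes length, nullability, and the PK flag as strings.
type TableColumn struct {
	ColumnName     string `json:"ColumnName"`
	ColumnType     string `json:"ColumnType"`
	ColumnLength   string `json:"ColumnLength,omitempty"`
	ColumnNullable string `json:"ColumnNullable,omitempty"`
	ColumnIsPk     string `json:"ColumnIsPk,omitempty"`
}

// TableSchema mirrors the top-level fields of the example schema.json.
type TableSchema struct {
	Table             string        `json:"Table"`
	Schema            string        `json:"Schema"`
	Version           int           `json:"Version"`
	TableVersion      int64         `json:"TableVersion"`
	Query             string        `json:"Query"`
	TableColumns      []TableColumn `json:"TableColumns"`
	TableColumnsTotal string        `json:"TableColumnsTotal"`
}

// parseSchema decodes the raw content of a schema.json file.
func parseSchema(data []byte) (*TableSchema, error) {
	var s TableSchema
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, fmt.Errorf("parse schema.json: %w", err)
	}
	return &s, nil
}

func main() {
	// A shortened, hypothetical schema.json used only for this demo.
	raw := []byte(`{"Table":"tbl_1","Schema":"test","Version":1,
		"TableVersion":437752935075545091,
		"Query":"create table tbl_1 (Id int primary key)",
		"TableColumns":[{"ColumnName":"Id","ColumnType":"INT","ColumnIsPk":"true"}],
		"TableColumnsTotal":"1"}`)

	s, err := parseSchema(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s.%s version=%d query=%q\n", s.Schema, s.Table, s.TableVersion, s.Query)
}
```

Decoding TableVersion into an `int64` field rather than a generic `interface{}` avoids the precision loss that a `float64` intermediate would cause for values of this size.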

When the consumer traverses the directory again, it finds a new version directory of the table. Note that the consumer can consume data in the new directory only after all files in the test/tbl_1/437752935075545091 directory have been consumed.

    ├── metadata
    └── test
        ├── tbl_1
        │   ├── 437752935075545091
        │   │   ├── CDC000001.json
        │   │   └── schema.json
        │   └── 437752935075546092
        │       ├── CDC000001.json
        │       └── schema.json

The consumption logic is the same as for the first version directory. Specifically, the consumer parses the table schema in the schema.json file, obtains the DDL Query statement, and processes it accordingly. Then the consumer starts replicating the CDC000001.json file.
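
Because table versions must be consumed in order, a consumer has to sort the version directories it discovers before dispatching them. The following sketch shows one way to do that. It assumes that version directory names are decimal integers, as in the example trees, and that a smaller number means an earlier version; the function name `orderedVersions` is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// orderedVersions parses table version directory names and returns the
// versions in ascending order, which is the order in which this sketch
// assumes they must be consumed.
func orderedVersions(dirNames []string) ([]int64, error) {
	versions := make([]int64, 0, len(dirNames))
	for _, name := range dirNames {
		v, err := strconv.ParseInt(name, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid table version directory %q: %w", name, err)
		}
		versions = append(versions, v)
	}
	sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })
	return versions, nil
}

func main() {
	// Directory listings are not guaranteed to arrive in version order.
	versions, err := orderedVersions([]string{"437752935075546092", "437752935075545091"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, v := range versions {
		fmt.Println("consume version", v)
	}
}
```

Sorting numerically rather than lexically keeps the order correct even if two version numbers ever differ in their number of digits.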

Process DML events

After DDL events are properly processed, you can process DML events in the {schema}/{table}/{table-version-separator}/ directory based on the specific file format (CSV or Canal-JSON) and file number.
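
To pick the right decoder and consume files in file-number order, a consumer first has to split each data file path into its parts. The sketch below does this for the simple four-part layout used in the examples of this document (schema, table, table version, file name). The type `DataFile` and the function `parseDataFile` are hypothetical, and the sketch assumes that data file names consist of the `CDC` prefix, a decimal file number, and an extension; other layouts are not handled.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// DataFile holds the parts of a data file path (hypothetical type).
type DataFile struct {
	Schema       string
	Table        string
	TableVersion int64
	FileNumber   int64
	Extension    string // for example "json" or "csv"
}

// parseDataFile splits a path such as
// "test/tbl_1/437752935075545091/CDC000001.json" into its parts.
// Files that do not start with "CDC" (such as schema.json) are rejected.
func parseDataFile(p string) (DataFile, error) {
	parts := strings.Split(p, "/")
	if len(parts) != 4 {
		return DataFile{}, fmt.Errorf("unexpected path layout: %q", p)
	}
	version, err := strconv.ParseInt(parts[2], 10, 64)
	if err != nil {
		return DataFile{}, fmt.Errorf("invalid table version in %q: %w", p, err)
	}
	name := parts[3]
	dot := strings.LastIndex(name, ".")
	if !strings.HasPrefix(name, "CDC") || dot <= len("CDC") {
		return DataFile{}, fmt.Errorf("not a data file: %q", name)
	}
	number, err := strconv.ParseInt(name[len("CDC"):dot], 10, 64)
	if err != nil {
		return DataFile{}, fmt.Errorf("invalid file number in %q: %w", name, err)
	}
	return DataFile{
		Schema:       parts[0],
		Table:        parts[1],
		TableVersion: version,
		FileNumber:   number,
		Extension:    name[dot+1:],
	}, nil
}

func main() {
	f, err := parseDataFile("test/tbl_1/437752935075545091/CDC000001.json")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", f)
}
```

With the file number parsed as an integer, the files of one table version can be sorted and consumed sequentially, and the extension tells the consumer which format-specific decoder to use.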

TiCDC guarantees at-least-once replication, so the storage might contain duplicate data. You need to compare the commit ts of each change with the consumer checkpoint. If the commit ts is less than the consumer checkpoint, the change has already been consumed and you need to deduplicate it, for example by skipping it.
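
A minimal sketch of this commit ts filter is shown below. It also applies the upper bound from the design comments on ExecuteDML, which ignore data whose commit ts is bigger than ConsumerManager.StorageCheckpoint. The `Row` type and the function names are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// Row is a hypothetical change record; only the commit ts matters here.
type Row struct {
	CommitTs int64
	Payload  string
}

// shouldApplyDML reports whether a change should be written downstream.
// Changes older than the consumer checkpoint are treated as duplicates,
// and changes newer than the storage checkpoint are ignored.
func shouldApplyDML(commitTs, checkpoint, storageCheckpoint int64) bool {
	return commitTs >= checkpoint && commitTs <= storageCheckpoint
}

// filterRows keeps only the rows that pass shouldApplyDML, preserving order.
func filterRows(rows []Row, checkpoint, storageCheckpoint int64) []Row {
	kept := make([]Row, 0, len(rows))
	for _, r := range rows {
		if shouldApplyDML(r.CommitTs, checkpoint, storageCheckpoint) {
			kept = append(kept, r)
		}
	}
	return kept
}

func main() {
	rows := []Row{
		{CommitTs: 90, Payload: "already consumed"},
		{CommitTs: 100, Payload: "at checkpoint"},
		{CommitTs: 150, Payload: "new"},
		{CommitTs: 250, Payload: "beyond storage checkpoint"},
	}
	for _, r := range filterRows(rows, 100, 200) {
		fmt.Println(r.CommitTs, r.Payload)
	}
}
```

A change whose commit ts equals the consumer checkpoint is kept here, because the text only treats changes with a commit ts less than the checkpoint as duplicates.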