Protobuf Format

Format: Serialization Schema
Format: Deserialization Schema

The Protocol Buffers (Protobuf) format allows you to read and write Protobuf data based on Protobuf-generated classes.

Dependencies

To use the Protobuf format, the following dependency is required both for projects that use a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

Maven dependency

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-protobuf</artifactId>
      <version>1.18.1</version>
    </dependency>

How to create a table with Protobuf format

Here is an example of creating a table using the Kafka connector and the Protobuf format.

Below is the proto definition file.

    syntax = "proto2";
    package com.example;
    option java_package = "com.example";
    option java_multiple_files = true;

    message SimpleTest {
        optional int64 uid = 1;
        optional string name = 2;
        optional int32 category_type = 3;
        optional bytes content = 4;
        optional double price = 5;
        map<int64, InnerMessageTest> value_map = 6;
        repeated InnerMessageTest value_arr = 7;
        optional Corpus corpus_int = 8;
        optional Corpus corpus_str = 9;

        message InnerMessageTest {
            optional int64 v1 = 1;
            optional int32 v2 = 2;
        }

        enum Corpus {
            UNIVERSAL = 0;
            WEB = 1;
            IMAGES = 2;
            LOCAL = 3;
            NEWS = 4;
            PRODUCTS = 5;
            VIDEO = 7;
        }
    }

  1. Use the protoc command to compile the .proto file into Java classes.
  2. Then compile and package the classes (there is no need to package the protobuf-java dependency into the JAR).
  3. Finally, provide the JAR on your classpath, e.g. pass it to sql-client using -j.

    CREATE TABLE simple_test (
      uid BIGINT,
      name STRING,
      category_type INT,
      content BINARY,
      price DOUBLE,
      value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
      value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>,
      corpus_int INT,     -- enum field read as its number
      corpus_str STRING   -- enum field read as its name
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'format' = 'protobuf',
      'protobuf.message-class-name' = 'com.example.SimpleTest',
      'protobuf.ignore-parse-errors' = 'true'
    );

Format Options

| Option | Required | Forwarded | Default | Type | Description |
|---|---|---|---|---|---|
| format | required | no | (none) | String | Specifies which format to use; here it must be 'protobuf'. |
| protobuf.message-class-name | required | no | (none) | String | The full name of a Protobuf-generated class. The name must match the message name in the proto definition file. $ is supported for inner class names, such as 'com.example.OuterClass$MessageClass'. |
| protobuf.ignore-parse-errors | optional | no | false | Boolean | Optional flag to skip rows with parse errors instead of failing. |
| protobuf.read-default-values | optional | yes | false | Boolean | This option only takes effect if the generated class uses proto2 syntax. If set to true, the format reads empty values as the default values defined in the proto file. If set to false, the format generates null values when the data element does not exist in the binary Protobuf message. If the proto syntax is proto3, this value is forcibly set to true, because the proto3 standard is to use default values. |
| protobuf.write-null-string-literal | optional | no | "" | String | When serializing to Protobuf data, the string literal written in place of null values in Protobuf arrays and maps. |
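
The $ separator accepted by protobuf.message-class-name matches the JVM binary-name convention for nested classes. The following is a minimal sketch using only the JDK: java.util.Map$Entry stands in for a nested generated class such as com.example.OuterClass$MessageClass, and BinaryNameDemo with its methods is a hypothetical helper, not part of Flink. How the format itself loads the class is not shown here.

```java
final class BinaryNameDemo {
    // Loads a class by its binary name; nested classes are separated by '$', not '.'.
    static Class<?> load(String binaryName) throws ClassNotFoundException {
        return Class.forName(binaryName);
    }

    // Returns true if the given name resolves to a loadable class.
    static boolean resolves(String name) {
        try {
            load(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> nested = load("java.util.Map$Entry");
        System.out.println(nested.getName());                 // java.util.Map$Entry
        System.out.println(nested.getCanonicalName());        // java.util.Map.Entry
        System.out.println(resolves("java.util.Map.Entry"));  // false: the dotted form is not a binary name
    }
}
```

If a message is generated as a nested class, the dotted source-level name will likely not resolve, so the $ form is the one to put in the option.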

Data Type Mapping

The following table lists the type mapping from Flink type to Protobuf type.

| Flink SQL type | Protobuf type | Description |
|---|---|---|
| CHAR / VARCHAR / STRING | string | |
| BOOLEAN | bool | |
| BINARY / VARBINARY | bytes | |
| INT | int32 | |
| BIGINT | int64 | |
| FLOAT | float | |
| DOUBLE | double | |
| ARRAY | repeated | Elements cannot be null; the default for null string elements can be specified with write-null-string-literal. |
| MAP | map | Keys and values cannot be null; the default for null strings can be specified with write-null-string-literal. |
| ROW | message | |
| VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT | enum | A Protobuf enum value can be mapped to either a string or a number in the Flink row, depending on the declared column type. |
| ROW<seconds BIGINT, nanos INT> | google.protobuf.Timestamp | The google.protobuf.Timestamp type maps to a row holding the seconds since the UTC epoch and the fraction of a second at nanosecond resolution, mirroring the fields of the Protobuf definition. |
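
To make the seconds/nanos split of the timestamp row concrete, here is a minimal sketch using java.time.Instant, which uses the same decomposition (whole seconds since the UTC epoch plus a non-negative nanosecond fraction). TimestampRowDemo and SecondsNanos are hypothetical names for illustration; they are not Flink or Protobuf classes.

```java
import java.time.Instant;

final class TimestampRowDemo {
    // Mirrors ROW<seconds BIGINT, nanos INT>.
    static final class SecondsNanos {
        final long seconds;
        final int nanos;

        SecondsNanos(long seconds, int nanos) {
            this.seconds = seconds;
            this.nanos = nanos;
        }
    }

    // Splits an instant into whole epoch seconds and the nanosecond fraction (0..999,999,999).
    static SecondsNanos toRow(Instant instant) {
        return new SecondsNanos(instant.getEpochSecond(), instant.getNano());
    }

    // Rebuilds the instant from the two row fields.
    static Instant fromRow(SecondsNanos row) {
        return Instant.ofEpochSecond(row.seconds, row.nanos);
    }

    public static void main(String[] args) {
        SecondsNanos row = toRow(Instant.ofEpochMilli(1500));
        System.out.println(row.seconds + " s, " + row.nanos + " ns"); // 1 s, 500000000 ns
    }
}
```

Note that instants before the epoch still carry a non-negative nanos value; for example, one millisecond before the epoch is seconds = -1 and nanos = 999000000.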

Null Values

Because Protobuf does not permit null values in maps and arrays, default values are generated automatically when converting Flink rows to Protobuf.

| Protobuf data type | Default value |
|---|---|
| int32 / int64 / float / double | 0 |
| string | "" |
| bool | false |
| enum | first element of the enum |
| bytes | ByteString.EMPTY |
| message | MESSAGE.getDefaultInstance() |
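
The substitution rule can be illustrated with a small sketch in plain Java. This is not Flink's implementation; NullDefaultsDemo and fillNulls are hypothetical names, and the default passed in plays the role of a value from the table above (or of write-null-string-literal for strings).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NullDefaultsDemo {
    // Returns a copy of the list in which every null element is replaced by defaultValue,
    // since a Protobuf repeated field cannot hold null.
    static <T> List<T> fillNulls(List<T> elements, T defaultValue) {
        List<T> out = new ArrayList<>(elements.size());
        for (T element : elements) {
            out.add(element == null ? defaultValue : element);
        }
        return out;
    }

    public static void main(String[] args) {
        // String elements: the default is "" unless another literal is configured.
        System.out.println(fillNulls(Arrays.asList("a", null, "b"), "NULL")); // [a, NULL, b]
        // int64 elements: the default is 0.
        System.out.println(fillNulls(Arrays.asList(1L, null), 0L));           // [1, 0]
    }
}
```

One consequence worth keeping in mind is that the substitution is lossy: after a round trip, a substituted default can no longer be told apart from a value that was genuinely 0 or "".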

OneOf field

During serialization, there is no guarantee that the Flink fields belonging to the same oneof group contain at most one valid value. Fields are set in the order of the Flink schema, so within a oneof group a field that appears later in the schema overrides any field that appears earlier.
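
The override behavior can be sketched in plain Java as follows. This is an illustration only, not Flink's serializer: OneOfDemo and lastSetField are hypothetical names, and the sketch assumes that a null Flink field is simply not set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class OneOfDemo {
    // Walks the oneof members in schema order (the map must preserve insertion order)
    // and returns the name of the member that remains set, or null if none was set.
    static String lastSetField(Map<String, Object> fieldsInSchemaOrder) {
        String winner = null;
        for (Map.Entry<String, Object> field : fieldsInSchemaOrder.entrySet()) {
            if (field.getValue() != null) {
                // Setting one oneof member clears whichever member was set before it.
                winner = field.getKey();
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("a", 1);    // set first
        row.put("b", "x");  // set second, replaces "a" in the oneof group
        row.put("c", null); // assumed to be skipped
        System.out.println(lastSetField(row)); // b
    }
}
```

To avoid silently dropping data, it is probably safest to make sure upstream that only one column per oneof group is non-null.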

You can refer to Language Guide (proto2) or Language Guide (proto3) for more information about Protobuf types.