Parquet format

Flink 支持读取 Parquet 文件并生成 Flink RowDataAvro 记录。 要使用 Parquet format,你需要将 flink-parquet 依赖添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-parquet</artifactId>
  4. <version>1.19.0</version>
  5. </dependency>

要使用 Avro 格式,你需要将 parquet-avro 依赖添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.parquet</groupId>
  3. <artifactId>parquet-avro</artifactId>
  4. <version>1.12.2</version>
  5. <optional>true</optional>
  6. <exclusions>
  7. <exclusion>
  8. <groupId>org.apache.hadoop</groupId>
  9. <artifactId>hadoop-client</artifactId>
  10. </exclusion>
  11. <exclusion>
  12. <groupId>it.unimi.dsi</groupId>
  13. <artifactId>fastutil</artifactId>
  14. </exclusion>
  15. </exclusions>
  16. </dependency>

为了在 PyFlink 作业中使用 Parquet format ,需要添加下列依赖:

PyFlink JAR
Download

在 PyFlink 中如何添加 JAR 包依赖请参考 Python 依赖管理

此格式与新的 Source 兼容,可以同时在批和流模式下使用。 因此,你可使用此格式处理以下两类数据:

  • 有界数据: 列出所有文件并全部读取。
  • 无界数据:监控目录中出现的新文件

当你开启一个 File Source,会被默认为有界读取。 如果你想在连续读取模式下使用 File Source,你必须额外调用 AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)

Vectorized reader

Java

  1. // Parquet rows are decoded in batches
  2. FileSource.forBulkFileFormat(BulkFormat,Path...)
  3. // Monitor the Paths to read data as unbounded data
  4. FileSource.forBulkFileFormat(BulkFormat,Path...)
  5. .monitorContinuously(Duration.ofMillis(5L))
  6. .build();

Python

  1. # Parquet rows are decoded in batches
  2. FileSource.for_bulk_file_format(BulkFormat, Path...)
  3. # Monitor the Paths to read data as unbounded data
  4. FileSource.for_bulk_file_format(BulkFormat, Path...) \
  5. .monitor_continuously(Duration.of_millis(5)) \
  6. .build()

Avro Parquet reader

Java

  1. // Parquet rows are decoded in batches
  2. FileSource.forRecordStreamFormat(StreamFormat,Path...)
  3. // Monitor the Paths to read data as unbounded data
  4. FileSource.forRecordStreamFormat(StreamFormat,Path...)
  5. .monitorContinuously(Duration.ofMillis(5L))
  6. .build();

Python

  1. # Parquet rows are decoded in batches
  2. FileSource.for_record_stream_format(StreamFormat, Path...)
  3. # Monitor the Paths to read data as unbounded data
  4. FileSource.for_record_stream_format(StreamFormat, Path...) \
  5. .monitor_continuously(Duration.of_millis(5)) \
  6. .build()

下面的案例都是基于有界数据的。 如果你想在连续读取模式下使用 File Source,你必须额外调用 AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)

在此示例中,你将创建由 Parquet 格式的记录构成的 Flink RowDatas DataStream。我们把 schema 信息映射为只读字段(“f7”、“f4” 和 “f99”)。 每个批次读取 500 条记录。其中,第一个布尔类型的参数用来指定是否需要将时间戳列处理为 UTC。 第二个布尔类型参数用来指定在进行 Parquet 字段映射时,是否要区分大小写。 这里不需要水印策略,因为记录中不包含事件时间戳。

Java

  1. final LogicalType[] fieldTypes =
  2. new LogicalType[] {
  3. new DoubleType(), new IntType(), new VarCharType()};
  4. final RowType rowType = RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
  5. final ParquetColumnarRowInputFormat<FileSourceSplit> format =
  6. new ParquetColumnarRowInputFormat<>(
  7. new Configuration(),
  8. rowType,
  9. InternalTypeInfo.of(rowType),
  10. 500,
  11. false,
  12. true);
  13. final FileSource<RowData> source =
  14. FileSource.forBulkFileFormat(format, /* Flink Path */)
  15. .build();
  16. final DataStream<RowData> stream =
  17. env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Python

  1. row_type = DataTypes.ROW([
  2. DataTypes.FIELD('f7', DataTypes.DOUBLE()),
  3. DataTypes.FIELD('f4', DataTypes.INT()),
  4. DataTypes.FIELD('f99', DataTypes.VARCHAR()),
  5. ])
  6. source = FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
  7. row_type=row_type,
  8. hadoop_config=Configuration(),
  9. batch_size=500,
  10. is_utc_timestamp=False,
  11. is_case_sensitive=True,
  12. ), PARQUET_FILE_PATH).build()
  13. ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")

Avro Records

Flink 支持三种方式来读取 Parquet 文件并创建 Avro records (PyFlink 只支持 Generic record):

Generic record

使用 JSON 定义 Avro schemas。你可以从 Avro specification 获取更多关于 Avro schemas 和类型的信息。 此示例使用了一个在 official Avro tutorial 中描述的示例相似的 Avro schema:

  1. {"namespace": "example.avro",
  2. "type": "record",
  3. "name": "User",
  4. "fields": [
  5. {"name": "name", "type": "string"},
  6. {"name": "favoriteNumber", "type": ["int", "null"]},
  7. {"name": "favoriteColor", "type": ["string", "null"]}
  8. ]
  9. }

这个 schema 定义了一个具有三个属性的的 user 记录:name,favoriteNumber 和 favoriteColor。你可以 在 record specification 找到更多关于如何定义 Avro schema 的详细信息。

在此示例中,你将创建包含由 Avro Generic records 格式构成的 Parquet records 的 DataStream。 Flink 会基于 JSON 字符串解析 Avro schema。也有很多其他的方式解析 schema,例如基于 java.io.File 或 java.io.InputStream。 请参考 Avro Schema 以获取更多详细信息。 然后,你可以通过 AvroParquetReaders 为 Avro Generic 记录创建 AvroParquetRecordFormat

Java

  1. // 解析 avro schema
  2. final Schema schema =
  3. new Schema.Parser()
  4. .parse(
  5. "{\"type\": \"record\", "
  6. + "\"name\": \"User\", "
  7. + "\"fields\": [\n"
  8. + " {\"name\": \"name\", \"type\": \"string\" },\n"
  9. + " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
  10. + " {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
  11. + " ]\n"
  12. + " }");
  13. final FileSource<GenericRecord> source =
  14. FileSource.forRecordStreamFormat(
  15. AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
  16. .build();
  17. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. env.enableCheckpointing(10L);
  19. final DataStream<GenericRecord> stream =
  20. env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Python

  1. # 解析 avro schema
  2. schema = AvroSchema.parse_string("""
  3. {
  4. "type": "record",
  5. "name": "User",
  6. "fields": [
  7. {"name": "name", "type": "string"},
  8. {"name": "favoriteNumber", "type": ["int", "null"]},
  9. {"name": "favoriteColor", "type": ["string", "null"]}
  10. ]
  11. }
  12. """)
  13. source = FileSource.for_record_stream_format(
  14. AvroParquetReaders.for_generic_record(schema), # file paths
  15. ).build()
  16. env = StreamExecutionEnvironment.get_execution_environment()
  17. env.enable_checkpointing(10)
  18. stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")

Specific record

基于之前定义的 schema,你可以通过利用 Avro 代码生成来生成类。 一旦生成了类,就不需要在程序中直接使用 schema。 你可以使用 avro-tools.jar 手动生成代码,也可以直接使用 Avro Maven 插件对配置的源目录中的任何 .avsc 文件执行代码生成。 请参考 Avro Getting Started 获取更多信息。

此示例使用了样例 schema testdata.avsc

  1. [
  2. {"namespace": "org.apache.flink.formats.parquet.generated",
  3. "type": "record",
  4. "name": "Address",
  5. "fields": [
  6. {"name": "num", "type": "int"},
  7. {"name": "street", "type": "string"},
  8. {"name": "city", "type": "string"},
  9. {"name": "state", "type": "string"},
  10. {"name": "zip", "type": "string"}
  11. ]
  12. }
  13. ]

你可以使用 Avro Maven plugin 生成 Address Java 类。

  1. @org.apache.avro.specific.AvroGenerated
  2. public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  3. // 生成的代码...
  4. }

你可以通过 AvroParquetReaders 为 Avro Specific 记录创建 AvroParquetRecordFormat, 然后创建一个包含由 Avro Specific records 格式构成的 Parquet records 的 DateStream。

  1. final FileSource<GenericRecord> source =
  2. FileSource.forRecordStreamFormat(
  3. AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
  4. .build();
  5. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. env.enableCheckpointing(10L);
  7. final DataStream<GenericRecord> stream =
  8. env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Reflect record

除了需要预定义 Avro Generic 和 Specific 记录, Flink 还支持基于现有 Java POJO 类从 Parquet 文件创建 DateStream。 在这种场景中,Avro 会使用 Java 反射为这些 POJO 类生成 schema 和协议。 请参考 Avro reflect 文档获取更多关于 Java 类型到 Avro schemas 映射的详细信息。

本例使用了一个简单的 Java POJO 类 Datum

  1. public class Datum implements Serializable {
  2. public String a;
  3. public int b;
  4. public Datum() {}
  5. public Datum(String a, int b) {
  6. this.a = a;
  7. this.b = b;
  8. }
  9. @Override
  10. public boolean equals(Object o) {
  11. if (this == o) {
  12. return true;
  13. }
  14. if (o == null || getClass() != o.getClass()) {
  15. return false;
  16. }
  17. Datum datum = (Datum) o;
  18. return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
  19. }
  20. @Override
  21. public int hashCode() {
  22. int result = a != null ? a.hashCode() : 0;
  23. result = 31 * result + b;
  24. return result;
  25. }
  26. }

你可以通过 AvroParquetReaders 为 Avro Reflect 记录创建一个 AvroParquetRecordFormat, 然后创建一个包含由 Avro Reflect records 格式构成的 Parquet records 的 DateStream。

  1. final FileSource<GenericRecord> source =
  2. FileSource.forRecordStreamFormat(
  3. AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
  4. .build();
  5. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. env.enableCheckpointing(10L);
  7. final DataStream<GenericRecord> stream =
  8. env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

使用 Parquet files 必备条件

为了支持读取 Avro Reflect 数据,Parquet 文件必须包含特定的 meta 信息。为了生成 Parquet 数据,Avro schema 信息中必须包含 namespace, 以便让程序在反射执行过程中能确定唯一的 Java Class 对象。

下面的案例展示了上文中的 User 对象的 schema 信息。但是当前案例包含了一个指定文件目录的 namespace(当前案例下的包路径),反射过程中可以找到对应的 User 类。

  1. // avro schema with namespace
  2. final String schema =
  3. "{\"type\": \"record\", "
  4. + "\"name\": \"User\", "
  5. + "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
  6. + "\"fields\": [\n"
  7. + " {\"name\": \"name\", \"type\": \"string\" },\n"
  8. + " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
  9. + " {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
  10. + " ]\n"
  11. + " }";

由上述 scheme 信息创建的 Parquet 文件包含以下 meta 信息:

  1. creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
  2. extra: parquet.avro.schema =
  3. {"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
  4. extra: writer.model.name = avro
  5. file schema: org.apache.flink.formats.parquet.avro.User
  6. --------------------------------------------------------------------------------
  7. name: REQUIRED BINARY L:STRING R:0 D:0
  8. favoriteNumber: OPTIONAL INT32 R:0 D:1
  9. favoriteColor: OPTIONAL BINARY L:STRING R:0 D:1
  10. row group 1: RC:3 TS:143 OFFSET:4
  11. --------------------------------------------------------------------------------
  12. name: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
  13. favoriteNumber: INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
  14. favoriteColor: BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]

使用包 org.apache.flink.formats.parquet.avro 路径下已定义的 User 类:

  1. public class User {
  2. private String name;
  3. private Integer favoriteNumber;
  4. private String favoriteColor;
  5. public User() {}
  6. public User(String name, Integer favoriteNumber, String favoriteColor) {
  7. this.name = name;
  8. this.favoriteNumber = favoriteNumber;
  9. this.favoriteColor = favoriteColor;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. public Integer getFavoriteNumber() {
  15. return favoriteNumber;
  16. }
  17. public String getFavoriteColor() {
  18. return favoriteColor;
  19. }
  20. }

你可以通过下面的程序读取类型为 User 的 Avro Reflect records:

  1. final FileSource<GenericRecord> source =
  2. FileSource.forRecordStreamFormat(
  3. AvroParquetReaders.forReflectRecord(User.class), /* Flink Path */)
  4. .build();
  5. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. env.enableCheckpointing(10L);
  7. final DataStream<GenericRecord> stream =
  8. env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");