RabbitMQ连接器

RabbitMQ连接器的许可证

Flink的RabbitMQ连接器定义了对“RabbitMQ AMQP Java客户端”的Maven依赖,在Mozilla Public License 1.1(“MPL”),GNU通用公共许可证版本2(“GPL”)和Apache许可证版本2下获得三重许可。 (“ASL”)。

Flink本身既不重用“RabbitMQ AMQP Java Client”的源代码,也不从“RabbitMQ AMQP Java Client”打包二进制文件。

基于Flink的RabbitMQ连接器创建和发布衍生作品的用户(从而重新分发“RabbitMQ AMQP Java客户端”)必须意识到这可能受到Mozilla Public License 1.1(“MPL”),GNU中声明的条件的约束。通用公共许可证版本2(“GPL”)和Apache许可证版本2(“ASL”)。

RabbitMQ连接器

此连接器提供对RabbitMQ数据流的访问要使用此连接器,请将以下依赖项添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-rabbitmq_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

请注意,流连接器当前不是二进制分发的一部分。请参阅此处的链接以进行群集执行

安装RabbitMQ

按照RabbitMQ下载页面的说明进行 算子操作安装后,服务器自动启动,并且可以启动连接到RabbitMQ的应用程序。

RabbitMQ来源

此连接器提供了一个RMQSource使用RabbitMQ队列消息类。此源提供三种不同级别的保证,具体取决于它如何使用Flink配置:

  • 完全一次:为了实现RabbitMQ源的一次性保证,需要以下内容 -
    • 启用检查点启用检查点后,只有在检查点完成时才会确认消息(因此,从RabbitMQ队列中删除)。
    • 使用相关ID:Correlation ID是RabbitMQ应用程序函数。在将消息注入RabbitMQ时,必须在消息属性中设置它。源使用相关标识对从检查点还原时已重新处理的任何消息进行重复数据删除。
    • 非并行源:源必须是非并行的(并行度设置为1)才能实现精确一次。这种限制主要是由于RabbitMQ将消息从单个队列分派给多个消费者的方法。
  • 至少一次:当启用了检查点,但未使用相关性ID或源是并行时,源仅提供至少一次保证。

  • 不保证:如果未启用检查点,则源不具有任何强有力的传递保证。在此设置下,一旦源接收并处理消息,消息将自动被确认,而不是与Flink的检查点协作。下面是设置一次一次RabbitMQ源的代码示例。内联注释说明可以忽略配置的哪些部分以获得更宽松的保证。

  • Java

  • Scala
  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enableCheckpointing(...);
  4. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  5. .setHost("localhost")
  6. .setPort(5000)
  7. ...
  8. .build();
  9. final DataStream<String> stream = env
  10. .addSource(new RMQSource<String>(
  11. connectionConfig, // config for the RabbitMQ connection
  12. "queueName", // name of the RabbitMQ queue to consume
  13. true, // use correlation ids; can be false if only at-least-once is required
  14. new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
  15. .setParallelism(1); // non-parallel source is only required for exactly-once
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // checkpointing is required for exactly-once or at-least-once guarantees
  3. env.enableCheckpointing(...)
  4. val connectionConfig = new RMQConnectionConfig.Builder()
  5. .setHost("localhost")
  6. .setPort(5000)
  7. ...
  8. .build
  9. val stream = env
  10. .addSource(new RMQSource[String](
  11. connectionConfig, // config for the RabbitMQ connection
  12. "queueName", // name of the RabbitMQ queue to consume
  13. true, // use correlation ids; can be false if only at-least-once is required
  14. new SimpleStringSchema)) // deserialization schema to turn messages into Java objects
  15. .setParallelism(1) // non-parallel source is only required for exactly-once

RabbitMQ Sink

此连接器提供了一个RMQSink用于将消息发送到RabbitMQ队列的类。下面是设置RabbitMQ接收器的代码示例。

  1. final DataStream<String> stream = ...
  2. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build();
  7. stream.addSink(new RMQSink<String>(
  8. connectionConfig, // config for the RabbitMQ connection
  9. "queueName", // name of the RabbitMQ queue to send messages to
  10. new SimpleStringSchema())); // serialization schema to turn Java objects to messages
  1. val stream: DataStream[String] = ...
  2. val connectionConfig = new RMQConnectionConfig.Builder()
  3. .setHost("localhost")
  4. .setPort(5000)
  5. ...
  6. .build
  7. stream.addSink(new RMQSink[String](
  8. connectionConfig, // config for the RabbitMQ connection
  9. "queueName", // name of the RabbitMQ queue to send messages to
  10. new SimpleStringSchema)) // serialization schema to turn Java objects to messages

有关RabbitMQ的更多信息,请点击此处