DataProxy Plugin Extension

Overview

DataProxy implements an abstract, unified MQ (Message Queue) sink model so that developers can easily extend MQ sink types under the standard MessageQueueZoneSink. By default, Apache Pulsar, Apache Kafka and InLong TubeMQ are already integrated. This article guides developers through adding a new MQ type.

Concepts and Models

DataProxy is a message flow architecture based on Apache Flume with its Source + Channel + Sink components. Here we focus on the sink layer alone.

  • MessageQueueZoneSink: The standard MQ sink provided by DataProxy, intended to support all MQ types.
  • MessageQueueHandler: The abstract MQ handler interface that deals with connecting to, sending data to, and disconnecting from the MQ cluster.
  • EventHandler: The interface that converts the MQ message header and body when required, for example to convert the data protocol.

When a new MQ cluster type needs to be integrated, developers should at least implement the MessageQueueHandler interface as a plugin. Optionally, they can also implement the EventHandler interface to transform data as required. The plugin classes can be specified in, and pulled from, the manager as configuration information, so that new MQ types can be added on the fly.
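The idea of naming a plugin class in configuration and driving it through a fixed lifecycle can be sketched in plain Java. This is a minimal illustration under stated assumptions, not DataProxy's actual loading code: the SketchHandler interface, the InMemoryHandler class, the HandlerLoader helper and the "handler.class" and "topic" keys are all hypothetical stand-ins, and the real MessageQueueHandler methods take InLong-specific types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for MessageQueueHandler (not the real InLong interface).
interface SketchHandler {
    void init(Map<String, String> config);
    void start();
    boolean send(String message);
    void stop();
}

// Hypothetical handler that "sends" by appending to a list instead of calling an MQ client.
class InMemoryHandler implements SketchHandler {
    final List<String> sent = new ArrayList<>();
    private String topic;
    private boolean started;

    @Override
    public void init(Map<String, String> config) {
        // Read settings only; no connection is opened here.
        topic = config.getOrDefault("topic", "default-topic");
    }

    @Override
    public void start() {
        // A real handler would create its producer here.
        started = true;
    }

    @Override
    public boolean send(String message) {
        if (!started) {
            return false; // refuse to send before start() or after stop()
        }
        sent.add(topic + ":" + message);
        return true;
    }

    @Override
    public void stop() {
        // A real handler would close its producer here.
        started = false;
    }
}

// Hypothetical loader: instantiates the handler class named in the configuration.
class HandlerLoader {
    static SketchHandler load(Map<String, String> config) {
        String className = config.get("handler.class"); // assumed key, for illustration only
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            SketchHandler handler = (SketchHandler) instance;
            handler.init(config);
            return handler;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load handler " + className, e);
        }
    }
}
```

Because the sink in this sketch only depends on the interface, swapping in another MQ type amounts to changing the configured class name, which is the property the plugin model relies on.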

Demonstration

The concepts introduced above are illustrated in the following figure: Figure 1 (DataProxy Plugin Extension).

Development Process

In the rest of the article we use the Kafka MQ with ProtoBuffer message format as an example. Here’s what to do:

  • Implement the MessageQueueHandler interface as a plugin class, namely KafkaHandler, with its init / start / stop / send methods.
  • Implement the EventHandler interface as ProtoBufferEventHandler, with its parseHeader / parseBody methods.

Plugin Implementation

MessageQueueHandler

    public class KafkaHandler implements MessageQueueHandler {

        // converts the event header and body before sending
        private EventHandler handler;

        @Override
        public void init(CacheClusterConfig config, MessageQueueZoneSinkContext sinkContext) {
            // initialize configuration and event handler
        }

        @Override
        public void start() {
            // create Kafka Producer
        }

        @Override
        public void stop() {
            // close Kafka Producer
        }

        @Override
        public boolean send(BatchPackProfile event) {
            // process and send data, then report whether the send succeeded
            return false; // placeholder so that the skeleton compiles
        }
    }

EventHandler

    public class ProtoBufferEventHandler implements EventHandler {

        @Override
        public Map<String, String> parseHeader(IdTopicConfig idConfig, BatchPackProfile profile, String nodeId,
                INLONG_COMPRESSED_TYPE compressType) {
            // retrieve, process and convert event header
            return null; // placeholder so that the skeleton compiles
        }

        @Override
        public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, INLONG_COMPRESSED_TYPE compressType)
                throws IOException {
            // retrieve and repack event to ProtoBuffer message
            return null; // placeholder so that the skeleton compiles
        }
    }

(For the full implementation, see org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler in the inlong-dataproxy module.)
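To make the header/body split more concrete, here is a small self-contained sketch of a converter with the same two responsibilities. It is an illustration only: SketchBatch, SketchCompressType and SketchEventConverter are hypothetical names, the header keys are invented for the example, and the body is written as newline-joined UTF-8 text with optional GZIP compression instead of a ProtoBuffer message, so that the example needs nothing beyond the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

// Hypothetical compression flag, standing in for INLONG_COMPRESSED_TYPE.
enum SketchCompressType { NONE, GZIP }

// Hypothetical, simplified stand-in for a batch of records (not BatchPackProfile).
class SketchBatch {
    final String groupId;
    final String streamId;
    final List<String> records;

    SketchBatch(String groupId, String streamId, List<String> records) {
        this.groupId = groupId;
        this.streamId = streamId;
        this.records = records;
    }
}

// Hypothetical converter with the same two duties as an EventHandler.
class SketchEventConverter {

    // Build the message header attributes from the batch metadata.
    Map<String, String> parseHeader(SketchBatch batch, SketchCompressType compressType) {
        Map<String, String> header = new LinkedHashMap<>();
        header.put("groupId", batch.groupId);       // assumed key names, for illustration
        header.put("streamId", batch.streamId);
        header.put("msgCount", String.valueOf(batch.records.size()));
        header.put("compressType", compressType.name());
        return header;
    }

    // Repack the records into one byte array, compressing it if requested.
    byte[] parseBody(SketchBatch batch, SketchCompressType compressType) {
        byte[] raw = String.join("\n", batch.records).getBytes(StandardCharsets.UTF_8);
        if (compressType == SketchCompressType.NONE) {
            return raw;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        } catch (IOException e) {
            // The sketch rethrows unchecked; the real interface declares IOException.
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

A handler's send method would typically call both methods and pass the resulting header map and byte array to the MQ client; recording the compression type in the header lets the consumer know how to decode the body.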

Plugin Configuration

dataproxy.conf

For the sink configuration, please refer to the Sink Configuration Example.