Because QuestDB supports the PostgreSQL JDBC driver, data can be exported from a Kafka topic and ingested directly into QuestDB by means of Kafka Connect.

This article assumes that users have successfully set up an installation of Kafka and are ready to start exporting messages to QuestDB.

Prerequisites

You will need the following:

  • Kafka
  • A running QuestDB instance

Configure Kafka

The following binaries must be available to Kafka:

  • Kafka Connect JDBC binary
  • PostgreSQL JDBC driver

To download these files, visit the Kafka Connect JDBC page, which provides both a CLI installation option and a direct download of the required .jar files. Select the ZIP file for download, unzip the archive, and copy the required .jar files to the Kafka libs directory:

  unzip confluentinc-kafka-connect-jdbc-10.0.1.zip
  cd confluentinc-kafka-connect-jdbc-10.0.1
  cp kafka-connect-jdbc-10.0.1.jar /path/to/kafka_2.13-2.6.0/libs
  cp postgresql-42.2.10.jar /path/to/kafka_2.13-2.6.0/libs

A configuration file /path/to/kafka/config/connect-jdbc.properties must be created for Kafka Connect in standalone mode. The Postgres connection URL must match the destination QuestDB instance, and the topic to export is set with the topics={mytopic} key. This guide uses the topic example-topic and assumes that the QuestDB Postgres server is listening on its default port, 8812.

Create a file config/connect-jdbc.properties with the following contents:

  name=local-jdbc-sink
  connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
  connection.url=jdbc:postgresql://127.0.0.1:8812/qdb?useSSL=false
  connection.user=admin
  connection.password=quest
  topics=example-topic
  insert.mode=insert
  dialect.name=PostgreSqlDatabaseDialect
  pk.mode=none
  auto.create=true

Start Kafka

The commands listed in this section must be run from the Kafka home directory and in the order shown below.

Start the ZooKeeper instance that coordinates the Kafka server:

  bin/zookeeper-server-start.sh config/zookeeper.properties

Start a Kafka server:

  bin/kafka-server-start.sh config/server.properties

Start Kafka Connect:

  bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc.properties

Publish messages

Messages can be published via the console producer script:

  bin/kafka-console-producer.sh --topic example-topic --bootstrap-server localhost:9092

A > greater-than symbol indicates that messages can be published to the example topic from the interactive session. Paste the following minified JSON as a single line to publish a message and create the table example-topic in the QuestDB instance:

  {"schema":{"type":"struct","fields":[{"type":"boolean","optional":false,"field":"flag"},{"type":"int8","optional":false,"field":"id8"},{"type":"int16","optional":false,"field":"id16"},{"type":"int32","optional":false,"field":"id32"},{"type":"int64","optional":false,"field":"id64"},{"type":"float","optional":false,"field":"idFloat"},{"type":"double","optional":false,"field":"idDouble"},{"type":"string","optional":true,"field":"msg"}],"optional":false,"name":"msgschema"},"payload":{"flag":false,"id8":222,"id16":222,"id32":222,"id64":222,"idFloat":222,"idDouble":333,"msg":"hi"}}
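Typing this envelope by hand is error-prone, so it can help to generate it. The following is a minimal Python sketch, not part of Kafka or QuestDB: `FIELDS` and `build_message` are hypothetical names chosen for illustration, and the field list simply mirrors the example message above.

```python
import json

# Hypothetical field list: (field name, Kafka Connect type, optional flag).
FIELDS = [
    ("flag", "boolean", False),
    ("id8", "int8", False),
    ("id16", "int16", False),
    ("id32", "int32", False),
    ("id64", "int64", False),
    ("idFloat", "float", False),
    ("idDouble", "double", False),
    ("msg", "string", True),
]


def build_message(fields, payload, name="msgschema"):
    """Return a single-line JSON string with schema and payload objects."""
    schema = {
        "type": "struct",
        "fields": [
            {"type": ftype, "optional": optional, "field": fname}
            for fname, ftype, optional in fields
        ],
        "optional": False,
        "name": name,
    }
    # Compact separators drop all whitespace so the message stays on one line.
    return json.dumps({"schema": schema, "payload": payload}, separators=(",", ":"))


if __name__ == "__main__":
    example = {
        "flag": False, "id8": 222, "id16": 222, "id32": 222,
        "id64": 222, "idFloat": 222, "idDouble": 333, "msg": "hi",
    }
    print(build_message(FIELDS, example))
```

Because the console producer reads messages from standard input, the printed line can be pasted at the > prompt or piped into the producer command shown earlier.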

Verify the integration

To verify that the data has been ingested into the example-topic table, export the table contents by sending the following request to QuestDB’s /exp REST endpoint with curl:

  curl -G \
    --data-urlencode "query=select * from 'example-topic'" \
    http://localhost:9000/exp
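The same check can be scripted. The sketch below uses only the Python standard library and assumes QuestDB is reachable at http://localhost:9000, as in the curl command above; `export_url`, `parse_csv`, and `fetch_rows` are hypothetical helper names.

```python
import csv
import io
from urllib.parse import urlencode
from urllib.request import urlopen


def export_url(query, base="http://localhost:9000/exp"):
    """Build the /exp URL with the SQL query URL-encoded as a parameter."""
    return base + "?" + urlencode({"query": query})


def parse_csv(text):
    """Turn the CSV text returned by /exp into a list of dicts keyed by column."""
    return list(csv.DictReader(io.StringIO(text)))


def fetch_rows(query, base="http://localhost:9000/exp"):
    """Run the query against /exp and return the parsed rows."""
    with urlopen(export_url(query, base)) as response:
        return parse_csv(response.read().decode("utf-8"))


if __name__ == "__main__":
    try:
        for row in fetch_rows("select * from 'example-topic'"):
            print(row)
    except OSError as err:
        # URLError and HTTPError are subclasses of OSError.
        print("Could not query QuestDB:", err)
```

All values come back as strings because the endpoint returns CSV, so numeric columns need explicit conversion before comparison.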

The expected response based on the example JSON message published above will be the following:

  "flag","id8","id16","id32","id64","idFloat","idDouble","msg"
  false,-34,222,222,222,222.0000,333.0,"hi"
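The id8 column shows -34 although the message carried 222. This is consistent with the value being narrowed to a signed 8-bit integer, whose range is -128 to 127. The sketch below only illustrates that arithmetic; `wrap_signed` is a hypothetical helper, not code taken from Kafka Connect or QuestDB.

```python
def wrap_signed(value, bits):
    """Interpret the low `bits` bits of value as a two's-complement integer."""
    mask = (1 << bits) - 1
    low = value & mask
    sign_bit = 1 << (bits - 1)
    # If the sign bit is set, the value represents a negative number.
    return low - (1 << bits) if low & sign_bit else low


print(wrap_signed(222, 8))  # prints -34
```

To avoid surprises of this kind, declare each field with a type wide enough for the values it will carry.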

JSON format

The JSON object sent in the example above has the following structure containing schema and payload objects:

  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "boolean",
          "optional": false,
          "field": "flag"
        },
        {
          "type": "int8",
          "optional": false,
          "field": "id8"
        },
        {
          "type": "int16",
          "optional": false,
          "field": "id16"
        },
        {
          "type": "int32",
          "optional": false,
          "field": "id32"
        },
        {
          "type": "int64",
          "optional": false,
          "field": "id64"
        },
        {
          "type": "float",
          "optional": false,
          "field": "idFloat"
        },
        {
          "type": "double",
          "optional": false,
          "field": "idDouble"
        },
        {
          "type": "string",
          "optional": true,
          "field": "msg"
        }
      ],
      "optional": false,
      "name": "msgschema"
    },
    "payload": {
      "flag": false,
      "id8": 222,
      "id16": 222,
      "id32": 222,
      "id64": 222,
      "idFloat": 222,
      "idDouble": 333,
      "msg": "hi"
    }
  }
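Before publishing, it can be useful to confirm that a message's payload agrees with its schema. The following Python sketch is a hypothetical pre-flight check written for this guide; `check_message` is an assumed name, and the rules it applies (both top-level objects present, every non-optional field supplied, no undeclared fields) are illustrative assumptions rather than the full set of checks that Kafka Connect performs.

```python
import json


def check_message(raw):
    """Return a list of problems found in a schema-plus-payload message."""
    message = json.loads(raw)
    problems = [
        f"missing top-level key: {key}"
        for key in ("schema", "payload")
        if key not in message
    ]
    if problems:
        return problems

    declared = {field["field"]: field for field in message["schema"]["fields"]}
    payload = message["payload"]

    # Every field that is not optional must appear in the payload.
    for name, spec in declared.items():
        if name not in payload and not spec.get("optional", False):
            problems.append(f"required field missing from payload: {name}")

    # Every payload field should be declared in the schema.
    for name in payload:
        if name not in declared:
            problems.append(f"payload field not declared in schema: {name}")

    return problems
```

An empty list means the message passed these checks; type and range validation are deliberately left out of this sketch.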