6.10. Kafka Connector

6.10.1. Introduction

The QuasarDB Kafka connector builds on Kafka Connect to let you store data from Kafka topics directly into QuasarDB.

The connector's source code is available on GitHub at https://github.com/bureau14/kafka-connect-qdb.

6.10.2. Installation

QuasarDB C library

Since our Kafka connector makes use of the QuasarDB C library, you need to install the qdb-api library on every worker machine.

For more information on QuasarDB package installation, please see the QuasarDB installation documentation.
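
As a rough illustration, installing the C API on a Linux worker might look like the following. The archive name and layout are assumptions and vary per release and platform, so substitute the artifact that matches your environment:

# Hypothetical archive name; pick the C API build for your platform
tar -xzf qdb-api-linux-64bit-c-api.tar.gz
# Make libqdb_api.so visible on the JVM's library search path
sudo cp lib/libqdb_api.so /usr/local/lib/
sudo ldconfig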

Plugin jar

The QuasarDB Kafka Connector is distributed as an uberjar containing all of the plugin's class files. You can download the latest build at https://download.quasardb.net/quasardb/nightly/api/kafka/.

After downloading the uberjar, place the file in the plugin path defined in your Kafka Connect worker configuration (example below).

Please refer to the Official Kafka documentation for more information on installing plugins.
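
For example, assuming the plugin path used in the worker configuration below, installing the jar might look like this (the file name is hypothetical and depends on the build you downloaded):

mkdir -p /usr/local/share/kafka/plugins
cp kafka-connect-qdb-<version>.jar /usr/local/share/kafka/plugins/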

6.10.3. Configuration

Worker

After downloading the plugin, we must put it in a location on each Kafka Connect worker's machine and create a worker configuration.

Assuming you want to put your Kafka plugins in /usr/local/share/kafka/plugins, we tell the worker where to find the Kafka broker and plugins like this:

bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins

The QuasarDB Kafka Connect plugin currently only supports schemaless JSON, which is configured as follows:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
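
With schemas disabled, each record value is a plain JSON object whose fields map onto table columns by name. A hypothetical message for a table with columns open, close and volume might look like this:

{"open": 42.0, "close": 42.5, "volume": 120000}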

We must also provide some additional configuration, such as the flush interval and where the worker needs to keep track of its offsets (in case of crashes):

offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000

This tells the worker to flush its buffer every minute. You may want to tune this value depending on your workload.

For completeness, here is an example of the complete worker.properties:

bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000

Connector

After configuring the workers to detect the plugin, we must still write a configuration for the connector itself, telling it how to write the data into QuasarDB. For this, we define a connector.properties file as follows:

name=qdb-sink
connector.class=QdbSinkConnector
tasks.max=10
topics=topic1,topic2
qdb.cluster=qdb://my.qdb-server:2836/
qdb.table_from_topic=topic1=table1,topic2=table2

This will tell the QuasarDB Sink to connect to the QuasarDB cluster at qdb://my.qdb-server:2836, and perform a translation of topics to tables:

  • Kafka topic topic1 is translated to QuasarDB table table1
  • Kafka topic topic2 is translated to QuasarDB table table2

The QuasarDB Sink expects every column present in the table to match a field with the same name in the JSON data structure. We rely on Kafka Connect Transformations for you to extract and rename fields to match the format of your QuasarDB tables. For more information, see the official documentation.
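
As an illustration, the following connector.properties snippet uses Kafka Connect's built-in ReplaceField transformation to rename a field so it matches a table column; the field and column names here are hypothetical:

transforms=RenameFields
transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameFields.renames=price:close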

Running the worker

Assuming you saved the worker.properties and connector.properties in /opt/kafka/config/, we can then launch a Kafka Connect worker like this:

/opt/kafka/bin/connect-standalone.sh /opt/kafka/config/worker.properties /opt/kafka/config/connector.properties

You should see periodic log messages as the Kafka connector flushes its output to QuasarDB.

6.10.4. Performance Considerations

To achieve the best performance, please take note of the following considerations when modelling your Kafka topics and installing the plugins:

  • Partition your topics by table destination; this benefits distributed workers tremendously and avoids the performance penalty incurred when multiple concurrent workers write into the same QuasarDB shard.
  • Try to buffer as long as possible, but no longer: ideally, each of your Kafka workers buffers exactly one shard of data; e.g. when your shard size is 5 minutes, set offset.flush.interval.ms=300000 (5 × 60 × 1000 ms).
  • Tune your workers’ systems as outlined in the Performance tuning documentation.