MongoDB Kafka Connector

July 02, 2022

Apache Kafka is an open-source publish/subscribe messaging system. Kafka Connect is a component of Apache Kafka that connects it to datastores such as MongoDB. It does so by providing the following resources:

  • A fault-tolerant runtime for transferring data to and from datastores.
  • A framework for the Apache Kafka community to share solutions for connecting Apache Kafka to different datastores.

This post focuses on using MongoDB as a data lake. The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Apache Kafka and writes it to MongoDB. The official MongoDB Kafka Connector is at https://github.com/mongodb/mongo-kafka

Start Kafka Environment

Download the latest Kafka: https://www.apache.org/dyn/closer.cgi?path=/kafka/3.2.0/kafka_2.13-3.2.0.tgz

$ curl https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz -o kafka_2.13-3.2.0.tgz
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0

Start the services in the following order. First, start the ZooKeeper service:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal session and start the Kafka broker service:

$ bin/kafka-server-start.sh config/server.properties

Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

Install plugin

Download the connector jar listed at https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect into the libs folder of the Kafka installation, which is the location that plugin.path points to in the next step:

curl -L "https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar" -o libs/mongo-kafka-connect-1.7.0-all.jar
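A failed download often leaves an HTML error page saved under the jar's name. The sketch below is one way to catch that early: it checks that the file starts with the ZIP magic bytes "PK", which every jar file does. The function name looks_like_jar is made up for this example, and the check does not validate the rest of the archive.

```shell
# looks_like_jar FILE (hypothetical helper)
# Succeeds if FILE starts with the ZIP magic bytes "PK"; fails otherwise,
# including when FILE does not exist.
looks_like_jar() {
    [ "$(head -c 2 "$1" 2>/dev/null)" = "PK" ]
}

# Usage (not executed here):
#   looks_like_jar path/to/mongo-kafka-connect-1.7.0-all.jar || echo "download failed"
```

If the check fails, open the file in a text editor to see what the server actually returned.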

Edit config/connect-standalone.properties and change the plugin.path configuration property to match the path to the jar:

plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar

For example, the edited file looks like this:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar
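If you would rather script the plugin.path change than edit the file by hand, a small helper along these lines may do. This is a minimal sketch, not part of Kafka: set_property is a hypothetical function that drops any uncommented line for the given key and appends the new value, leaving commented-out lines such as "#plugin.path=" untouched.

```shell
# set_property FILE KEY VALUE (hypothetical helper)
# Remove any uncommented "KEY=" line from FILE, then append "KEY=VALUE".
set_property() {
    file=$1 key=$2 value=$3
    # Escape dots so a key like "plugin.path" is matched literally.
    pattern=$(printf '%s' "$key" | sed 's/\./\\./g')
    tmp=$(mktemp) || return 1
    # grep -v exits non-zero when it prints nothing, so ignore its status.
    grep -v -E "^[[:space:]]*${pattern}[[:space:]]*=" "$file" > "$tmp" || true
    printf '%s=%s\n' "$key" "$value" >> "$tmp"
    # Write back through redirection so the original file keeps its permissions.
    cat "$tmp" > "$file"
    rm -f "$tmp"
}

# Usage, run from the Kafka directory (not executed here):
#   set_property config/connect-standalone.properties plugin.path \
#       /home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar
```

Appending the new line at the end keeps the helper simple, at the cost of moving the property away from its explanatory comments.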

Create config properties

In the config folder, create the file MongoSinkConnector.properties:

name=mongo-sink
topics=quickstart.sampleData
connector.class=com.mongodb.kafka.connect.MongoSinkConnector

# Message types

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Specific global MongoDB Sink Connector configuration

connection.uri=mongodb://localhost:27017
database=quickstart
collection=topicData
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

In the config folder, create the file MongoSourceConnector.properties:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector

# Connection and source configuration

connection.uri=mongodb://localhost:27017
database=quickstart
collection=sampleData
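A mistyped property name is easy to miss, because a key the connector does not recognize is not the key you meant to set. As a rough sanity check before starting Kafka Connect, the sketch below lists which of the expected keys have no uncommented line in a properties file. The function name missing_keys is hypothetical, and the key list in the usage comment is simply the set of properties used in this post, not an official list of required settings.

```shell
# missing_keys FILE KEY... (hypothetical helper)
# Print each KEY that has no uncommented "KEY=" line in FILE.
missing_keys() {
    file=$1
    shift
    for key in "$@"; do
        # Escape dots so a key like "connection.uri" is matched literally.
        pattern=$(printf '%s' "$key" | sed 's/\./\\./g')
        if ! grep -q -E "^[[:space:]]*${pattern}[[:space:]]*=" "$file"; then
            printf '%s\n' "$key"
        fi
    done
}

# Usage, run from the Kafka directory (not executed here):
#   missing_keys config/MongoSinkConnector.properties \
#       name topics connector.class connection.uri database collection
#   missing_keys config/MongoSourceConnector.properties \
#       name connector.class connection.uri database collection
```

An empty result means every listed key is present. A misspelling such as connection.url would show up as a missing connection.uri.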

Install MongoDB

Import the public key used by the package management system. From a terminal, run the following command to import the MongoDB public GPG key from https://www.mongodb.org/static/pgp/server-5.0.asc:

wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -

Create the /etc/apt/sources.list.d/mongodb-org-5.0.list file for Ubuntu 20.04 (Focal):

echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list

Reload the local package database:

sudo apt-get update

Install the MongoDB packages:

sudo apt-get install -y mongodb-org

If the installation fails with an error like this:

The following packages have unmet dependencies:

mongodb-org-mongos : Depends: libssl1.1 (>= 1.1.1) but it is not installable
mongodb-org-server : Depends: libssl1.1 (>= 1.1.1) but it is not installable
mongodb-org-shell : Depends: libssl1.1 (>= 1.1.1) but it is not installable

E: Unable to correct problems, you have held broken packages.

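Fix it by adding the impish-security repository and installing libssl1.1 with the commands below, then rerun the mongodb-org install command: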

echo "deb http://security.ubuntu.com/ubuntu impish-security main" | sudo tee /etc/apt/sources.list.d/impish-security.list

sudo apt-get update

sudo apt-get install libssl1.1

Verify that MongoDB has started successfully.

sudo systemctl status mongod

If it is inactive and needs to be restarted, run:

sudo systemctl restart mongod

Start Kafka Connect

Run the command:

bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties config/MongoSinkConnector.properties
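Once the worker is up, you can check that both connectors are running through the Kafka Connect REST API, which answers GET /connectors/<name>/status with a JSON document containing a "state" for the connector and for each task. The sketch below assumes the default REST port 8083 and that jq is not installed, so it extracts the states with grep and sed. The function name connect_states is made up for this example.

```shell
# connect_states (hypothetical helper)
# Read a /connectors/<name>/status JSON response on stdin and print every
# "state" value it contains, one per line (connector first, then tasks).
connect_states() {
    grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' |
        sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Usage, assuming the worker listens on the default port 8083 (not executed here):
#   curl -s http://localhost:8083/connectors/mongo-sink/status | connect_states
#   curl -s http://localhost:8083/connectors/mongo-source/status | connect_states
```

A healthy connector prints only RUNNING lines. If a line says FAILED, the full JSON response includes the error trace for that task.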

Write some data to the topic

A Kafka client communicates with the Kafka brokers via the network for writing events. Once received, the brokers will store the events in a durable and fault-tolerant manner for as long as you need—even forever.

Run the console producer client to write a few events into your topic. By default, each line you enter will result in a separate event being written to the topic.

$ bin/kafka-console-producer.sh --topic connect-test --bootstrap-server localhost:9092
This is my first event
This is my second event

Send the Contents of a Document through Your Connectors

To send the contents of a document through your connectors, insert a document into the MongoDB collection from which your source connector reads data.

To insert a new document into your collection, open the MongoDB shell with the following command:

mongosh mongodb://127.0.0.1:27017/

From the MongoDB shell, insert a document into the sampleData collection of the quickstart database using the following commands:

use quickstart
db.sampleData.insertOne({"hello":"world"})

After you insert a document into the sampleData collection, confirm that your connectors processed the change. Check the contents of the topicData collection using the following command:

db.topicData.find()

You should see output that resembles the following:

[
   {
      _id: ObjectId(...),
      hello: 'world'
   }
]

Reference: MongoDB Kafka Connector: https://www.mongodb.com/docs/kafka-connector/current/



© 2022, @victorleungtw