How to install and configure the MongoDB Connector for Confluent Kafka


Kafka 23 Feb 2020
Prerequisites
  • Install the Confluent Platform for your operating system by following the instructions at https://docs.confluent.io/current/installation/installing_cp/index.html
  • Install MongoDB for your operating system by following the instructions at https://docs.mongodb.com/manual/installation/
  • Always make sure your system and packages are up to date: sudo apt-get update && sudo apt-get upgrade on Ubuntu, sudo yum update on CentOS

Before we set up the connectors, let's import some test data into MongoDB.

Download a sample dataset from https://catalog.data.gov/dataset?res_format=CSV to a directory on your MongoDB server.

cd into the directory and confirm the file is available
cd ~/sample_dataset;ls;

Use the mongoimport command to import the file
mongoimport --type csv -d entechlog -c Accidental_Drug_Related_Deaths --headerline --drop Accidental_Drug_Related_Deaths_2012-2018.csv

  • --type: The input format to import: json, csv, or tsv.
  • -d: The database to use.
  • -c: The collection to use.
  • --headerline: Treats the first row of the CSV file as field names.
  • --drop: Drops the collection before importing documents.

Validate the data in MongoDB using MongoDB Compass, or from the mongo shell as shown below.

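If Compass is not handy, a quick count from the mongo shell on the server does the job (a minimal sketch; add -u/-p and --authenticationDatabase if your deployment enforces authentication):

mongo entechlog --eval "db.Accidental_Drug_Related_Deaths.count()"

The number printed should match the document count reported by mongoimport.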

Install the MongoDB Connector for Apache Kafka. See https://www.confluent.io/hub/mongodb/kafka-connect-mongodb for the instructions.
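On a Confluent Platform install this typically comes down to the Confluent Hub client followed by a restart of the Connect worker (the service name below is an assumption; it varies by setup):

confluent-hub install mongodb/kafka-connect-mongodb:latest
sudo systemctl restart confluent-kafka-connect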

Make sure you have a replica set before configuring the connector. See https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/ for more details on how to enable a replica set.

If you already have a replica set enabled, you can skip the next few steps and jump past the NotYetInitialized error message below.

sudo systemctl stop mongod
sudo systemctl edit --full mongod

In the editor, update the ExecStart line to pass the replica set name:

ExecStart=/usr/bin/mongod --replSet rs0 --config /etc/mongod.conf

sudo systemctl start mongod;sudo systemctl status mongod;
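Alternatively, instead of overriding ExecStart you can set the replica set name in /etc/mongod.conf itself, which is the standard mongod configuration route:

replication:
  replSetName: rs0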

Make sure to connect to a mongo shell and initiate the new replica set with rs.initiate(). Validate the status using rs.status().
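From the MongoDB server that looks like this (run from a local shell; add -u/-p if authentication is already enforced):

mongo
rs.initiate()
rs.status()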

If you miss the initiate step, the connector will fail with com.mongodb.MongoCommandException: Command failed with error 94 (NotYetInitialized): 'Cannot use snapshots until replica set is finished initializing.' on server entechlog-vm-01:27017

Here is the config for a source connector with the StringConverter serde. This will not register a schema in the Confluent Schema Registry.

You can use curl, Postman, or the Control Center UI to create the connector.

Curl

curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
  "name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB",
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
  "copy.existing": true,
  "database": "entechlog",
  "collection": "Accidental_Drug_Related_Deaths",
  "publish.full.document.only": true,
  "topic.prefix": "dev.mongodb"
}'
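Once the PUT returns, it is worth confirming the connector and its task are RUNNING via the Connect REST API:

curl http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB/status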

Postman

Confluent Control Center

Data in topic
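To eyeball the messages without Control Center, a console consumer works too. The source connector writes to prefix.database.collection, so the topic here should be dev.mongodb.entechlog.Accidental_Drug_Related_Deaths (the broker address is an assumption; adjust to your cluster):

kafka-console-consumer --bootstrap-server entechlog-vm-01:9092 --topic dev.mongodb.entechlog.Accidental_Drug_Related_Deaths --from-beginning --max-messages 5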

Since StringConverter did not register a schema, I tried creating connectors with AvroConverter and JsonConverter as both key and value converters. AvroConverter registered a schema with all fields typed as string, which is not useful. I have reached out to someone at MongoDB to see if this is an issue with the connector or if I am configuring something incorrectly.

Connector Config - AVRO

curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0020_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
  "name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0020_SRC_MONGODB",
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
  "key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
  "schema.registry.url": "http://entechlog-vm-01:8081",
  "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
  "copy.existing": true,
  "database": "entechlog",
  "collection": "Accidental_Drug_Related_Deaths",
  "publish.full.document.only": true,
  "topic.prefix": "dev.mongodb.avro"
}'

Schema Registry - AVRO
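You can also inspect what was registered straight from the Schema Registry REST API. Subjects follow the topic-key/topic-value convention, so the value subject below is inferred from the topic naming above (verify against the /subjects listing first):

curl http://entechlog-vm-01:8081/subjects
curl http://entechlog-vm-01:8081/subjects/dev.mongodb.avro.entechlog.Accidental_Drug_Related_Deaths-value/versions/latest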

Connector Config - JSON

curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
  "name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB",
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
  "key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
  "schema.registry.url": "http://entechlog-vm-01:8081",
  "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
  "copy.existing": true,
  "database": "entechlog",
  "collection": "Accidental_Drug_Related_Deaths",
  "publish.full.document.only": true,
  "topic.prefix": "dev.mongodb.json"
}'
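One note on the JSON run: JsonConverter never talks to the Schema Registry, so the schema.registry.url properties in this config are effectively ignored. If you want a schema envelope embedded in every JSON message, the relevant switches are the generic Connect converter settings below (a sketch, not MongoDB-specific):

  "key.converter.schemas.enable": "true",
  "value.converter.schemas.enable": "true"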