
How to install and configure the MongoDB Connector for Confluent Kafka


  • Install Confluent Platform for your operating system following instructions here
  • Install MongoDB for your operating system following instructions here
  • Make sure your system and packages are up to date: run sudo apt-get update && sudo apt-get upgrade on Ubuntu, or sudo yum update on CentOS


Before we set up the connectors, let's import some test data into MongoDB.

  • Download a sample dataset from “” to a directory on your MongoDB server

  • cd into the directory and confirm the file is available

    cd ~/sample_dataset && ls
  • Use the mongoimport command to import the file

    mongoimport --type csv -d entechlog -c Accidental_Drug_Related_Deaths --headerline --drop Accidental_Drug_Related_Deaths_2012-2018.csv

    • --type: The input format to import: json, csv, or tsv.
    • -d: The database to use.
    • -c: The collection to use.
    • --headerline: Treats the first row of the CSV file as field names.
    • --drop: Drops the collection before importing documents.
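As a quick sanity check after the import, the number of documents in the collection should match the number of data rows in the CSV. Here is a minimal sketch of that check. It assumes the legacy mongo shell is on the PATH, that mongod accepts local connections without credentials, and that no CSV field contains an embedded newline. The helper names are made up for illustration.

```shell
#!/usr/bin/env bash
# Sketch: compare CSV data rows with the imported document count.
# Helper names are illustrative, not part of any MongoDB tooling.

# Count non-empty data rows in a CSV file, excluding the header line.
# Assumes no field contains an embedded newline.
csv_data_rows() {
  awk 'NR > 1 && length($0) > 0 { n++ } END { print n + 0 }' "$1"
}

# Count documents in a collection using the legacy mongo shell.
# Usage: mongo_doc_count <database> <collection>
# Assumes an unauthenticated local connection.
mongo_doc_count() {
  mongo --quiet "$1" --eval "db.getCollection('$2').countDocuments({})"
}

# Print both counts and return non-zero if they differ.
# Usage: verify_import <csv-file> <database> <collection>
verify_import() {
  local rows docs
  rows=$(csv_data_rows "$1")
  docs=$(mongo_doc_count "$2" "$3")
  echo "csv rows: $rows, documents: $docs"
  [ "$rows" = "$docs" ]
}
```

For the import above this would be called as verify_import Accidental_Drug_Related_Deaths_2012-2018.csv entechlog Accidental_Drug_Related_Deaths.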

  • Validate the data in MongoDB using MongoDB Compass

  • Install the MongoDB Connector for Apache Kafka. See “” for the instructions

  • Make sure you have a replica set before configuring the connector. See “” for more details on how to enable a replica set

  • If you already have a replica set enabled, you can skip the next few steps and continue after the error message shown below

    sudo systemctl stop mongod
    sudo systemctl edit --full mongod
    # In the editor, set the ExecStart line to:
    # ExecStart=/usr/bin/mongod --replSet rs0 --config /etc/mongod.conf
    sudo systemctl start mongod && sudo systemctl status mongod
  • Connect to a mongo shell and initiate the new replica set with rs.initiate(), then validate the status using rs.status()
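The initiate and validate steps can also be scripted so that rs.initiate() only runs when the replica set is not initialized yet. This is a rough sketch. It assumes the legacy mongo shell and a local mongod that accepts connections without credentials, and the function names are hypothetical. With authentication enabled, the mongo calls would need a connection string or credentials.

```shell
#!/usr/bin/env bash
# Sketch: initiate the replica set only when it is not yet initialized.
# Assumes the legacy mongo shell can reach the local mongod without auth.

# Print 1 when rs.status() reports ok, 0 otherwise.
replica_set_ok() {
  mongo --quiet --eval 'try { print(rs.status().ok) } catch (e) { print(0) }'
}

# Run rs.initiate() only if the replica set is not initialized yet.
ensure_replica_set() {
  if [ "$(replica_set_ok)" = "1" ]; then
    echo "replica set already initialized"
  else
    mongo --quiet --eval 'rs.initiate()'
  fi
}
```

Calling ensure_replica_set after restarting mongod with --replSet rs0 should be safe to repeat, since the second call only reports that the replica set already exists.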

If you skip the initiate step, the connector will fail with:

com.mongodb.MongoCommandException: Command failed with error 94 (NotYetInitialized): 'Cannot use snapshots until replica set is finished initializing.' on server entechlog-vm-01:27017

  • Here is the config for a source connector that uses StringConverter for both key and value. This will not register a schema in Confluent Schema Registry

  • You can use curl, Postman, or the Control Center UI to create the connector


    curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
      "copy.existing": true,
      "database": "entechlog",
      "collection": "Accidental_Drug_Related_Deaths",
      "publish.full.document.only": true,
      "topic.prefix": "dev.mongodb"
    }'


    Confluent Control Center
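If Control Center is not handy, the Kafka Connect REST API can report the same connector state from the command line. A small sketch, assuming the Connect worker from this post at entechlog-vm-01:8083. The CONNECT_URL variable and the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: query the Kafka Connect REST API for a connector's status.
# CONNECT_URL defaults to the Connect worker used in this post.
CONNECT_URL="${CONNECT_URL:-http://entechlog-vm-01:8083}"

# Build the status endpoint URL for a connector name.
status_url() {
  echo "${CONNECT_URL}/connectors/$1/status"
}

# Fetch the status document (JSON with connector and task states).
connector_status() {
  curl -s "$(status_url "$1")"
}
```

For example, connector_status ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB returns a JSON document in which a healthy connector and its tasks report the state RUNNING.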

    Data in topic
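The topic can also be inspected from the command line. The MongoDB source connector names its topics as the topic prefix followed by the database and the collection, so the data from this connector lands in dev.mongodb.entechlog.Accidental_Drug_Related_Deaths. Below is a minimal sketch using kafka-console-consumer from Confluent Platform. The broker address entechlog-vm-01:9092 is an assumption, and the function names are made up.

```shell
#!/usr/bin/env bash
# Sketch: read a few records from the topic written by the source connector.

# The source connector names topics <topic.prefix>.<database>.<collection>.
topic_name() {
  echo "$1.$2.$3"
}

# Print the first N records of a topic (default 5).
# The default broker address is an assumption, override with BOOTSTRAP_SERVER.
peek_topic() {
  kafka-console-consumer \
    --bootstrap-server "${BOOTSTRAP_SERVER:-entechlog-vm-01:9092}" \
    --topic "$1" --from-beginning --max-messages "${2:-5}"
}
```

Usage would look like peek_topic "$(topic_name dev.mongodb entechlog Accidental_Drug_Related_Deaths)" 5.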

  • Since “StringConverter” did not register a schema, I tried creating connectors with “AvroConverter” and “JsonConverter” as both key and value converters. “AvroConverter” registered a schema, but it typed the data as “string”, which is not useful.

    Connector Config - AVRO

    curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0020_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "schema.registry.url": "http://entechlog-vm-01:8081",
      "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
      "copy.existing": true,
      "database": "entechlog",
      "collection": "Accidental_Drug_Related_Deaths",
      "publish.full.document.only": true,
      "topic.prefix": "dev.mongodb.avro"
    }'

    Schema Registry - AVRO
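The registered schema can also be fetched straight from the Schema Registry REST API. A short sketch, assuming the default subject naming (topic name plus -value) and the Schema Registry from this post at entechlog-vm-01:8081. The SCHEMA_REGISTRY_URL variable and the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: fetch the latest value schema registered for a topic.
# SCHEMA_REGISTRY_URL defaults to the Schema Registry used in this post.
SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-http://entechlog-vm-01:8081}"

# Subject name under the default naming strategy: <topic>-value.
value_subject() {
  echo "$1-value"
}

# Fetch the latest schema version registered under the value subject.
latest_value_schema() {
  curl -s "${SCHEMA_REGISTRY_URL}/subjects/$(value_subject "$1")/versions/latest"
}
```

For the Avro connector this would be latest_value_schema dev.mongodb.avro.entechlog.Accidental_Drug_Related_Deaths.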

    Connector Config - JSON

    curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "schema.registry.url": "http://entechlog-vm-01:8081",
      "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
      "copy.existing": true,
      "database": "entechlog",
      "collection": "Accidental_Drug_Related_Deaths",
      "publish.full.document.only": true,
      "topic.prefix": "dev.mongodb.json"
    }'

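The connector configs in this post differ only in the connector name, the converter class and the topic prefix, so they can be generated from one template when trying out more converters. This is just a sketch with hypothetical helper names. It always includes the Schema Registry URLs, on the assumption that converters which do not use them simply ignore them.

```shell
#!/usr/bin/env bash
# Sketch: generate the source connector config for a given converter.
# Hypothetical helpers; the values mirror the configs used in this post.

# Usage: connector_config <converter-class> <topic-prefix>
connector_config() {
  cat <<EOF
{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "key.converter": "$1",
  "value.converter": "$1",
  "key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
  "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
  "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
  "copy.existing": true,
  "database": "entechlog",
  "collection": "Accidental_Drug_Related_Deaths",
  "publish.full.document.only": true,
  "topic.prefix": "$2"
}
EOF
}

# Create or update a connector from the generated config.
# Usage: put_connector <name> <converter-class> <topic-prefix>
put_connector() {
  connector_config "$2" "$3" |
    curl -s -X PUT "http://entechlog-vm-01:8083/connectors/$1/config" \
      -H "Content-Type: application/json" -d @-
}
```

For example, put_connector ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB org.apache.kafka.connect.json.JsonConverter dev.mongodb.json would send the JSON converter config to the Connect worker.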
