
How to install and configure the MongoDB Connector for Confluent Kafka


Prerequisites

  • Install the Confluent Platform for your operating system following the instructions here
  • Install MongoDB for your operating system following the instructions here
  • Make sure your system and packages are up to date: use sudo apt-get update && sudo apt-get upgrade on Ubuntu and sudo yum update on CentOS

Steps

Before we set up the connectors, let's import some test data into MongoDB.

  • Download a sample dataset from https://catalog.data.gov/dataset?res_format=CSV to a directory on your MongoDB server

  • cd into the directory and confirm the file is available

    cd ~/sample_dataset; ls
    
  • Use the mongoimport command to import the file

    mongoimport --type csv -d entechlog -c Accidental_Drug_Related_Deaths --headerline --drop Accidental_Drug_Related_Deaths_2012-2018.csv
    

    • --type: The input format to import: json, csv, or tsv.
    • -d: The database to import into.
    • -c: The collection to import into.
    • --headerline: Use the first row of the CSV file as field names.
    • --drop: Drop the collection before importing documents.

  • Validate the data in MongoDB using MongoDB Compass, or count the imported documents from the mongo shell as shown below
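
    A minimal sketch for the shell check, assuming MongoDB is reachable at entechlog-vm-01 with the admin/admin credentials used later in the connector config; adjust to your deployment:

    # Count the imported documents in the target collection
    mongo --host entechlog-vm-01 -u admin -p admin --authenticationDatabase admin \
      --eval 'db.getSiblingDB("entechlog").Accidental_Drug_Related_Deaths.countDocuments({})'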

  • Install the MongoDB Connector for Apache Kafka. See https://www.confluent.io/hub/mongodb/kafka-connect-mongodb for instructions

  • Make sure you have a replica set before configuring the connector. See https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/ for more details on how to enable a replica set

  • If you already have a replica set enabled, you can skip the next few steps, up to the NotYetInitialized error message shown below

    sudo systemctl stop mongod
    sudo systemctl edit --full mongod
    # In the editor, update the ExecStart line to start mongod with --replSet:
    ExecStart=/usr/bin/mongod --replSet rs0 --config /etc/mongod.conf
    sudo systemctl start mongod; sudo systemctl status mongod
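
    Alternatively, you can set the replica set name in /etc/mongod.conf instead of overriding ExecStart; a minimal sketch of the relevant section:

    # /etc/mongod.conf - replication section (YAML)
    replication:
      replSetName: rs0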
    
  • Connect to a mongo shell and initiate the new replica set with rs.initiate(). Validate the status using rs.status(), as shown below
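
    For example, from the command line (the host is an assumption):

    # Initiate the replica set, then confirm it reports a PRIMARY member
    mongo --host entechlog-vm-01 --eval 'rs.initiate()'
    mongo --host entechlog-vm-01 --eval 'rs.status()'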

If you miss the initiate step, the connector will fail with:

com.mongodb.MongoCommandException: Command failed with error 94 (NotYetInitialized): 'Cannot use snapshots until replica set is finished initializing.' on server entechlog-vm-01:27017

  • Here is the config for a source connector with the StringConverter serde. This will not register a schema in the Confluent Schema Registry

  • You can use curl, Postman, or the Control Center UI to create the connector

    Curl

    curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
      "name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB",
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
      "copy.existing": true,
      "database": "entechlog",
      "collection": "Accidental_Drug_Related_Deaths",
      "publish.full.document.only": true,
      "topic.prefix": "dev.mongodb"
    }'
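
    Once the PUT returns, you can confirm the connector was created by listing connectors on the Kafka Connect REST API:

    curl http://entechlog-vm-01:8083/connectors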
    

    Postman

    Confluent Control Center

    Data in topic
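
    To inspect the records from the command line, note that the source connector names topics as <topic.prefix>.<database>.<collection>, so the config above should produce dev.mongodb.entechlog.Accidental_Drug_Related_Deaths. A sketch, assuming a broker listening on entechlog-vm-01:9092:

    kafka-console-consumer --bootstrap-server entechlog-vm-01:9092 \
      --topic dev.mongodb.entechlog.Accidental_Drug_Related_Deaths \
      --from-beginning --max-messages 5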

  • Since StringConverter did not register a schema, I tried creating connectors with AvroConverter and JsonConverter as both key and value converters. AvroConverter registered a schema with all fields typed as string, which is not useful.

    Connector Config - AVRO

    curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0020_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
      "name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0020_SRC_MONGODB",
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "schema.registry.url": "http://entechlog-vm-01:8081",
      "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
      "copy.existing": true,
      "database": "entechlog",
      "collection": "Accidental_Drug_Related_Deaths",
      "publish.full.document.only": true,
      "topic.prefix": "dev.mongodb.avro"
    }'
    

    Schema Registry - AVRO
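
    You can verify what the connector registered by querying the Schema Registry REST API. The subject name below assumes the default <topic>-value naming strategy with the topic prefix from the config above:

    # List all subjects, then fetch the latest value schema for the Avro topic
    curl http://entechlog-vm-01:8081/subjects
    curl http://entechlog-vm-01:8081/subjects/dev.mongodb.avro.entechlog.Accidental_Drug_Related_Deaths-value/versions/latest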

    Connector Config - JSON

    curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
      "name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB",
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
      "schema.registry.url": "http://entechlog-vm-01:8081",
      "connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
      "copy.existing": true,
      "database": "entechlog",
      "collection": "Accidental_Drug_Related_Deaths",
      "publish.full.document.only": true,
      "topic.prefix": "dev.mongodb.json"
    }'
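
    Whichever converter you use, the Kafka Connect REST API reports whether the connector and its tasks are healthy:

    # A healthy connector reports state RUNNING for itself and each task
    curl http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB/status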
    

