
Exploring ksqlDB and Superset with a real-time Twitter data feed of my favorite celebrities

Prerequisites

Ingest Twitter data

  • Create a Kafka source connector and start ingesting data from Twitter

    CREATE SOURCE CONNECTOR TWITTER_CELEBRITIES_SRC_0020 WITH(
        "connector.class"='com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
        "twitter.oauth.consumerKey"='xyz',
        "twitter.oauth.consumerSecret"='abc',
        "twitter.oauth.accessToken"='123',
        "twitter.oauth.accessTokenSecret"='789',
        "kafka.status.topic"='dev.etl.twitter.celebrities.src.0020',
        "key.converter"='io.confluent.connect.avro.AvroConverter',
        "key.converter.schemas.enable"='true',
        "key.converter.schema.registry.url"='http://localhost:8081',
        "value.converter"='io.confluent.connect.avro.AvroConverter',
        "value.converter.schemas.enable"='true',
        "value.converter.schema.registry.url"='http://localhost:8081',
        "process.deletes"=false,
        "filter.keywords"='mohanlal,dhoni'
    );
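
As far as I understand, ksqlDB hands a CREATE SOURCE CONNECTOR statement over to the Kafka Connect worker, so the same connector could also be registered directly through the Connect REST API (POST /connectors). The sketch below only builds the JSON body for such a request; the helper name connect_payload is my own, only a subset of the settings is shown, and the credentials are left out on purpose.

```python
import json


def connect_payload(name, config):
    """Build the JSON body for POST /connectors on the Kafka Connect REST API.

    Hypothetical helper for illustration; not part of any library.
    """
    # Connector configuration is a map of string keys to string values,
    # so booleans are rendered as "true"/"false".
    str_config = {
        key: (str(value).lower() if isinstance(value, bool) else str(value))
        for key, value in config.items()
    }
    return json.dumps({"name": name, "config": str_config}, indent=2)


payload = connect_payload(
    "TWITTER_CELEBRITIES_SRC_0020",
    {
        "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
        "kafka.status.topic": "dev.etl.twitter.celebrities.src.0020",
        "process.deletes": False,
        "filter.keywords": "mohanlal,dhoni",
    },
)
print(payload)
```

The printed document could then be sent to the Connect worker (127.0.1.1:8083 in the DESCRIBE output of this post) with any HTTP client.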
    
  • Check the status of the connector

    ksql> DESCRIBE CONNECTOR TWITTER_CELEBRITIES_SRC_0020;
    Name                 : TWITTER_CELEBRITIES_SRC_0020
    Class                : com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
    Type                 : source
    State                : RUNNING
    WorkerId             : 127.0.1.1:8083
    Task ID | State   | Error Trace
    ---------------------------------
    0       | RUNNING |
    ---------------------------------
    ksql>
    
  • Check the status of the topic. If tweets are coming in, you should see that the topic has been created

    ksql> SHOW TOPICS;
    Kafka Topic                                                                                   | Partitions | Partition Replicas
    ---------------------------------------------------------------------------------------------------------------------------------
    _confluent-command                                                                            | 1          | 1
    _confluent-controlcenter-5-4-0-1-actual-group-consumption-rekey                               | 2          | 1
    dev.etl.twitter.celebrities.src.0020                                                          | 1          | 1
    
  • Check the messages in the topic

    ksql> PRINT 'dev.etl.twitter.celebrities.src.0020' FROM BEGINNING LIMIT 1;
    Format:AVRO
    2/16/20 5:00:40 PM EST, y������", {"CreatedAt": 1581890434000, "Id": 1229163687396728832, "Text": "You can check here the Indian Premier League(IPL) 13 CSK team full squad 2020.\n#Yellove #WhistlePodu 🦁💛 #RR #KKR #SRH #CSK #MI #RCB #Virat #Kohli #Dhoni #IPL12 #IPLMeme #cricketball #cricketnews #IPL2020 #IPL13 #TeamIndia #ViratKohli #Mahi \nhttps://t.co/295eLd2xNJ", "Source": "<a href=\"https://www.hootsuite.com\" rel=\"nofollow\">Hootsuite Inc.</a>", "Truncated": true, "InReplyToStatusId": -1, "InReplyToUserId": -1, "InReplyToScreenName": null, "GeoLocation": null, "Place": null, "Favorited": false, "Retweeted": false, "FavoriteCount": 0, "User": {"Id": 1067662737675710466, "Name": "Tentaran Sports", "ScreenName": "TentaranSports", "Location": "New Delhi, India", "Description": "#INDvNZ - India and New zealand team squad for New zealand tour of India 2020.\n👇👇👇👇👇👇", "ContributorsEnabled": false, "ProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_normal.jpg", "BiggerProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_bigger.jpg", "MiniProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_mini.jpg", "OriginalProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz.jpg", "ProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_normal.jpg", "BiggerProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_bigger.jpg", "MiniProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_mini.jpg", "OriginalProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz.jpg", "DefaultProfileImage": false, "URL": "https://www.tentaran.com/indian-team-squad-for-new-zealand-tour-2020/", "Protected": false, "FollowersCount": 1167, "ProfileBackgroundColor": "000000", "ProfileTextColor": "000000", "ProfileLinkColor": "1B95E0", 
"ProfileSidebarFillColor": "000000", "ProfileSidebarBorderColor": "000000", "ProfileUseBackgroundImage": false, "DefaultProfile": false, "ShowAllInlineMedia": false, "FriendsCount": 862, "CreatedAt": 1543385607000, "FavouritesCount": 7155, "UtcOffset": -1, "TimeZone": null, "ProfileBackgroundImageURL": "http://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBackgroundImageUrlHttps": "https://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBannerURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/web", "ProfileBannerRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/web_retina", "ProfileBannerIPadURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/ipad", "ProfileBannerIPadRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/ipad_retina", "ProfileBannerMobileURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/mobile", "ProfileBannerMobileRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/mobile_retina", "ProfileBackgroundTiled": false, "Lang": null, "StatusesCount": 23426, "GeoEnabled": true, "Verified": false, "Translator": false, "ListedCount": 0, "FollowRequestSent": false, "WithheldInCountries": []}, "Retweet": false, "Contributors": [], "RetweetCount": 0, "RetweetedByMe": false, "CurrentUserRetweetId": -1, "PossiblySensitive": false, "Lang": "en", "WithheldInCountries": [], "HashtagEntities": [{"Text": "Yellove", "Start": 79, "End": 87}, {"Text": "WhistlePodu", "Start": 88, "End": 100}, {"Text": "RR", "Start": 104, "End": 107}, {"Text": "KKR", "Start": 108, "End": 112}, {"Text": "SRH", "Start": 113, "End": 117}, {"Text": "CSK", "Start": 118, "End": 122}, {"Text": "MI", "Start": 123, "End": 126}, {"Text": "RCB", "Start": 127, "End": 131}, {"Text": "Virat", "Start": 132, "End": 138}, {"Text": "Kohli", "Start": 139, "End": 145}, {"Text": "Dhoni", "Start": 146, "End": 152}, {"Text": "IPL12", 
"Start": 153, "End": 159}, {"Text": "IPLMeme", "Start": 160, "End": 168}, {"Text": "cricketball", "Start": 169, "End": 181}, {"Text": "cricketnews", "Start": 182, "End": 194}, {"Text": "IPL2020", "Start": 195, "End": 203}, {"Text": "IPL13", "Start": 204, "End": 210}, {"Text": "TeamIndia", "Start": 211, "End": 221}, {"Text": "ViratKohli", "Start": 222, "End": 233}, {"Text": "Mahi", "Start": 234, "End": 239}], "UserMentionEntities": [], "MediaEntities": [], "SymbolEntities": [], "URLEntities": [{"URL": "https://t.co/295eLd2xNJ", "Text": "https://t.co/295eLd2xNJ", "ExpandedURL": "https://www.tentaran.com/ipl-chennai-super-kings-team-2020-players-list-csk/", "Start": 241, "End": 264, "DisplayURL": "tentaran.com/ipl-chennai-su\u2026"}]}
    ksql>
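
Fields such as CreatedAt in the message above look like epoch timestamps in milliseconds. Here is a minimal sketch of converting one to a readable UTC time; the function name epoch_millis_to_utc is my own.

```python
from datetime import datetime, timezone


def epoch_millis_to_utc(millis):
    """Convert an epoch timestamp in milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


# CreatedAt value taken from the sample tweet printed above
created_at = epoch_millis_to_utc(1581890434000)
print(created_at.isoformat())  # 2020-02-16T22:00:34+00:00
```

22:00:34 UTC is 5:00:34 PM EST, a few seconds before the record timestamp that PRINT shows for the message.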
    
  • Now let's create a stream on this topic

    CREATE STREAM STM_TWITTER_CELEBRITIES_SRC_0020 WITH (KAFKA_TOPIC='dev.etl.twitter.celebrities.src.0020', VALUE_FORMAT='Avro');
    
  • Issue this command to start reading data from the beginning of the topic

    SET 'auto.offset.reset'='earliest';
    
  • Selecting/Filtering columns with ksqlDB queries

    Push Query

    SELECT USER->SCREENNAME, LANG, TEXT FROM STM_TWITTER_CELEBRITIES_SRC_0020 EMIT CHANGES;
    

    Pull Query

    SELECT USER->SCREENNAME, LANG, TEXT FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE LANG = 'en';
    

    This pull query is rejected because you can't run a pull query against a stream. ksqlDB currently supports pull queries only on materialized aggregate tables, i.e. those created by a CREATE TABLE AS SELECT <fields>, <aggregate_functions> FROM <sources> GROUP BY <key> style statement.

  • Aggregates in ksqlDB

    ksql> SELECT LANG,COUNT(*) FROM STM_TWITTER_CELEBRITIES_SRC_0020 GROUP BY LANG EMIT CHANGES;
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |LANG                                                                              |KSQL_COL_1                                                                        |
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |cy                                                                                |1                                                                                 |
    |sv                                                                                |1                                                                                 |
    |gu                                                                                |3                                                                                 |
    |ro                                                                                |1                                                                                 |
    |no                                                                                |1                                                                                 |
    |te                                                                                |2                                                                                 |
    |sl                                                                                |1                                                                                 |
    |or                                                                                |1                                                                                 |
    
  • Now let's examine a record that is a candidate for EXPLODE

    ksql> SELECT ID, HASHTAGENTITIES FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE ID =1229243483900108801 EMIT CHANGES;
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |ID                                                                                |HASHTAGENTITIES                                                                   |
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |1229243483900108801                                                               |[{TEXT=Dhoni, START=51, END=57}, {TEXT=Dhonism, START=58, END=66}, {TEXT=happybirt|
    |                                                                                  |hdayABDevilliers, START=67, END=93}, {TEXT=Mr360, START=94, END=100}, {TEXT=ABDevi|
    |                                                                                  |lliers, START=101, END=114}]
    
  • Let's use the EXPLODE function to turn the array of hashtags (and/or mentioned users) in a tweet into one row per array element

    ksql> SELECT ID, EXPLODE(HASHTAGENTITIES)->TEXT AS HASHTAG FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE ID =1229243483900108801 EMIT CHANGES;
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |ID                                                                                |HASHTAG                                                                           |
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |1229243483900108801                                                               |Dhoni                                                                             |
    |1229243483900108801                                                               |Dhonism                                                                           |
    |1229243483900108801                                                               |happybirthdayABDevilliers                                                         |
    |1229243483900108801                                                               |Mr360                                                                             |
    |1229243483900108801                                                               |ABDevilliers
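
To make the behaviour of EXPLODE concrete, here is a rough Python sketch of the same idea: each element of the array becomes its own output row, and the other selected columns are repeated. This is only an illustration of the semantics, not how ksqlDB implements it; the function name explode and the dict layout are my own.

```python
def explode(rows, array_field, element_field, alias):
    """Yield one output row per array element, repeating the ID column."""
    for row in rows:
        for element in row[array_field]:
            yield {"ID": row["ID"], alias: element[element_field]}


# The record examined above, written as a Python dict
tweet = {
    "ID": 1229243483900108801,
    "HASHTAGENTITIES": [
        {"TEXT": "Dhoni", "START": 51, "END": 57},
        {"TEXT": "Dhonism", "START": 58, "END": 66},
        {"TEXT": "happybirthdayABDevilliers", "START": 67, "END": 93},
        {"TEXT": "Mr360", "START": 94, "END": 100},
        {"TEXT": "ABDevilliers", "START": 101, "END": 114},
    ],
}

exploded = list(explode([tweet], "HASHTAGENTITIES", "TEXT", "HASHTAG"))
for row in exploded:
    print(row["ID"], row["HASHTAG"])
```

The five printed rows correspond to the five rows in the query result above.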
    
  • Create a stream of hashtags (converted to lowercase)

    CREATE STREAM STM_TWITTER_CELEBRITIES_HASHTAGS AS SELECT ID, LCASE(EXPLODE(HASHTAGENTITIES)-> TEXT) AS HASHTAG FROM STM_TWITTER_CELEBRITIES_SRC_0020;
    
  • Create a table that counts the occurrences of each hashtag

    CREATE TABLE TBL_TWITTER_CELEBRITIES_HASHTAGS AS SELECT HASHTAG, COUNT(*) AS CT FROM STM_TWITTER_CELEBRITIES_HASHTAGS GROUP BY HASHTAG;
    
  • Issue a push query on the table

    ksql> SELECT HASHTAG, CT FROM TBL_TWITTER_CELEBRITIES_HASHTAGS EMIT CHANGES;
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |HASHTAG                                                                           |CT                                                                                |
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |ipl12                                                                             |1                                                                                 |
    |cricketball                                                                       |1                                                                                 |
    |vineethsreenivasan                                                                |1                                                                                 |
    |ich2020                                                                           |1                                                                                 |
    
  • Issue a pull query on the table. Since this is a materialized aggregate table, a pull query works here

    ksql> SELECT HASHTAG, CT FROM TBL_TWITTER_CELEBRITIES_HASHTAGS WHERE ROWKEY='mohanlal';
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |HASHTAG                                                                           |CT                                                                                |
    +----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
    |mohanlal                                                                          |1121                                                                              |
    Query terminated
    ksql>
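
One way to picture the difference between the two query types on this table is the toy model below. Every incoming hashtag updates a count (the materialized state) and produces a change event, which is what a push query streams. A pull query is just a lookup of the current count by key. The class HashtagCounts is a made-up illustration and says nothing about ksqlDB internals.

```python
from collections import defaultdict


class HashtagCounts:
    """Toy model of a materialized COUNT(*) ... GROUP BY HASHTAG table."""

    def __init__(self):
        self._counts = defaultdict(int)

    def apply(self, hashtag):
        """Process one stream record and return the change a push query would emit."""
        key = hashtag.lower()  # same normalisation as LCASE in the stream above
        self._counts[key] += 1
        return key, self._counts[key]

    def pull(self, key):
        """Point-in-time lookup by key, like a pull query with WHERE ROWKEY=..."""
        return self._counts.get(key)


table = HashtagCounts()
changes = [table.apply(tag) for tag in ["Mohanlal", "Dhoni", "mohanlal"]]
print(changes)                 # [('mohanlal', 1), ('dhoni', 1), ('mohanlal', 2)]
print(table.pull("mohanlal"))  # 2
```

The push side keeps emitting a new row whenever a count changes, while the pull side returns once with the latest value, which matches the "Query terminated" line in the output above.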
    
  • Create a sink and offload this data to a PostgreSQL database

    CREATE SINK CONNECTOR TWITTER_CELEBRITIES_0010_SINK_POSTGRESS WITH (
        'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
        'connection.url'                      = 'jdbc:postgresql://entechlog-vm-01:5432/',
        'connection.user'                     = 'postgres',
        'connection.password'                 = 'postgres',
        'topics'                              = 'TBL_TWITTER_CELEBRITIES_HASHTAGS',
        'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
        'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
        'value.converter.schema.registry.url' = 'http://entechlog-vm-01:8081',
        'auto.create'                         = 'true',
        'insert.mode'                         = 'upsert',
        'table.name.format'                   = 'entechlog.tbl_twitter_celebrities_hashtags',
        'pk.mode'                             = 'record_value',
        'pk.fields'                           = 'HASHTAG',
        'transforms'                          = 'dropSysCols',
        'transforms.dropSysCols.type'         = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
        'transforms.dropSysCols.blacklist'    = 'ROWKEY,ROWTIME'
      );
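
With 'insert.mode' set to 'upsert' and HASHTAG as the primary key, every new count for a hashtag should overwrite the existing row instead of adding another one. The sketch below imitates that behaviour with an in-memory SQLite database standing in for PostgreSQL (both accept the INSERT ... ON CONFLICT ... DO UPDATE form; SQLite needs version 3.24 or newer). The exact SQL the JDBC sink generates is not shown in this post, so treat the statement here as an assumption.

```python
import sqlite3

# In-memory SQLite database as a stand-in for the PostgreSQL target
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tbl_twitter_celebrities_hashtags "
    "(hashtag TEXT PRIMARY KEY, ct INTEGER)"
)


def upsert(connection, hashtag, ct):
    """Insert a row, or update the count if the hashtag already exists."""
    connection.execute(
        "INSERT INTO tbl_twitter_celebrities_hashtags (hashtag, ct) VALUES (?, ?) "
        "ON CONFLICT (hashtag) DO UPDATE SET ct = excluded.ct",
        (hashtag, ct),
    )


# Changelog records as they might arrive from the table's topic
for hashtag, ct in [("mohanlal", 1), ("dhoni", 1), ("mohanlal", 2)]:
    upsert(conn, hashtag, ct)

rows = conn.execute(
    "SELECT hashtag, ct FROM tbl_twitter_celebrities_hashtags ORDER BY hashtag"
).fetchall()
print(rows)  # [('dhoni', 1), ('mohanlal', 2)]
```

Three records went in but only two rows remain, one per hashtag, each holding the latest count.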
    
  • Validate the data in PostgreSQL

    SELECT * FROM PG_CATALOG.PG_TABLES WHERE SCHEMANAME ='entechlog';
    
    SELECT * FROM entechlog.tbl_twitter_celebrities_hashtags ORDER BY 1 DESC;
    

Data Visualization using Apache Superset

Install Superset

# Install Superset dependencies
sudo apt-get install build-essential libssl-dev libffi-dev python-dev python-pip libsasl2-dev libldap2-dev
# If you are using Python 3.6, install python3.6-dev instead of python-dev
sudo apt-get install build-essential libssl-dev libffi-dev python3.6-dev python-pip libsasl2-dev libldap2-dev

# Python virtualenv
pip install virtualenv
sudo apt-get install python3-venv

# Create and activate a virtualenv
python3 -m venv venv
. venv/bin/activate

# Install setup tools
pip install --upgrade setuptools pip

# Install superset
pip install apache-superset

# Initialize the database
superset db upgrade

# Create an admin user (you will be prompted to set a username, first and last name before setting a password)
export FLASK_APP=superset
flask fab create-admin

# Load some data to play with. This is not an optional step
superset load_examples

# Create default roles and permissions
superset init

# Install Postgres Database dependencies
sudo apt-get install libpq-dev
pip install psycopg2

# Start the web server on port 6066. Use -p to bind to another port and -h 0.0.0.0 so that Superset can be accessed from outside localhost
#superset run -p 8088 --with-threads --reload --debugger
superset run -h 0.0.0.0 -p 6066 --with-threads --reload --debugger
  • Access Superset at http://hostname:6066/superset/welcome

  • Create a new database connection from Sources -> Databases -> Add a new record. Here the SQLAlchemy URI for the PostgreSQL database will look like postgresql://postgres:XXXXXXXXXX@entechlog-vm-01/postgres

  • Create a new data source from Sources -> Tables -> Add a new record. The table name will be the same as the table created by your PostgreSQL sink

  • Create a new chart from Charts -> Add a new record. Here

    • Data Source -> Table Name
    • Visualization Type -> Word cloud
    • Series -> HASHTAG
    • Metric -> MAX(CT)
  • Now run the query and save it to a dashboard. The resulting word cloud is a visual representation of the hashtags for the Twitter keywords for which we have a real-time data feed coming in.
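
One thing to watch in the database step above: if the PostgreSQL password contains characters such as @ or /, the SQLAlchemy URI needs them URL-encoded. A small sketch of building the URI safely follows; the function name postgres_uri and the sample password are made up for illustration.

```python
from urllib.parse import quote_plus


def postgres_uri(user, password, host, database, port=None):
    """Build a postgresql:// URI with URL-encoded credentials."""
    netloc = f"{host}:{port}" if port else host
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{netloc}/{database}"


uri = postgres_uri("postgres", "p@ss/word", "entechlog-vm-01", "postgres")
print(uri)  # postgresql://postgres:p%40ss%2Fword@entechlog-vm-01/postgres
```

The printed string can be pasted into the SQLAlchemy URI field when adding the database in Superset.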
