Exploring ksqlDB and Superset with a real-time Twitter data feed of my favorite celebrities

ksqlDB, 17 Feb 2020
Prerequisites
Ingest Twitter data

Create a Kafka source connector and start ingesting data from Twitter:

CREATE SOURCE CONNECTOR TWITTER_CELEBRITIES_SRC_0020 WITH(
    "connector.class"='com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
    "twitter.oauth.consumerKey"='xyz',
    "twitter.oauth.consumerSecret"='abc',
    "twitter.oauth.accessToken"='123',
    "twitter.oauth.accessTokenSecret"='789',
    "kafka.status.topic"='dev.etl.twitter.celebrities.src.0020',
    "key.converter"='io.confluent.connect.avro.AvroConverter',
    "key.converter.schemas.enable"='true',
    "key.converter.schema.registry.url"='http://localhost:8081',
    "value.converter"='io.confluent.connect.avro.AvroConverter',
    "value.converter.schemas.enable"='true',
    "value.converter.schema.registry.url"='http://localhost:8081',
    "process.deletes"=false,
    "filter.keywords"='mohanlal,dhoni'
);
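The credentials in the statement above are placeholders. One way to keep real keys out of a script is to render the statement from a dictionary and read the secrets from environment variables. This is a minimal Python sketch under that assumption: the helper name `render_create_connector` and the `TWITTER_*` environment variable names are hypothetical and not part of ksqlDB.

```python
import os


def render_create_connector(name, kind, props):
    """Render a CREATE SOURCE|SINK CONNECTOR statement from a dict (hypothetical helper)."""
    lines = []
    for key, value in props.items():
        if isinstance(value, bool):
            # Booleans are written unquoted, as in "process.deletes"=false
            rendered = "true" if value else "false"
        else:
            # Everything else becomes a single-quoted string, doubling any embedded quote
            rendered = "'" + str(value).replace("'", "''") + "'"
        lines.append(f'    "{key}"={rendered}')
    body = ",\n".join(lines)
    return f"CREATE {kind} CONNECTOR {name} WITH(\n{body}\n);"


props = {
    "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    # Environment variable names are assumptions; the fallbacks are the placeholders used above
    "twitter.oauth.consumerKey": os.environ.get("TWITTER_CONSUMER_KEY", "xyz"),
    "twitter.oauth.consumerSecret": os.environ.get("TWITTER_CONSUMER_SECRET", "abc"),
    "twitter.oauth.accessToken": os.environ.get("TWITTER_ACCESS_TOKEN", "123"),
    "twitter.oauth.accessTokenSecret": os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "789"),
    "kafka.status.topic": "dev.etl.twitter.celebrities.src.0020",
    "process.deletes": False,
    "filter.keywords": "mohanlal,dhoni",
}

statement = render_create_connector("TWITTER_CELEBRITIES_SRC_0020", "SOURCE", props)
print(statement)
```

The converter properties are left out of the sketch for brevity; they would be added to `props` in the same way.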

Check the status of the connector using DESCRIBE CONNECTOR:

ksql> DESCRIBE CONNECTOR TWITTER_CELEBRITIES_SRC_0020;
Name                 : TWITTER_CELEBRITIES_SRC_0020
Class                : com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
Type                 : source
State                : RUNNING
WorkerId             : 127.0.1.1:8083
 Task ID | State   | Error Trace
---------------------------------
 0       | RUNNING |
---------------------------------
ksql>
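The same information is available from the Kafka Connect REST API at `GET /connectors/<name>/status`, which is handy for scripted health checks. The sketch below assumes the Connect worker listens on `http://localhost:8083` (the port shown in WorkerId above); the function names are made up for illustration, and the sample payload only mirrors the output above rather than being captured from a live worker.

```python
import json
from urllib.request import urlopen


def connector_is_healthy(status):
    """Return True when the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch /connectors/<name>/status from a Connect worker (base_url is an assumption)."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


# Sample payload shaped like the REST response, mirroring the DESCRIBE CONNECTOR output
sample = {
    "name": "TWITTER_CELEBRITIES_SRC_0020",
    "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083"}],
    "type": "source",
}
print(connector_is_healthy(sample))  # True
```

Against a live worker, `connector_is_healthy(fetch_status("TWITTER_CELEBRITIES_SRC_0020"))` would perform the same check.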

Check the topic using SHOW TOPICS. If tweets are coming in, you should see that the topic has been created:

ksql> SHOW TOPICS;
 Kafka Topic                                                                                   | Partitions | Partition Replicas
---------------------------------------------------------------------------------------------------------------------------------
 _confluent-command                                                                            | 1          | 1
 _confluent-controlcenter-5-4-0-1-actual-group-consumption-rekey                               | 2          | 1
 dev.etl.twitter.celebrities.src.0020                                                          | 1          | 1

Check the messages in the topic using PRINT:

ksql> PRINT 'dev.etl.twitter.celebrities.src.0020' FROM BEGINNING LIMIT 1;
Format:AVRO
2/16/20 5:00:40 PM EST, y������", {"CreatedAt": 1581890434000, "Id": 1229163687396728832, "Text": "You can check here the Indian Premier League(IPL) 13 CSK team full squad 2020.\n#Yellove #WhistlePodu 🦁💛 #RR #KKR #SRH #CSK #MI #RCB #Virat #Kohli #Dhoni #IPL12 #IPLMeme #cricketball #cricketnews #IPL2020 #IPL13 #TeamIndia #ViratKohli #Mahi \nhttps://t.co/295eLd2xNJ", "Source": "<a href=\"https://www.hootsuite.com\" rel=\"nofollow\">Hootsuite Inc.</a>", "Truncated": true, "InReplyToStatusId": -1, "InReplyToUserId": -1, "InReplyToScreenName": null, "GeoLocation": null, "Place": null, "Favorited": false, "Retweeted": false, "FavoriteCount": 0, "User": {"Id": 1067662737675710466, "Name": "Tentaran Sports", "ScreenName": "TentaranSports", "Location": "New Delhi, India", "Description": "#INDvNZ - India and New zealand team squad for New zealand tour of India 2020.\n👇👇👇👇👇👇", "ContributorsEnabled": false, "ProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_normal.jpg", "BiggerProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_bigger.jpg", "MiniProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_mini.jpg", "OriginalProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz.jpg", "ProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_normal.jpg", "BiggerProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_bigger.jpg", "MiniProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_mini.jpg", "OriginalProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz.jpg", "DefaultProfileImage": false, "URL": "https://www.tentaran.com/indian-team-squad-for-new-zealand-tour-2020/", "Protected": false, "FollowersCount": 1167, "ProfileBackgroundColor": "000000", "ProfileTextColor": "000000", "ProfileLinkColor": "1B95E0", 
"ProfileSidebarFillColor": "000000", "ProfileSidebarBorderColor": "000000", "ProfileUseBackgroundImage": false, "DefaultProfile": false, "ShowAllInlineMedia": false, "FriendsCount": 862, "CreatedAt": 1543385607000, "FavouritesCount": 7155, "UtcOffset": -1, "TimeZone": null, "ProfileBackgroundImageURL": "http://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBackgroundImageUrlHttps": "https://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBannerURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/web", "ProfileBannerRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/web_retina", "ProfileBannerIPadURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/ipad", "ProfileBannerIPadRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/ipad_retina", "ProfileBannerMobileURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/mobile", "ProfileBannerMobileRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/mobile_retina", "ProfileBackgroundTiled": false, "Lang": null, "StatusesCount": 23426, "GeoEnabled": true, "Verified": false, "Translator": false, "ListedCount": 0, "FollowRequestSent": false, "WithheldInCountries": []}, "Retweet": false, "Contributors": [], "RetweetCount": 0, "RetweetedByMe": false, "CurrentUserRetweetId": -1, "PossiblySensitive": false, "Lang": "en", "WithheldInCountries": [], "HashtagEntities": [{"Text": "Yellove", "Start": 79, "End": 87}, {"Text": "WhistlePodu", "Start": 88, "End": 100}, {"Text": "RR", "Start": 104, "End": 107}, {"Text": "KKR", "Start": 108, "End": 112}, {"Text": "SRH", "Start": 113, "End": 117}, {"Text": "CSK", "Start": 118, "End": 122}, {"Text": "MI", "Start": 123, "End": 126}, {"Text": "RCB", "Start": 127, "End": 131}, {"Text": "Virat", "Start": 132, "End": 138}, {"Text": "Kohli", "Start": 139, "End": 145}, {"Text": "Dhoni", "Start": 146, "End": 152}, {"Text": "IPL12", 
"Start": 153, "End": 159}, {"Text": "IPLMeme", "Start": 160, "End": 168}, {"Text": "cricketball", "Start": 169, "End": 181}, {"Text": "cricketnews", "Start": 182, "End": 194}, {"Text": "IPL2020", "Start": 195, "End": 203}, {"Text": "IPL13", "Start": 204, "End": 210}, {"Text": "TeamIndia", "Start": 211, "End": 221}, {"Text": "ViratKohli", "Start": 222, "End": 233}, {"Text": "Mahi", "Start": 234, "End": 239}], "UserMentionEntities": [], "MediaEntities": [], "SymbolEntities": [], "URLEntities": [{"URL": "https://t.co/295eLd2xNJ", "Text": "https://t.co/295eLd2xNJ", "ExpandedURL": "https://www.tentaran.com/ipl-chennai-super-kings-team-2020-players-list-csk/", "Start": 241, "End": 264, "DisplayURL": "tentaran.com/ipl-chennai-su\u2026"}]}
ksql>
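The `CreatedAt` field in the record above is a large integer rather than a readable date. Assuming it is milliseconds since the Unix epoch, a short Python sketch can convert it:

```python
from datetime import datetime, timezone


def millis_to_utc(millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


created_at = millis_to_utc(1581890434000)
print(created_at.isoformat())  # 2020-02-16T22:00:34+00:00
```

That is 5:00:34 PM EST, a few seconds before the 5:00:40 PM EST record timestamp printed by PRINT, which is consistent with the millisecond interpretation.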

Now let's create a stream on this topic:

CREATE STREAM STM_TWITTER_CELEBRITIES_SRC_0020 WITH (KAFKA_TOPIC='dev.etl.twitter.celebrities.src.0020', VALUE_FORMAT='Avro');

Issue this command to start reading data from the beginning of the topic

SET 'auto.offset.reset'='earliest';

Selecting/Filtering columns with ksqlDB queries

Push Query

SELECT USER->SCREENNAME, LANG, TEXT FROM STM_TWITTER_CELEBRITIES_SRC_0020 EMIT CHANGES;

Pull Query

SELECT USER->SCREENNAME, LANG, TEXT FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE LANG = 'en';

The pull query above fails because you can't run a pull query against a stream. ksqlDB currently supports pull queries only on materialized aggregate tables, i.e. those created by a CREATE TABLE AS SELECT ... FROM ... GROUP BY ... style statement.

Aggregates in ksqlDB

ksql> SELECT LANG,COUNT(*) FROM STM_TWITTER_CELEBRITIES_SRC_0020 GROUP BY LANG EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|LANG                                                                              |KSQL_COL_1                                                                        |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|cy                                                                                |1                                                                                 |
|sv                                                                                |1                                                                                 |
|gu                                                                                |3                                                                                 |
|ro                                                                                |1                                                                                 |
|no                                                                                |1                                                                                 |
|te                                                                                |2                                                                                 |
|sl                                                                                |1                                                                                 |
|or                                                                                |1                                                                                 |

Now let's examine a record that is a candidate for EXPLODE:

ksql> SELECT ID, HASHTAGENTITIES FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE ID =1229243483900108801 EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ID                                                                                |HASHTAGENTITIES                                                                   |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|1229243483900108801                                                               |[{TEXT=Dhoni, START=51, END=57}, {TEXT=Dhonism, START=58, END=66}, {TEXT=happybirt|
|                                                                                  |hdayABDevilliers, START=67, END=93}, {TEXT=Mr360, START=94, END=100}, {TEXT=ABDevi|
|                                                                                  |lliers, START=101, END=114}]

Let's use the EXPLODE function to turn the tweet's array of hashtags into one row per hashtag (the same approach works for the array of users mentioned in a tweet):

ksql> SELECT ID, EXPLODE(HASHTAGENTITIES)->TEXT AS HASHTAG FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE ID =1229243483900108801 EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ID                                                                                |HASHTAG                                                                           |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|1229243483900108801                                                               |Dhoni                                                                             |
|1229243483900108801                                                               |Dhonism                                                                           |
|1229243483900108801                                                               |happybirthdayABDevilliers                                                         |
|1229243483900108801                                                               |Mr360                                                                             |
|1229243483900108801                                                               |ABDevilliers
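To make the effect of EXPLODE concrete, here is a plain-Python sketch of the same row-per-element expansion, using the record shown above. It only imitates the semantics for illustration and is not how ksqlDB implements the function; the `explode_hashtags` name is hypothetical.

```python
def explode_hashtags(rows):
    """Yield one output row per element of each row's HASHTAGENTITIES array."""
    for row in rows:
        # A row with an empty array contributes no output rows in this sketch
        for entity in row["HASHTAGENTITIES"]:
            yield {"ID": row["ID"], "HASHTAG": entity["TEXT"]}


tweets = [
    {
        "ID": 1229243483900108801,
        "HASHTAGENTITIES": [
            {"TEXT": "Dhoni", "START": 51, "END": 57},
            {"TEXT": "Dhonism", "START": 58, "END": 66},
            {"TEXT": "happybirthdayABDevilliers", "START": 67, "END": 93},
            {"TEXT": "Mr360", "START": 94, "END": 100},
            {"TEXT": "ABDevilliers", "START": 101, "END": 114},
        ],
    }
]

exploded = list(explode_hashtags(tweets))
for row in exploded:
    print(row["ID"], row["HASHTAG"])
```

One input record with five hashtags becomes five output rows that share the same ID, matching the query result above.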

Create a stream of hashtags (converted to lowercase)

CREATE STREAM STM_TWITTER_CELEBRITIES_HASHTAGS AS SELECT ID, LCASE(EXPLODE(HASHTAGENTITIES)-> TEXT) AS HASHTAG FROM STM_TWITTER_CELEBRITIES_SRC_0020;

Create a table that counts the occurrences of each hashtag:

CREATE TABLE TBL_TWITTER_CELEBRITIES_HASHTAGS AS SELECT HASHTAG, COUNT(*) AS CT FROM STM_TWITTER_CELEBRITIES_HASHTAGS GROUP BY HASHTAG;
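Taken together, the stream and the table can be pictured as a lowercase step followed by a running count per key, where every incoming hashtag produces an updated count. The Python sketch below imitates that behaviour on a small made-up input; the function name and sample hashtags are illustrative only.

```python
from collections import defaultdict


def count_hashtags(hashtags):
    """Lowercase each hashtag and yield (hashtag, running_count) after every event."""
    counts = defaultdict(int)
    for hashtag in hashtags:
        key = hashtag.lower()  # mirrors LCASE(...) in the stream definition
        counts[key] += 1       # mirrors COUNT(*) ... GROUP BY HASHTAG
        yield key, counts[key]


# Made-up input: the same hashtag arrives with different casing
changes = list(count_hashtags(["Dhoni", "IPL2020", "dhoni"]))
print(changes)         # [('dhoni', 1), ('ipl2020', 1), ('dhoni', 2)]

# The latest value per key is what the table holds at any point in time
table = dict(changes)
print(table)           # {'dhoni': 2, 'ipl2020': 1}
```

The list of changes corresponds to what a push query emits over time, while the final dictionary corresponds to the table's current state.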

Issue a push query on the table:

ksql> SELECT HASHTAG, CT FROM TBL_TWITTER_CELEBRITIES_HASHTAGS EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|HASHTAG                                                                           |CT                                                                                |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ipl12                                                                             |1                                                                                 |
|cricketball                                                                       |1                                                                                 |
|vineethsreenivasan                                                                |1                                                                                 |
|ich2020                                                                           |1                                                                                 |

Issue a pull query on the table. Since this is a materialized aggregate table, pull queries are supported:

ksql> SELECT HASHTAG, CT FROM TBL_TWITTER_CELEBRITIES_HASHTAGS WHERE ROWKEY='mohanlal';
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|HASHTAG                                                                           |CT                                                                                |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|mohanlal                                                                          |1121                                                                              |
Query terminated
ksql>
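Because a pull query returns once and terminates, it is easy to issue from an application through the ksqlDB REST API's `/query` endpoint. The sketch below only builds the request; it assumes the server runs at `http://localhost:8088`, and the helper name `build_pull_query` is hypothetical.

```python
import json
from urllib.request import Request, urlopen


def build_pull_query(hashtag, server="http://localhost:8088"):
    """Build a POST request for the /query endpoint (server URL is an assumption)."""
    escaped = hashtag.replace("'", "''")  # keep the string literal well-formed
    statement = (
        "SELECT HASHTAG, CT FROM TBL_TWITTER_CELEBRITIES_HASHTAGS "
        f"WHERE ROWKEY='{escaped}';"
    )
    payload = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return Request(
        f"{server}/query",
        data=payload,
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )


request = build_pull_query("mohanlal")
print(request.full_url)
print(request.data.decode("utf-8"))

# To send it to a running server:
# with urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```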

Create a sink connector to offload this data to a PostgreSQL database:

CREATE SINK CONNECTOR TWITTER_CELEBRITIES_0010_SINK_POSTGRESS WITH (
    'connector.class'                  		= 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                   		= 'jdbc:postgresql://entechlog-vm-01:5432/',
    'connection.user'                  		= 'postgres',
    'connection.password'              		= 'postgres',
    'topics'                           		= 'TBL_TWITTER_CELEBRITIES_HASHTAGS',
    'key.converter'                    		= 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                  		= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'   =  'http://entechlog-vm-01:8081',
    'auto.create'                      		= 'true',
    'insert.mode'                      		= 'upsert',
    'table.name.format'				= 'entechlog.tbl_twitter_celebrities_hashtags',
    'pk.mode'                          		= 'record_value',
    'pk.fields'                        		= 'HASHTAG',
    'transforms'                       		= 'dropSysCols',
    'transforms.dropSysCols.type'      		= 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.dropSysCols.blacklist' 		= 'ROWKEY,ROWTIME'
  );
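With 'insert.mode' set to 'upsert' and HASHTAG as the primary key, each new count for a hashtag should overwrite the previous row instead of adding another one. The sketch below demonstrates that behaviour with Python's built-in sqlite3 module as a stand-in for PostgreSQL. It is an illustration of upsert semantics only, not the SQL the JDBC sink actually generates, and it assumes an SQLite version with `ON CONFLICT ... DO UPDATE` support (3.24 or later).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tbl_twitter_celebrities_hashtags "
    "(hashtag TEXT PRIMARY KEY, ct INTEGER)"
)


def upsert(connection, hashtag, ct):
    """Insert a row, or update ct when the hashtag already exists."""
    connection.execute(
        "INSERT INTO tbl_twitter_celebrities_hashtags (hashtag, ct) VALUES (?, ?) "
        "ON CONFLICT (hashtag) DO UPDATE SET ct = excluded.ct",
        (hashtag, ct),
    )


# Made-up sequence of table updates: 'mohanlal' arrives twice with a growing count
for hashtag, ct in [("mohanlal", 1), ("dhoni", 1), ("mohanlal", 2)]:
    upsert(conn, hashtag, ct)

rows = conn.execute(
    "SELECT hashtag, ct FROM tbl_twitter_celebrities_hashtags ORDER BY hashtag"
).fetchall()
print(rows)  # [('dhoni', 1), ('mohanlal', 2)]
```

The target table ends up with one row per hashtag holding the latest count, which is what the word cloud later reads.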

Validate the data in PostgreSQL

SELECT * FROM PG_CATALOG.PG_TABLES WHERE SCHEMANAME ='entechlog';
SELECT * FROM entechlog.tbl_twitter_celebrities_hashtags ORDER BY 1 DESC;

Data Visualization using Apache Superset

Install Superset

# Install Superset OS dependencies
sudo apt-get install build-essential libssl-dev libffi-dev python-dev python-pip libsasl2-dev libldap2-dev
# Same dependencies with the Python 3.6 headers, for systems where python3.6 is installed alongside Python 2
sudo apt-get install build-essential libssl-dev libffi-dev python3.6-dev python-pip libsasl2-dev libldap2-dev

# Python virtualenv
pip install virtualenv
sudo apt-get install python3-venv

# Create and activate a virtualenv
python3 -m venv venv
. venv/bin/activate

# Install setup tools
pip install --upgrade setuptools pip

# Install superset
pip install apache-superset

# Initialize the database
superset db upgrade

# Create an admin user (you will be prompted to set a username, first and last name before setting a password)
export FLASK_APP=superset
flask fab create-admin

# Load some data to play with; this is not an optional step
superset load_examples

# Create default roles and permissions
superset init

# Install Postgres Database dependencies
sudo apt-get install libpq-dev
pip install psycopg2

# Start web server on port 6066, use -p to bind to another port, use -h 0.0.0.0 so that superset can be accessed outside localhost
#superset run -p 8088 --with-threads --reload --debugger
superset run -h 0.0.0.0 -p 6066 --with-threads --reload --debugger

Access Superset at http://hostname:6066/superset/welcome

Create a new database connection from Sources -> Databases -> Add a new record. Here the SQLAlchemy URI for the PostgreSQL database will look like postgresql://postgres:XXXXXXXXXX@entechlog-vm-01/postgres
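If the database password contains characters such as `@` or `/`, it needs to be URL-encoded before it goes into the SQLAlchemy URI. A small Python sketch of building the URI; the helper name `postgres_uri` and the example password are made up for illustration.

```python
from urllib.parse import quote_plus


def postgres_uri(user, password, host, database, port=None):
    """Build a postgresql:// SQLAlchemy URI with URL-encoded credentials."""
    netloc = f"{host}:{port}" if port else host
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{netloc}/{database}"


# 'p@ss/word' is a hypothetical password containing characters that must be escaped
uri = postgres_uri("postgres", "p@ss/word", "entechlog-vm-01", "postgres")
print(uri)  # postgresql://postgres:p%40ss%2Fword@entechlog-vm-01/postgres
```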


Create a new table data source from Sources -> Tables -> Add a new record. The table name will be the same as the table created by your PostgreSQL sink.

Create a new chart from Charts -> Add a new record, with these settings:

  • Data Source -> Table Name
  • Visualization Type -> Word cloud
  • Series -> HASHTAG
  • Metric -> MAX(CT)

Now run the query and save it to a dashboard. The result is a visual representation of the hashtags for the Twitter keywords for which we have a real-time data feed coming in.
