KSQL Commands
Confluent KSQL is the streaming SQL engine that enables real-time data processing against Apache Kafka®. It provides an …
Create a Kafka source connector and start ingesting data from Twitter
CREATE SOURCE CONNECTOR TWITTER_CELEBRITIES_SRC_0020 WITH(
"connector.class"='com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
"twitter.oauth.consumerKey"='xyz',
"twitter.oauth.consumerSecret"='abc',
"twitter.oauth.accessToken"='123',
"twitter.oauth.accessTokenSecret"='789',
"kafka.status.topic"='dev.etl.twitter.celebrities.src.0020',
"key.converter"='io.confluent.connect.avro.AvroConverter',
"key.converter.schemas.enable"='true',
"key.converter.schema.registry.url"='http://localhost:8081',
"value.converter"='io.confluent.connect.avro.AvroConverter',
"value.converter.schemas.enable"='true',
"value.converter.schema.registry.url"='http://localhost:8081',
"process.deletes"=false,
"filter.keywords"='mohanlal,dhoni'
);
Check status of connector using
ksql> DESCRIBE CONNECTOR TWITTER_CELEBRITIES_SRC_0020;
Name : TWITTER_CELEBRITIES_SRC_0020
Class : com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
Type : source
State : RUNNING
WorkerId : 127.0.1.1:8083
Task ID | State | Error Trace
---------------------------------
0 | RUNNING |
---------------------------------
ksql>
Check the status of the topic using the command below. If tweets are coming in, you should see that the topic has been created
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------------------------------------------------------------------
_confluent-command | 1 | 1
_confluent-controlcenter-5-4-0-1-actual-group-consumption-rekey | 2 | 1
dev.etl.twitter.celebrities.src.0020 | 1 | 1
Check messages in topic using
ksql> PRINT 'dev.etl.twitter.celebrities.src.0020' FROM BEGINNING LIMIT 1;
Format:AVRO
2/16/20 5:00:40 PM EST, y������", {"CreatedAt": 1581890434000, "Id": 1229163687396728832, "Text": "You can check here the Indian Premier League(IPL) 13 CSK team full squad 2020.\n#Yellove #WhistlePodu 🦁💛 #RR #KKR #SRH #CSK #MI #RCB #Virat #Kohli #Dhoni #IPL12 #IPLMeme #cricketball #cricketnews #IPL2020 #IPL13 #TeamIndia #ViratKohli #Mahi \nhttps://t.co/295eLd2xNJ", "Source": "<a href=\"https://www.hootsuite.com\" rel=\"nofollow\">Hootsuite Inc.</a>", "Truncated": true, "InReplyToStatusId": -1, "InReplyToUserId": -1, "InReplyToScreenName": null, "GeoLocation": null, "Place": null, "Favorited": false, "Retweeted": false, "FavoriteCount": 0, "User": {"Id": 1067662737675710466, "Name": "Tentaran Sports", "ScreenName": "TentaranSports", "Location": "New Delhi, India", "Description": "#INDvNZ - India and New zealand team squad for New zealand tour of India 2020.\n👇👇👇👇👇👇", "ContributorsEnabled": false, "ProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_normal.jpg", "BiggerProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_bigger.jpg", "MiniProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_mini.jpg", "OriginalProfileImageURL": "http://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz.jpg", "ProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_normal.jpg", "BiggerProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_bigger.jpg", "MiniProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz_mini.jpg", "OriginalProfileImageURLHttps": "https://pbs.twimg.com/profile_images/1088038065598164994/wQxPihWz.jpg", "DefaultProfileImage": false, "URL": "https://www.tentaran.com/indian-team-squad-for-new-zealand-tour-2020/", "Protected": false, "FollowersCount": 1167, "ProfileBackgroundColor": "000000", "ProfileTextColor": "000000", "ProfileLinkColor": "1B95E0", 
"ProfileSidebarFillColor": "000000", "ProfileSidebarBorderColor": "000000", "ProfileUseBackgroundImage": false, "DefaultProfile": false, "ShowAllInlineMedia": false, "FriendsCount": 862, "CreatedAt": 1543385607000, "FavouritesCount": 7155, "UtcOffset": -1, "TimeZone": null, "ProfileBackgroundImageURL": "http://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBackgroundImageUrlHttps": "https://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBannerURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/web", "ProfileBannerRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/web_retina", "ProfileBannerIPadURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/ipad", "ProfileBannerIPadRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/ipad_retina", "ProfileBannerMobileURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/mobile", "ProfileBannerMobileRetinaURL": "https://pbs.twimg.com/profile_banners/1067662737675710466/1575021259/mobile_retina", "ProfileBackgroundTiled": false, "Lang": null, "StatusesCount": 23426, "GeoEnabled": true, "Verified": false, "Translator": false, "ListedCount": 0, "FollowRequestSent": false, "WithheldInCountries": []}, "Retweet": false, "Contributors": [], "RetweetCount": 0, "RetweetedByMe": false, "CurrentUserRetweetId": -1, "PossiblySensitive": false, "Lang": "en", "WithheldInCountries": [], "HashtagEntities": [{"Text": "Yellove", "Start": 79, "End": 87}, {"Text": "WhistlePodu", "Start": 88, "End": 100}, {"Text": "RR", "Start": 104, "End": 107}, {"Text": "KKR", "Start": 108, "End": 112}, {"Text": "SRH", "Start": 113, "End": 117}, {"Text": "CSK", "Start": 118, "End": 122}, {"Text": "MI", "Start": 123, "End": 126}, {"Text": "RCB", "Start": 127, "End": 131}, {"Text": "Virat", "Start": 132, "End": 138}, {"Text": "Kohli", "Start": 139, "End": 145}, {"Text": "Dhoni", "Start": 146, "End": 152}, {"Text": "IPL12", 
"Start": 153, "End": 159}, {"Text": "IPLMeme", "Start": 160, "End": 168}, {"Text": "cricketball", "Start": 169, "End": 181}, {"Text": "cricketnews", "Start": 182, "End": 194}, {"Text": "IPL2020", "Start": 195, "End": 203}, {"Text": "IPL13", "Start": 204, "End": 210}, {"Text": "TeamIndia", "Start": 211, "End": 221}, {"Text": "ViratKohli", "Start": 222, "End": 233}, {"Text": "Mahi", "Start": 234, "End": 239}], "UserMentionEntities": [], "MediaEntities": [], "SymbolEntities": [], "URLEntities": [{"URL": "https://t.co/295eLd2xNJ", "Text": "https://t.co/295eLd2xNJ", "ExpandedURL": "https://www.tentaran.com/ipl-chennai-super-kings-team-2020-players-list-csk/", "Start": 241, "End": 264, "DisplayURL": "tentaran.com/ipl-chennai-su\u2026"}]}
ksql>
Now let's create a stream on this topic using
CREATE STREAM STM_TWITTER_CELEBRITIES_SRC_0020 WITH (KAFKA_TOPIC='dev.etl.twitter.celebrities.src.0020', VALUE_FORMAT='Avro');
Issue this command to start reading data from the beginning of the topic
SET 'auto.offset.reset'='earliest';
Selecting/Filtering columns with ksqlDB queries
Push Query
SELECT USER->SCREENNAME, LANG, TEXT FROM STM_TWITTER_CELEBRITIES_SRC_0020 EMIT CHANGES;
Pull Query
SELECT USER->SCREENNAME, LANG, TEXT FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE LANG = 'en';
You can’t run a pull query against a stream. KSQL currently only supports pull queries on materialized aggregate tables, i.e. those created by a CREATE TABLE AS SELECT <fields>, <aggregate_functions> FROM <sources> GROUP BY <key>
style statement.
Aggregates in ksqlDB
ksql> SELECT LANG,COUNT(*) FROM STM_TWITTER_CELEBRITIES_SRC_0020 GROUP BY LANG EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|LANG |KSQL_COL_1 |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|cy |1 |
|sv |1 |
|gu |3 |
|ro |1 |
|no |1 |
|te |2 |
|sl |1 |
|or |1 |
Now let's examine a record that is a candidate for EXPLODE
ksql> SELECT ID, HASHTAGENTITIES FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE ID =1229243483900108801 EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ID |HASHTAGENTITIES |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|1229243483900108801 |[{TEXT=Dhoni, START=51, END=57}, {TEXT=Dhonism, START=58, END=66}, {TEXT=happybirt|
| |hdayABDevilliers, START=67, END=93}, {TEXT=Mr360, START=94, END=100}, {TEXT=ABDevi|
| |lliers, START=101, END=114}]
Let's use the EXPLODE function to take the array of hashtags (and/or users mentioned) in a tweet and emit one row per array element
ksql> SELECT ID, EXPLODE(HASHTAGENTITIES)->TEXT AS HASHTAG FROM STM_TWITTER_CELEBRITIES_SRC_0020 WHERE ID =1229243483900108801 EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ID |HASHTAG |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|1229243483900108801 |Dhoni |
|1229243483900108801 |Dhonism |
|1229243483900108801 |happybirthdayABDevilliers |
|1229243483900108801 |Mr360 |
|1229243483900108801 |ABDevilliers
Create a stream of hashtags (converted to lowercase)
CREATE STREAM STM_TWITTER_CELEBRITIES_HASHTAGS AS SELECT ID, LCASE(EXPLODE(HASHTAGENTITIES)-> TEXT) AS HASHTAG FROM STM_TWITTER_CELEBRITIES_SRC_0020;
Create a table of hashtags with a count of how many times each hashtag occurs
CREATE TABLE TBL_TWITTER_CELEBRITIES_HASHTAGS AS SELECT HASHTAG, COUNT(*) AS CT FROM STM_TWITTER_CELEBRITIES_HASHTAGS GROUP BY HASHTAG;
Issue Push Query on the table
ksql> SELECT HASHTAG, CT FROM TBL_TWITTER_CELEBRITIES_HASHTAGS EMIT CHANGES;
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|HASHTAG |CT |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ipl12 |1 |
|cricketball |1 |
|vineethsreenivasan |1 |
|ich2020 |1 |
Issue a pull query on the table. Since this is a materialized aggregate table, we can run a pull query against it
ksql> SELECT HASHTAG, CT FROM TBL_TWITTER_CELEBRITIES_HASHTAGS WHERE ROWKEY='mohanlal';
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|HASHTAG |CT |
+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|mohanlal |1121 |
Query terminated
ksql>
Create a sink and offload this data to a PostgreSQL database
CREATE SINK CONNECTOR TWITTER_CELEBRITIES_0010_SINK_POSTGRESS WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://entechlog-vm-01:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'TBL_TWITTER_CELEBRITIES_HASHTAGS',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://entechlog-vm-01:8081',
'auto.create' = 'true',
'insert.mode' = 'upsert',
'table.name.format' = 'entechlog.tbl_twitter_celebrities_hashtags',
'pk.mode' = 'record_value',
'pk.fields' = 'HASHTAG',
'transforms' = 'dropSysCols',
'transforms.dropSysCols.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'transforms.dropSysCols.blacklist' = 'ROWKEY,ROWTIME'
);
Validate the data in PostgreSQL
SELECT * FROM PG_CATALOG.PG_TABLES WHERE SCHEMANAME ='entechlog';
SELECT * FROM entechlog.tbl_twitter_celebrities_hashtags ORDER BY 1 DESC;
# Install Superset dependencies
sudo apt-get install build-essential libssl-dev libffi-dev python-dev python-pip libsasl2-dev libldap2-dev
sudo apt-get install build-essential libssl-dev libffi-dev python3.6-dev python-pip libsasl2-dev libldap2-dev
# Python virtualenv
pip install virtualenv
sudo apt-get install python3-venv
# Create and activate a virtualenv
python3 -m venv venv
. venv/bin/activate
# Install setup tools
pip install --upgrade setuptools pip
# Install superset
pip install apache-superset
# Initialize the database
superset db upgrade
# Create an admin user (you will be prompted to set a username, first and last name before setting a password)
export FLASK_APP=superset
flask fab create-admin
# Load some data to play with. This is not an optional step
superset load_examples
# Create default roles and permissions
superset init
# Install Postgres Database dependencies
sudo apt-get install libpq-dev
pip install psycopg2
# Start web server on port 6066, use -p to bind to another port, use -h 0.0.0.0 so that superset can be accessed outside localhost
#superset run -p 8088 --with-threads --reload --debugger
superset run -h 0.0.0.0 -p 6066 --with-threads --reload --debugger
Access Superset using “http://hostname:6066/superset/welcome”
Create a new data source from Sources–>Database–>Add a new record. Here the SQLAlchemy URI for the PostgreSQL database will look like postgresql://postgres:XXXXXXXXXX@entechlog-vm-01/postgres
Create a new data source from Sources–>Tables–>Add a new record. The table name will be the same as the table created by your PostgreSQL sink
Create new chart from Charts–>Add a new record. Here
Now run the query and save it to a dashboard. Your dashboard should look like the one below. This is the visual representation of the hashtags for the Twitter keywords for which we have a real-time data feed coming in.
Confluent KSQL is the streaming SQL engine that enables real-time data processing against Apache Kafka®. It provides an …
In this article we will discuss some of the common Confluent Kafka challenges and solutions to fix them. Issue 01 Issue …