SET 'auto.offset.reset' = 'earliest';
CREATE STREAM twitter_raw ( \
CreatedAt bigint,Id bigint, Text VARCHAR, SOURCE VARCHAR, Truncated VARCHAR, InReplyToStatusId VARCHAR, InReplyToUserId VARCHAR, InReplyToScreenName VARCHAR, GeoLocation VARCHAR, Place VARCHAR, Favorited VARCHAR, Retweeted VARCHAR, FavoriteCount VARCHAR, User VARCHAR, Retweet VARCHAR, Contributors VARCHAR, RetweetCount VARCHAR, RetweetedByMe VARCHAR, CurrentUserRetweetId VARCHAR, PossiblySensitive VARCHAR, Lang VARCHAR, WithheldInCountries VARCHAR, HashtagEntities VARCHAR, UserMentionEntities VARCHAR, MediaEntities VARCHAR, SymbolEntities VARCHAR, URLEntities VARCHAR) \
WITH (KAFKA_TOPIC='twitter_json_01', partitions=12, VALUE_FORMAT='JSON');
CREATE STREAM twitter AS \
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
EXTRACTJSONFIELD(user,'$.Name') AS user_Name, \
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName, \
EXTRACTJSONFIELD(user,'$.Location') AS user_Location, \
EXTRACTJSONFIELD(user,'$.Description') AS user_Description, \
Text,hashtagentities,lang \
FROM twitter_raw WHERE LCASE(hashtagentities) like '%NDCSydney%';
SELECT USER_NAME, TEXT FROM TWITTER WHERE LCASE(TEXT) LIKE '%gamussa%';
CREATE TABLE user_tweet_count AS \
SELECT user_screenname, count(*) AS tweet_count \
FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) WHERE LCASE(TEXT) LIKE '%gamussa%' \
GROUP BY user_screenname ;
CREATE TABLE USER_TWEET_COUNT_DISPLAY AS \
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START,\
USER_SCREENNAME, \
TWEET_COUNT FROM user_tweet_count;
create table top_3_conf as \
SELECT WINDOW_START, USER_SCREENNAME, TWEET_COUNT \
FROM USER_TWEET_COUNT_DISPLAY \
WHERE TWEET_COUNT> 3;
select USER_SCREENNAME, TWEET_COUNT from top_3_conf;
view raw live_tweeting.sql hosted with ❤ by GitHub