Near Real-Time Prediction of Tweet Sentiment by Injecting Data into Elasticsearch and Displaying on Kibana
A simple demo of near real-time sentiment analysis on Tweets using Elasticsearch, Kibana and the Twitter API in Python 3
Interested in President Trump's performance ten months after his inauguration, I use the Twitter public stream to analyze people's sentiment on topics mentioning his name in a near real-time way. In this demo, I will show how to pipe data from Twitter with the Twitter APIs, predict the sentiment with TextBlob, inject the hot data into Elasticsearch and, finally, present the output on a Kibana dashboard.
The following screenshot is one example showing people's sentiment toward President Trump's role in topics (Twitter hashtags) on the Kibana dashboard. To see the animation, please visit the interactive Dashboard.
For simplicity, the sentiment of Tweets is predicted by the pre-trained TextBlob sentiment analyzer, and the output is injected into the locally installed Elasticsearch.
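As a quick illustration of the sentiment step, the sketch below applies TextBlob's polarity score and the same thresholds used in the streaming script further down. The sample texts are made up for demonstration only.

from textblob import TextBlob

# hypothetical sample tweets, for illustration only
samples = [
    "I love the new policy, great job!",
    "This is a disaster, terrible decision.",
    "The meeting is scheduled for Monday."
]

for text in samples:
    polarity = TextBlob(text).sentiment.polarity
    # same thresholds as in the streaming script below
    if polarity < 0:
        sentiment = "negative"
    elif polarity == 0:
        sentiment = "neutral"
    else:
        sentiment = "positive"
    print("{:8s} {:+.2f}  {}".format(sentiment, polarity, text))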
For Mac users who'd like to install Elasticsearch manually, please check the Elasticsearch installation guide. You may add the environment variable to your bash profile as described there, or change directory to the unzipped folder and launch Elasticsearch by typing 'bin/elasticsearch' in Terminal.
To install Kibana and get the "Hot Data" visualized, please check the Kibana installation guide.
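Before running the streaming script, it can help to confirm that the local Elasticsearch node is reachable. A minimal sketch, assuming Elasticsearch is running on the default http://localhost:9200:

from elasticsearch import Elasticsearch

# assumes a local node on the default port 9200
es = Elasticsearch()

if es.ping():
    print("Elasticsearch is up, version:", es.info()["version"]["number"])
else:
    print("Cannot reach Elasticsearch on localhost:9200 -- is it running?")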
import json
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from textblob import TextBlob #predict the sentiment of Tweet, see 'https://textblob.readthedocs.io/en/dev/'
from elasticsearch import Elasticsearch #pip install elasticsearch if not installed yet
from datetime import datetime
import calendar
import numpy as np
from http.client import IncompleteRead
#log in to your Twitter Application Management to create an App, url: 'https://apps.twitter.com'
consumer_key = '<Twitter_Consumer_Key>'
consumer_secret = '<Twitter_Consumer_Secret>'
access_token = '<Twitter_Access_Token>'
access_token_secret = '<Twitter_Access_Token_Secret>'
# create instance of elasticsearch
es = Elasticsearch()
class TweetStreamListener(StreamListener):

    # override the on_data method of StreamListener
    # on_data exposes more fields than 'on_status', see 'https://stackoverflow.com/questions/31054656/what-is-the-difference-between-on-data-and-on-status-in-the-tweepy-library'
    def on_data(self, data):

        # To understand the key-values pulled from Twitter, see 'https://dev.twitter.com/overview/api/tweets'
        dict_data = json.loads(data)

        # pass the Tweet text into TextBlob to predict the sentiment
        tweet = TextBlob(dict_data["text"]) if "text" in dict_data.keys() else None

        # if the object contains a Tweet
        if tweet:
            # determine if sentiment is positive, negative, or neutral
            if tweet.sentiment.polarity < 0:
                sentiment = "negative"
            elif tweet.sentiment.polarity == 0:
                sentiment = "neutral"
            else:
                sentiment = "positive"

            # print the predicted sentiment with the Tweet
            print(sentiment, tweet.sentiment.polarity, dict_data["text"])

            # extract the first hashtag from the object
            # and transform it into proper case
            if len(dict_data["entities"]["hashtags"]) > 0:
                hashtags = dict_data["entities"]["hashtags"][0]["text"].title()
            else:
                # Elasticsearch does not take a None object
                hashtags = "None"

            # add text and sentiment info to Elasticsearch:
            # create/inject data into the cluster with index 'logstash-a';
            # create the matching index pattern in Management/Kibana later in order to push the data to a dashboard
            es.index(index="logstash-a",
                     doc_type="test-type",
                     body={"author": dict_data["user"]["screen_name"],
                           "followers": dict_data["user"]["followers_count"],
                           # parse 'created_at' into a datetime here; reformat it into a readable timestamp in Kibana later
                           "date": datetime.strptime(dict_data["created_at"], '%a %b %d %H:%M:%S %z %Y'),
                           "message": dict_data["text"] if "text" in dict_data.keys() else " ",
                           "hashtags": hashtags,
                           "polarity": tweet.sentiment.polarity,
                           "subjectivity": tweet.sentiment.subjectivity,
                           "sentiment": sentiment})
        return True

    # on failure, print the error code and do not disconnect
    def on_error(self, status):
        print(status)
if __name__ == '__main__':

    # create instance of the tweepy tweet stream listener
    listener = TweetStreamListener()

    # set twitter keys/tokens
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    # The exception that most often broke the kernel in my tests is IncompleteRead.
    # This handler ensures the stream resumes when it is interrupted by IncompleteRead.
    while True:
        try:
            # create instance of the tweepy stream
            stream = Stream(auth, listener)
            # search twitter for the keyword "trump"
            stream.filter(track=['trump'])
        except IncompleteRead:
            continue
        except KeyboardInterrupt:
            # or however you want to exit this loop
            stream.disconnect()
            break
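Once the stream has been running for a while, you can check that documents are actually landing in Elasticsearch before moving on to Kibana. A minimal sketch, assuming the same local node and the logstash-a index used above:

from elasticsearch import Elasticsearch

es = Elasticsearch()

# count how many tweets have been indexed so far
print("indexed tweets:", es.count(index="logstash-a")["count"])

# fetch the five most recent documents, newest first
res = es.search(index="logstash-a",
                body={"size": 5,
                      "sort": [{"date": {"order": "desc"}}],
                      "query": {"match_all": {}}})
for hit in res["hits"]["hits"]:
    doc = hit["_source"]
    print(doc["date"], doc["sentiment"], doc["hashtags"], doc["message"][:80])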
- Launch Elasticsearch
  - type elasticsearch in Terminal if the environment variable has already been added to the bash profile
  - or change the directory to the unzipped Elasticsearch folder and type ./bin/elasticsearch to launch
- Run the Python script
- A few seconds after the Python script starts to run, launch Kibana by changing the directory to the unzipped Kibana folder and typing ./bin/kibana
  - go to http://localhost:5601, move to Management --> Index Patterns --> + Create Index Pattern to create an index name or pattern
  - check the naming convention for Kibana index patterns. In this example, the index name in the Python script is logstash-a, so I set the index pattern as logstash-*. 'date' is the timestamp attribute set in the Python script, so I choose date from the drop-down box 'Time Filter field name'
  - click 'Create' to create the index pattern
- To transform the date from milliseconds since epoch into a readable datetime stamp, select + Add Scripted Field in Index Patterns
- Select Language, Type and Format as in the screenshot, and add doc['date'].value to the Script field (a Python sketch of the same conversion follows this list)
- See the Kibana documentation for how to build the customized graphs in Visualize and put them together in Dashboard
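For reference, the scripted field treats doc['date'].value as milliseconds since the Unix epoch (as described above for the Elasticsearch version this demo targets), which Kibana then renders with the Date format. A rough Python sketch of the same conversion, reusing the created_at parsing from the streaming script; the sample timestamp is made up:

import calendar
from datetime import datetime

# hypothetical created_at string in Twitter's format
created_at = "Wed Nov 08 15:30:00 +0000 2017"

dt = datetime.strptime(created_at, '%a %b %d %H:%M:%S %z %Y')

# milliseconds since epoch, i.e. roughly what the scripted field exposes for the 'date' field
epoch_ms = calendar.timegm(dt.utctimetuple()) * 1000
print(dt.isoformat(), "->", epoch_ms)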