Sentiment Analysis Using Azure Stream Analytics

This post is a short write-up of the sentiment analysis workflow I developed during the first COVID-19 lockdown. You can find the dashboard website I created here, and a blog post I wrote for my institute here.

At its core, this is an Azure and C# solution. The main components are Azure Stream Analytics, Azure Event Hubs, and Azure Blob Storage.

The code for the sample app can be found here. The sample app analyzes real-time Twitter feeds using Sentiment140 (http://www.sentiment140.com/) to determine whether the tweeted text expresses a positive, negative, or neutral reaction to the current situation. The keywords used for picking the tweets are “lifting lockdown” and “lifting restrictions”, with the words in that order.

The feed from the Twitter API is sent to Azure Event Hubs, which handles the high-volume stream of data. The stream is then passed to Azure Stream Analytics, which runs a tumbling-window aggregation with a window width of one hour to aggregate the sentiment scores. The aggregated data is then stored in Azure Blob Storage and visualized on this website.

The Twitter reader is the main component: it reads tweets matching the keywords and sends them to Event Hubs to start the sentiment analysis workflow.

/// <summary>
/// Lazily yields tweets read line-by-line from the stream opened by <c>ReadTweets</c>,
/// reconnecting whenever the stream ends or a read fails. The sequence never completes
/// on its own; the consumer decides when to stop enumerating.
/// </summary>
/// <param name="config">Twitter credentials and keywords, passed through to <c>ReadTweets</c>.</param>
/// <returns>
/// One <c>TweetData</c> per non-blank line that is not a <c>{"delete"</c> notice, with
/// <c>RawJson</c> set to the original line.
/// </returns>
public static IEnumerable<TweetData> StreamStatuses(TwitterConfig config) {
    var streamReader = ReadTweets(config);
    int count = 0;
    // try/finally (no catch) is allowed around 'yield return'; it releases the current
    // connection when the consumer disposes the enumerator.
    try {
        while (true) {
            string line = null;
            try {
                line = streamReader.ReadLine();
            } catch (Exception ex) {
                // Best effort: a failed read is treated like end-of-stream (line stays null)
                // so the reconnect below kicks in, but the failure is no longer invisible.
                System.Diagnostics.Trace.TraceWarning("Twitter stream read failed, reconnecting: {0}", ex.Message);
            }
            // Ordinal comparison: this is a protocol prefix, not linguistic text.
            if (!string.IsNullOrWhiteSpace(line) && !line.StartsWith("{\"delete\"", StringComparison.Ordinal)) {
                var result = JsonConvert.DeserializeObject<TweetData>(line);
                // A line such as the JSON literal null deserializes to a null reference.
                if (result != null) {
                    result.RawJson = line;
                    yield return result;
                }
            }
            // The Twitter stream has ended... or, more likely, an error has occurred.
            // Release the stale connection before reconnecting so that abandoned response
            // streams do not pile up and exhaust the per-host connection pool.
            if (line == null) {
                streamReader.Dispose();
                streamReader = ReadTweets(config);
            }
            // Pause for one second after every 500 loop iterations.
            count++;
            if (count == 500) {
                count = 0;
                System.Threading.Thread.Sleep(1000);
            }
        }
    } finally {
        streamReader.Dispose();
    }
}
/// <summary>
/// Opens the Twitter <c>statuses/filter</c> streaming endpoint with an OAuth 1.0
/// (HMAC-SHA1) signed POST request that tracks <c>config.Keywords</c>.
/// </summary>
/// <param name="config">Consumer key/secret, access token/secret and the keywords to track.</param>
/// <returns>
/// A reader over the response stream, or <see cref="StreamReader.Null"/> when no response
/// arrives within 5 seconds (the request is aborted in that case).
/// </returns>
/// <exception cref="AggregateException">
/// Thrown by the blocking wait when the request itself fails (for example with a WebException).
/// </exception>
static TextReader ReadTweets(TwitterConfig config) {
    var oauth_version = "1.0";
    var oauth_signature_method = "HMAC-SHA1";
    // unique request details
    var oauth_nonce = Convert.ToBase64String(Encoding.ASCII.GetBytes(DateTime.Now.Ticks.ToString()));
    var oauth_timestamp = Convert.ToInt64(
        (DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds).ToString();
    System.Net.ServicePointManager.SecurityProtocol |= System.Net.SecurityProtocolType.Tls12;
    var resource_url = "https://stream.twitter.com/1.1/statuses/filter.json";
    // The same percent-encoded keyword string must be used in the signature and on the wire,
    // otherwise keywords containing characters such as '&', '#' or '+' corrupt the request
    // or invalidate the signature.
    var escapedKeywords = Uri.EscapeDataString(config.Keywords);
    // create oauth signature
    // Every parameter value is percent-encoded individually before the parameter string as a
    // whole is encoded again; parameters are listed in alphabetical order.
    var parameterString = string.Format(
        "oauth_consumer_key={0}&oauth_nonce={1}&oauth_signature_method={2}&" +
        "oauth_timestamp={3}&oauth_token={4}&oauth_version={5}&track={6}",
        Uri.EscapeDataString(config.OAuthConsumerKey),
        Uri.EscapeDataString(oauth_nonce),
        Uri.EscapeDataString(oauth_signature_method),
        Uri.EscapeDataString(oauth_timestamp),
        Uri.EscapeDataString(config.OAuthToken),
        Uri.EscapeDataString(oauth_version),
        escapedKeywords);
    var baseString = string.Concat("POST&", Uri.EscapeDataString(resource_url), "&", Uri.EscapeDataString(parameterString));
    var compositeKey = string.Concat(Uri.EscapeDataString(config.OAuthConsumerSecret), "&", Uri.EscapeDataString(config.OAuthTokenSecret));
    string oauth_signature;
    using (var hasher = new HMACSHA1(Encoding.ASCII.GetBytes(compositeKey))) {
        oauth_signature = Convert.ToBase64String(hasher.ComputeHash(Encoding.ASCII.GetBytes(baseString)));
    }
    // create the request header
    var authHeader = string.Format(
        "OAuth oauth_nonce=\"{0}\", oauth_signature_method=\"{1}\", " +
        "oauth_timestamp=\"{2}\", oauth_consumer_key=\"{3}\", " +
        "oauth_token=\"{4}\", oauth_signature=\"{5}\", " +
        "oauth_version=\"{6}\"",
        Uri.EscapeDataString(oauth_nonce),
        Uri.EscapeDataString(oauth_signature_method),
        Uri.EscapeDataString(oauth_timestamp),
        Uri.EscapeDataString(config.OAuthConsumerKey),
        Uri.EscapeDataString(config.OAuthToken),
        Uri.EscapeDataString(oauth_signature),
        Uri.EscapeDataString(oauth_version));
    // make the request
    ServicePointManager.Expect100Continue = false;
    // The track parameter travels in the query string; the POST itself carries no body.
    var requestUrl = resource_url + "?track=" + escapedKeywords;
    HttpWebRequest request = (HttpWebRequest) WebRequest.Create(requestUrl);
    request.Headers.Add("Authorization", authHeader);
    request.Method = "POST";
    request.ContentType = "application/x-www-form-urlencoded";
    request.PreAuthenticate = true;
    request.AllowWriteStreamBuffering = true;
    request.CachePolicy = new System.Net.Cache.RequestCachePolicy(System.Net.Cache.RequestCacheLevel.BypassCache);
    // Give up if no response arrives within 5 seconds; the caller retries when it sees
    // the empty reader.
    var tresponse = request.GetResponseAsync();
    if (tresponse.Wait(5000)) {
        return new StreamReader(tresponse.Result.GetResponseStream());
    }
    request.Abort();
    return StreamReader.Null;
}
/// <summary>
/// Data object that <c>StreamStatuses</c> deserializes each line of the Twitter stream into.
/// Member names are lower-case with underscores, presumably to match the JSON field names
/// of the Twitter payload one-to-one (verify against the API before renaming anything).
/// </summary>
public class TweetData {
    // --- Core tweet fields ---
    public string created_at { get; set; }
    public long id { get; set; }
    public string id_str { get; set; }
    public string text { get; set; }
    public string source { get; set; }
    public bool truncated { get; set; }

    // --- Reply information (left untyped as object) ---
    public object in_reply_to_status_id { get; set; }
    public object in_reply_to_status_id_str { get; set; }
    public object in_reply_to_user_id { get; set; }
    public object in_reply_to_user_id_str { get; set; }
    public object in_reply_to_screen_name { get; set; }

    // --- Author and location ---
    public User User { get; set; }
    public object geo { get; set; }
    public object coordinates { get; set; }
    public Place place { get; set; }
    public object contributors { get; set; }

    // --- Quoted tweet ---
    public long quoted_status_id { get; set; }
    public string quoted_status_id_str { get; set; }
    public QuotedStatus quoted_status { get; set; }
    public QuotedStatusPermalink quoted_status_permalink { get; set; }
    public bool is_quote_status { get; set; }

    // --- Counters ---
    public int quote_count { get; set; }
    public int reply_count { get; set; }
    public int retweet_count { get; set; }
    public int favorite_count { get; set; }

    // --- Entities, flags and metadata ---
    public Entities3 entities { get; set; }
    public bool favorited { get; set; }
    public bool retweeted { get; set; }
    public string filter_level { get; set; }
    public string lang { get; set; }
    public string timestamp_ms { get; set; }

    /// <summary>The unparsed stream line this object was built from; assigned by <c>StreamStatuses</c>.</summary>
    public string RawJson { get; set; }

    // --- Nested tweet payloads ---
    public RetweetedStatus retweeted_status { get; set; }
    public ExtendedTweet extended_tweet { get; set; }
}

The Stream Analytics queries:

-- Hourly tally: number of tweets per sentiment score in each one-hour tumbling window.
SELECT
    System.Timestamp AS Time,
    SentimentScore,
    COUNT(*) AS ScoreCounts
INTO
    output1
FROM
    TwitterStream TIMESTAMP BY CreatedAt
GROUP BY
    TumblingWindow(hour, 1),
    SentimentScore


-- Daily tally: number of tweets per sentiment score in each one-day tumbling window.
SELECT
    System.Timestamp AS Time,
    SentimentScore,
    COUNT(*) AS ScoreCounts
INTO
    output2
FROM
    TwitterStream TIMESTAMP BY CreatedAt
GROUP BY
    TumblingWindow(day, 1),
    SentimentScore

References

  1. https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-twitter-sentiment-analysis-trends