Purpose
THIS IS NOT A HOW-TO ARTICLE - please follow Step 2 for the full step-by-step guide to setting up your solution.
This article seeks to supplement the Deployment guide provided in the repository. The post includes my “gotchas” and learnings that the guide might have missed. Check out part 1 of this article for a quick overview of the solution:
MS Doc - Build and deploy a social media analytics solution - Azure Architecture Center | Microsoft Learn
Prerequisites
- You need to have an Azure Subscription. You can start with a free $200 credit here!
- Go to this repository and follow along
- You need to have a work/school account to access Power BI
Alright, let’s get started!
This is what the architecture looks like once you complete Step 2:
Before we go further...
Please replace the notebooks you have uploaded in this step (4.4 Upload Notebooks) with the notebooks in this directory: https://tinyurl.com/33ufnz63
The new notebooks contain bug fixes:
- Allow hashtag queries.
- Remove empty leading spaces that occurred with some foreign languages (e.g., Japanese).
- Allow null author values (articles that didn’t have authors would make the solution fail).
Ingest Step
Twitter API - Before you run your pipeline, make sure your Twitter Developer account is set up with Elevated (link) access.
If your account is not Elevated, you will get:
Before you run the Synapse pipeline, I recommend trying out the Search API using Twitter’s Postman collection (link).
Go to the Twitter Developer Portal > Projects & Apps > Project > {App Name} to see your keys and secrets:
Use the bearer token from your Twitter Developer account:
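If you prefer to test from code rather than Postman, here is a minimal sketch (the bearer token and query are placeholders you would substitute) that calls the v2 recent search endpoint and prints how many tweets came back:

import requests

# Placeholders - substitute your own bearer token and query
BEARER_TOKEN = "<YOUR_BEARER_TOKEN>"
query = "azure"

response = requests.get(
    "https://api.twitter.com/2/tweets/search/recent",
    headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
    params={"query": query, "max_results": 10},
)
response.raise_for_status()
tweets = response.json().get("data", [])
print(f"Recent search returned {len(tweets)} tweets for '{query}'")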
NewsAPI - please check Everything - Documentation - News API for all the query options
Please note
- Unless you have an Academic Research Twitter account, you will only get tweets from the past 7 days.
- Currently, our pipeline will fail if the Twitter API or the News API returns 0 results. We highly recommend using Twitter's Postman collection and https://newsapi.org/v2/everything?q=<ENTER YOUR QUERY>&apiKey=<YOUR API> to check whether you get any results back (a quick News API check is also sketched below). If either returns 0 results, you will get errors in your notebook:
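For the News API side, a similarly minimal sketch (the API key and query are placeholders) can confirm that totalResults is non-zero before you kick off the pipeline:

import requests

# Placeholders - substitute your own News API key and query
NEWS_API_KEY = "<YOUR_NEWSAPI_KEY>"
query = "azure"

response = requests.get(
    "https://newsapi.org/v2/everything",
    params={"q": query, "apiKey": NEWS_API_KEY},
)
response.raise_for_status()
total = response.json().get("totalResults", 0)
print(f"News API returned {total} articles for '{query}'")
if total == 0:
    print("Warning: 0 results - the pipeline will fail with this query.")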
When you get to this step (4.5 Setup Pipelines), you’ll see parameters you need to fill in:
(The deployment guide does not go into detail about what these inputs mean; an illustrative set of example values follows the table.)
| Key | Value |
| --- | --- |
| data_lake_account_name | Storage Account name |
| file_system_name | Your container in the storage account (Storage > Containers; it should be the same as what’s shown) |
| keyvault_name | Your Key Vault resource name |
| query | Search term for NewsAPI (check out the documentation) and Twitter |
| topic | Logical grouping for Power BI. Not critical to the queries themselves, but it helps organize the views within Power BI |
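To make the table concrete, here is a purely illustrative set of values; the resource names below are hypothetical and every value should be replaced with your own:

# Illustrative example only - replace every value with your own resource names and query
pipeline_parameters = {
    "data_lake_account_name": "mysocialmedialake",   # Storage Account name
    "file_system_name": "socialmediadata",           # container inside that storage account
    "keyvault_name": "socialmedia-kv",               # Key Vault resource name
    "query": "Azure Synapse",                        # search term sent to NewsAPI and Twitter
    "topic": "Azure",                                # logical grouping used by the Power BI views
}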
ENRICH
Once all of this is set up, the Enrich step can take place. The enrichment step uses the notebooks to call the Twitter and News APIs and aggregate the queried data.
This step reaches out to Azure Cognitive Services to run text analytics and sentiment analysis on the data, then stores the results in the Data Lake storage.
There are two main functions for our enrichment process (both in Ingest_Process_Tweets.ipynb and Ingest_Process_News.ipynb):
The get_translation() function (Tweets):
import uuid
import requests

# TRANSLATOR_ENDPOINT, TRANSLATOR_KEY and TRANSLATOR_REGION are defined earlier in the notebook
def get_translation(inp_text, to_languages):
    """
    Params:
        inp_text: text to be translated
        to_languages: list of languages to translate to
    Returns: {lang_code: translation}, language code of the original text

    Call to translator cognitive service detects language and translates to the target languages.
    Result is a dictionary of language codes to translated text, along with the language detected.
    """
    # Translator setup
    translator_path = "/translate"
    translator_url = TRANSLATOR_ENDPOINT + translator_path
    params = {
        "api-version": "3.0",
        "to": to_languages
    }
    headers = {
        'Ocp-Apim-Subscription-Key': TRANSLATOR_KEY,
        'Ocp-Apim-Subscription-Region': TRANSLATOR_REGION,
        'Content-type': 'application/json',
        'X-ClientTraceId': str(uuid.uuid4())
    }

    # create and send request
    body = [{
        'text': inp_text
    }]
    request = requests.post(translator_url, params=params, headers=headers, json=body)
    response = request.json()

    try:
        from_language = response[0]["detectedLanguage"]["language"]
        translations = response[0]["translations"]
        res = {}
        for trans in translations:
            res[trans['to']] = trans['text']
        return res, from_language
    except Exception as err:
        print("Encountered an exception. {}".format(err))
        return {}, None  # return an empty result so callers can still unpack the tuple
Sends the search result to the translation service and returns the translated text (for our example: English).
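For instance, a call like the one below (the input string and target languages are just examples) returns a dictionary keyed by language code plus the detected source language:

# Example call - assumes the translator constants above are already configured
translations, source_language = get_translation("こんにちは世界", ["en", "es"])
print(source_language)      # e.g. "ja"
print(translations["en"])   # the English translation used downstream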
and the get_sentiment() (Tweets) function:
# text_analytics_client is the Text Analytics client configured earlier in the notebook
def get_sentiment(inp_text):
    documents = [inp_text]
    response = text_analytics_client.analyze_sentiment(documents=documents)[0]
    try:
        overallscore = response.confidence_scores.positive + (0.5 * response.confidence_scores.neutral)  # check logic of this
        return response.sentiment, overallscore
    except Exception as err:
        print("Encountered Sentiment exception. {}".format(err))
        return "Neutral", 0.5
Sends the retrieved title to the sentiment service.
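In other words, the function returns the sentiment label together with a single score that adds the positive confidence to half of the neutral confidence. A rough usage example (the input text is made up):

# Example call - assumes text_analytics_client is already configured
sentiment, score = get_sentiment("The new release works great")
print(sentiment, score)   # e.g. "positive" and a score close to 1.0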
Please note - the services can handle multiple languages as their input/output. The code is currently written to return the first translated text: English.
STORE
PySpark UDF
We will leverage the functions mentioned in the Enrich step and use them as our user-defined functions (UDFs).
- PySpark has built-in functions, but they are limited. We create UDFs to extend PySpark’s capabilities and use them on the DataFrame.
For our scenario we needed to create custom functions that call the Azure Cognitive Services APIs to enrich the data. The data is then transformed with translation and text analytics, and stored in the Data Lake Storage.
Example of a translations UDF:
import ast
from pyspark.sql import Row
from pyspark.sql.functions import udf, col, lit, explode

# schema (the UDF's return struct), inserted_date_udf and target_languages are defined earlier in the notebook
def get_translations(originaltext, target_languages):
    translations_text, query_language = get_translation(originaltext, ast.literal_eval(target_languages))
    return Row('Translations', 'QueryLanguage')(translations_text, query_language)

get_translations_udf = udf(lambda originaltext, target_languages: get_translations(originaltext, target_languages), returnType=schema)

df_translations = df_tweets.select(['id', 'text']) \
    .withColumn('Ttext1', get_translations_udf(col("text"), lit(str(target_languages)))) \
    .withColumn('original_language', col("Ttext1.QueryLanguage")) \
    .select(explode(col("Ttext1.Translations")).alias("language", "TText"), 'id', 'text', 'original_language') \
    .withColumn('created_datetime', inserted_date_udf(col("id")))

df_translations = df_translations.select('id', 'text', 'TText', 'language', 'original_language', 'created_datetime')
df_translations.write.mode("append").saveAsTable("tweets_translations")
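Once the append finishes, you can spot-check the stored table directly from the notebook, for example with a quick aggregate like this:

# Sanity check on the table written above
spark.sql("SELECT language, COUNT(*) AS n FROM tweets_translations GROUP BY language").show()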
VISUALIZE
Now that we've got the data stored in ADLS, we need to provide a way to visualize it!
Go to Azure and find your deployed Synapse resource. Go to your Portal > Synapse Resource > Overview.
Copy the Serverless SQL endpoint.
Open the Power BI dashboard file.
Paste the Serverless SQL endpoint when the prompt comes up, and set the ‘database’ to default:
When you click ‘Load’ for the first time you will be prompted to sign in:
Please click ‘Microsoft Account’ and sign in with your work/school account.
You should see something like this once everything is set up!
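If the Power BI connection gives you trouble, you can also sanity-check the serverless SQL endpoint on its own. The sketch below is hypothetical: it assumes pyodbc and a recent Microsoft ODBC Driver for SQL Server are installed locally, that your account has access to the workspace, and that the Spark table from the Store step has been synced to the serverless ‘default’ database:

import pyodbc

# Hypothetical connection check - paste the Serverless SQL endpoint you copied above
endpoint = "<your-workspace>-ondemand.sql.azuresynapse.net"
conn = pyodbc.connect(
    "Driver={ODBC Driver 18 for SQL Server};"
    f"Server={endpoint};"
    "Database=default;"
    "Authentication=ActiveDirectoryInteractive;"
)
cursor = conn.cursor()
cursor.execute("SELECT TOP 5 * FROM tweets_translations")
for row in cursor.fetchall():
    print(row)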
*Extra Step*
What if you want to share your dashboard with someone? You can do that by adding a user to Synapse Workspace.
Go to your Synapse Workspace > Manage > Access Control > + Add
Select ‘Synapse SQL Administrator’ and search for the user’s work/school email to add them.
Once it’s set up, the user should be able to sign in with their Microsoft account to access the dashboard!
Congratulations
That’s it! Hopefully these learnings of mine will help you get over any initial hurdles and get the solution up and running.
Thank you for reading!