Does news affect Bitcoin rates? Use Logstash & Free Kusto to analyze Bitcoin rates and news reports
Published Mar 08 2023 03:37 AM 3,942 Views

In this tutorial, we will show how to use the free Azure Data Explorer offering (ADX/Kusto) along with the Kusto Logstash open-source connector to collect, ingest and analyze real-time Bitcoin rates and news reports (sourced from The New York Times). Once the data is in ADX, you can use the attached ADX dashboard template file to interactively visualize news reports and their categories displayed alongside the Bitcoin rates timechart.

The end result: a dashboard that presents news reports and Bitcoin rates, over time. Click on the magnifying glass icon to zoom inThe end result: a dashboard that presents news reports and Bitcoin rates, over time. Click on the magnifying glass icon to zoom in

We will use Logstash's http_poller plugin to pull real-time data from external sources into Kusto. Then, we'll use Kusto to transform, query and visualize the data. 
Here is the high-level architecture of this tutorial:

Click on the magnifying glass icon to zoom inClick on the magnifying glass icon to zoom in

Step 1: Create your free personal Kusto cluster

If you don't already have a Kusto cluster, the Kusto free cluster offering is a great option to start with.

Free cluster allows anyone with a Microsoft account or an Azure Active Directory user identity to create a free ADX cluster without needing an Azure subscription or a credit card.
It's a frictionless way to create a free cluster that can be used for any purpose. It's the ideal solution for anyone who wants to get started quickly with Azure Data Explorer and experience the incredible engine performance and enjoy the productive Kusto Query Language.
For more details on creating the free Kusto cluster, see create a free Azure Data Explorer cluster

 

Step 2: Install Logstash

Logstash is an open-source data processing pipeline that ingests data from many sources simultaneously, transforms the data (optional), and then sends the data to your favorite "stash". It is often used as a part of the ELK stack (which stands for Elasticsearch, Logstash, and Kibana) to process and analyze data. The good news is that Logstash can send data to ADX, making migration from Elasticsearch to ADX easier. To install Logstash, see this reference

 

Step 3: Install Kusto output-plugin for Logstash 

The Logstash output plugin communicates with ADX and sends data to the service. 
Run the following command inside the Logstash root directory to install the plugin:

 

bin/logstash-plugin install logstash-output-kusto

 

For more information on the plugin, see Logstash Output Plugin for ADX.

Step 4:  Configuring the Logstash pipeline (the data inputs and the output to Kusto)

A Logstash configuration file is a file that specifies the settings for a Logstash pipeline. It tells Logstash how to process data and what to do with it.

A Logstash configuration file is written in JSON format and consists of three sections: inputs, filters, and outputs.

For this tutorial, we will use the following configuration:

 

input 
{
	http_poller 
	{
		urls => { bitcoin => "https://api.coindesk.com/v1/bpi/currentprice.json" }
		request_timeout => 60
		# Supports "cron", "every", "at" and "in" schedules by rufus scheduler
		# * * * * * == every minute, every hour of every day of every month and every day of the week.
		schedule => { cron => "* * * * * UTC"}
		codec => "json"
		tags => "bitcoin"
    }
	http_poller 
	{
		#	urls => {news => "https://api.rss2json.com/v1/api.json?rss_url=http://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml" }
		urls => {news => "https://api.rss2json.com/v1/api.json?rss_url=http://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml&api_key=AaBbCc1234&count=80" }
		request_timeout => 60
		# Supports "cron", "every", "at" and "in" schedules by rufus scheduler
		#schedule => { cron => "*/10 * * * * UTC"}
		schedule => { cron => "* * * * * UTC"}
		codec => "json"
		tags => "news"
	}
}
#filter
#{
	#mutate{convert => {"[bpi][USD][rate]" => "float"}}
#}
output
{
	stdout 
	{
		codec => rubydebug
	}
    if "bitcoin" in [tags] {
		kusto
		{
			path=>"/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
			ingest_url => "https://<FreeCluster data ingestion URI name, for  example: ingest-kvckrk4t8hxbve8j8dazkn>.<FreeCluster URI region, for example: southcentralus>.kusto.windows.net"
			app_id => "<Your app ID. Example: bb58bc3e-185f-4ec1-bc6a-0a27806d3538>"
			app_key => "<Your app Key. Example: abc12~KFY~AaBbCcExaMple> "
			app_tenant=>"<The app tenant. Example: 72f988bf-aabb-ccdd>"
			database=>"<The destination DB name. Example: DB1>"
			table=>"<The destination table name for the bitcoin data. Example: fromlogstash>"
			json_mapping=>"<The mapping name. Example: fromlogstash_mapping>"
		}
	}
	if "news" in [tags] {
	kusto
			{
				path=>"/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
			ingest_url => "https://<FreeCluster data ingestion URI name, for example: ingest-kvckrk4t8hxbve8j8dazkn>.<FreeCluster URI region, for example: southcentralus>.kusto.windows.net"
			app_id => "<Your app ID. Example: bb58bc3e-185f-4ec1-bc6a-0a27806d3538>"
			app_key => "<Your app Key. Example: abc12~KFY~AaBbCcExaMple> "
			app_tenant=>"<The app tenant. Example: 72f988bf-aabb-ccdd>"
			database=>"<The destination DB name. Example: DB1>"
			table=>"<The destination table name for the news data. Example: fromlogstashnews>"
			json_mapping=>"<The mapping name. Example: fromlogstashnews_mapping>"
			}
	}
}

 

The inputs section specifies the source of the data that Logstash will process. This could be a log file, a database, or some other source of data.
In our case, we'll use the Http_poller Logstash input plugin to get the real-time Bitcoin rates and news reports data. 

This Http_poller input plugin allows you to call an HTTP API, decode the output of it into event(s), and send them out. 

 

Step 4.1: Get the Bitcoin rates


To get the Bitcoin rates we use the API of the CoinDesk website. One service that CoinDesk provides is the Bitcoin Price Index (BPI). This Bitcoin pricing data is calculated every minute and is published in USD, EUR, and GBP. The Bitcoin Price Index data is made available programmatically via REST API. 

Step 4.2: Get the news reports


To get the news reports we use the New York Times RSS feed (Really Simple Syndication). The RSS feed offers a way to get NYTimes.com content, updated throughout the day, including the latest headlines, summaries, and keywords for each article.
We also use the free API of rss2json.com, to convert the RSS feed into valid JSON.

The  free plan of rss2json.com permits a maximum of 10,000 requests per day, which suffices if we intend to invoke the service every minute (we use the Cron schedule expression of the Logstash's http_poller plugin for polling the feed every minute).
However, if you desire to poll the news feed continuously, above a certain number of daily calls, you may have to register for a free account on the rss2json.com website and generate an API key for the news feed address (http://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml). 
Once you have obtained the API key, add it to the end of the URL of the http_poller, as demonstrated in the config file above. Additionally, we made use of the "Count" parameter, which defines the number of feed items to return (default value of 10). In our case, we set the "Count" parameter to 80.
For both inputs, we use the HTTP_poller's Schedule option to periodically poll from the URLs every minute.
We also use the Tag option to tag the events. This can help with processing later and send it to the right table in ADX. 

 

The filters section specifies optional transformations that Logstash should apply to the data. This could include parsing the data, adding or removing fields, or performing some other transformation. 

This example shows how to convert the USD rate of Bitcoin from a string (with a decimal point) to float.

While this is a valid option, we commented out the filter section since we will later show how to do this conversion inside ADX using an update policy

 

The outputs section specifies where Logstash should send the processed data. This could be another log file, a database, or some other destination.
In our case, the destination will be two tables in the free Kusto cluster. We have a table for the Bitcoin rates and another table for news reports. The Tags from the input are used to determine the right destination table.


We also use stdout to print the output to the standard output of the shell running Logstash. This output can be convenient when debugging plugin configurations, by allowing instant access to the event data after it has passed through the inputs and filters.
For more details on the parameters, see: Configure Logstash to send data to ADX 

 

Step 5: Set the Kusto output plugin parameters (credentials):
Azure Active Directory (Azure AD) application authentication is used by the Logstash connector to access and ingest data to the Azure Data Explorer cluster without a user present. The application ID, key (client secret), and the tenant ID are credentials required to connect to ADX. If you don't know how to create an AAD application, please refer to Create an Azure Active Directory application registration in ADX. Be sure to use an application with ingest privileges.
Then, authorize the Azure AD App created for database DB1 by executing the following KQL command in the ADX database query window, which grants the required permissions:

 

.add database DB1 ingestors ('aadapp=<You app_id. Example: 2a904276-12342a904276>')

 

Step 6. Create a destination tables in Kusto

Run the following KQL commands to create the destination tables:

 

.create table fromlogstash (timestamp: datetime, message: string) 

.create table fromlogstash_extracted (timestamp: datetime, USD_rate: real, Euro_rate: real, GBP_rate: real)

.create table fromlogstashnews (timestamp: datetime, message: string) 

.set async fromlogstashnews_and_categories_extracted <|
fromlogstashnews
| mv-expand todynamic(message)
| extend title=tostring(message.['title'])
| extend content=tostring(message.content)
| extend categories=(message.categories)
| project-away message
| take 0

 

Let's take a moment to introduce the four tables that we have created:
Table 1: fromlogstash - the raw JSON data of the Bitcoin data will land in this table.
Table 1. Click on the magnifying glass icon to zoom inTable 1. Click on the magnifying glass icon to zoom in

Table 2:  fromlogstash_extracted - for better performance and easier data management, we'll use an update policy to extract the data out of the raw JSON data and "promote" them into independent columns.
Table 2. Click on the magnifying glass icon to zoom inTable 2. Click on the magnifying glass icon to zoom in

Table3: fromlogstashnews - will store the raw JSON data of the news reports. The reports are in an array with keys like the title of the report, the description of the report, the categories, etc.
The JSON data of each record (which is pulled every minute) is structured as an array of news reports, where each report is represented by a JSON object containing various keys such as the report's title, description, categories, and other relevant details. Table 3. Click on the magnifying glass icon to zoom inTable 3. Click on the magnifying glass icon to zoom in

Table 4: fromlogstashnews_and_categories_extracted -  we'll use an update policy to extract the relevant JSON data from the raw message and promote it into individual columns.
Using the ".set" command with "take 0" results in the creation of an empty table with the schema based on the query. The behavior of the ".set" command is such that it creates the table only if it does not already exist. With "take 0", only the table structure was created without any data being ingested. 

Table 4. Click on the magnifying glass icon to zoom inTable 4. Click on the magnifying glass icon to zoom in

 

Step 7: Create a table data mapping

Data mappings are used during ingestion to map incoming data to columns inside tables. We'll use JSON mapping to map incoming JSON data from Logstash to columns inside our tables. We'll create two ingestion mappings (for each data "landing" table):

 

.create table fromlogstash ingestion json mapping 'fromlogstash_mapping' '[{"column":"timestamp","path":"$.@timestamp"},{"column":"message","path":"$.bpi"}]'
.create table fromlogstashnews ingestion json mapping 'fromlogstashnews_mapping' '[{"column":"timestamp","path":"$.@timestamp"},{"column":"message","path":"$.items"}]'

 

Step 8: Tweak the ingestion batching policy

During the ingestion process, throughput is optimized by batching small ingress data chunks together before ingestion. The ingestion batching policy defines data aggregation for batching. In this article, you can define and assign an ingestion batching policy for a table using the table batching policy. The  ingestion batching policy can be set on databases or tables. Default values are as follows: 5 minutes maximum delay time, 1000 items, total size of 1 GBWe will reduce the parameter from 5 minutes to 40 seconds to speed up the ingestion process during our testing phase.

 

.alter table fromlogstash policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:40", "MaximumNumberOfItems": 10, "MaximumRawDataSizeMB": 1024}'
.alter table fromlogstashnews policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:40", "MaximumNumberOfItems": 10, "MaximumRawDataSizeMB": 1024}'

 

Step 9: Create update policies
Update policies will convert the raw JSON data ingested into the "fromlogstash" table, into separate columns, and append the result to the target table "fromlogstash_extracted". Besides, we'll use the update policy to remove commas from the Bitcoin rate strings and convert them into floating-point numbers. The update policy's query will include the function below:

 

.create-or-alter function  ExtractLogstashLogs() {
fromlogstash
// 1. Interpret a string as a JSON value
| extend d=parse_json(message) 
// 2. Create calculated columns: 
// 2.1 Access the USD.rate property in the JSON 
| extend USD_rate = tostring(d.USD.rate)
| extend EUR_rate = tostring(d.EUR.rate)
| extend GBP_rate = tostring(d.GBP.rate)
// 2.2 Remove the comma (,) from the strings (for example, the rate can be 43,632.556)
| extend USD_replaced=replace_string(USD_rate, ',', '')
| extend EUR_replaced=replace_string(EUR_rate, ',', '')
| extend GBP_replaced=replace_string(GBP_rate, ',', '')
// 3. Convert the string to real (floating-point number)
| extend USD_real = toreal(USD_replaced)
| extend EUR_real = toreal(EUR_replaced)
| extend GBP_real = toreal(GBP_replaced)
// 4. Select the columns to include in the result
| project timestamp, USD_real, EUR_real, GBP_real
}

 

Additionally, we'll use another update policy to extract the JSON data of the "fromlogstashnews" table (the title, content, and categories of each news report) and store them in individual columnsThe update policy's query will include the following function.

 

.create-or-alter function  ExtractNewsDataAndCategories() {
fromlogstashnews
| mv-expand todynamic(message)
| extend title=tostring(message.['title'])
| extend content=tostring(message.content)
| extend categories=(message.categories)
| project-away message
}

 

Then, we'll create the two update policies (one for the Bitcoin rates table ("fromlogstash" table) and one for the new reports table ("fromlogstashnews" table).

 

.alter table fromlogstash_extracted policy update 
@'[{ "IsEnabled": true, "Source": "fromlogstash", "Query": "ExtractLogstashLogs()", "IsTransactional": false, "PropagateIngestionProperties": false}]'

 

 

.alter table fromlogstashnews_and_categories_extracted policy update 
@'[{ "IsEnabled": true, "Source": "fromlogstashnews", "Query": "ExtractNewsDataAndCategories()", "IsTransactional": false, "PropagateIngestionProperties": false}]'

 

Step 10: Run the Logstash pipeline 

To start the ingestion, start the Logstash pipeline from its bin library. 

 

logstash -f logstash.conf --config.reload.automatic

 

Step 11: Create a dashboard

You can use the create new dashboard from a file option, to reuse my dashboard. The dashboard file is attached. Note that it is in DOC format due to technical blog platform constraints. Before importing it to ADX dashboards, you'll have to convert it to JSON.
We have incorporated the dashboards' cross-filter interactions. This allows for a seamless experience where clicking on a point in the time chart dynamically displays the corresponding news reports and categories associated with that particular time.
Dashboard interactions. Click on the magnifying glass icon to zoom inDashboard interactions. Click on the magnifying glass icon to zoom in

Step 12:. Have fun digging into the Bitcoin rates and checking out the related news reports!

 

Summary:
In this blog post, we demonstrated how to utilize the free Azure Data Explorer cluster and the Kusto Logstash connector to collect, ingest, and analyze real-time Bitcoin rates and The New York Times news reports. We also provided the Logstash plugin configuration, the table creation scripts, including functions and update policies. We also provided a dashboard file with the functionality to visualize news reports and their corresponding categories within a selected time frame.

 
3 Comments
Version history
Last update:
‎Mar 09 2023 03:07 AM