Any commercial or government organization, big or small, collects and stores a lot of data, usually across multiple systems. Getting insights from these distributed datasets, and doing machine learning across them, can be a challenge. For example, a company that insures houses wants to know whether an address lies in a flood zone so that it can estimate the risk involved. Problems like this need an analytics system that can combine, transform, and enrich the data to produce useful insights.
Getting geospatial insights from big data
Azure Synapse is a limitless analytics service that combines enterprise data warehousing and big data analytics. It gives you the freedom to query data on your terms. By combining Azure Maps geospatial capabilities with Azure Synapse and SynapseML, you can enrich your data with location-aware capabilities. SynapseML geospatial services can geocode addresses, reverse-geocode coordinates, and check whether a coordinate is inside a polygon. The geocoder is very tolerant of typos and incomplete addresses. It handles everything from exact street addresses and street intersections to higher-level geographies such as city centers, counties, and states.
How to visualize customer addresses on a floodplain?
Before you can start using SynapseML to geocode, you need to obtain an Azure Maps key and set it in your environment. Learn here how to get an Azure Maps key.
import os

from synapse.ml.cognitive import *
from synapse.ml.geospatial import *

# An Azure Maps account key, read from the environment
azureMapsKey = os.environ["AZURE_MAPS_KEY"]
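Reading the key with `os.environ[...]` raises a bare `KeyError` when the variable is missing. Below is a minimal sketch of a more defensive lookup, assuming the key is stored in the `AZURE_MAPS_KEY` variable as above. The helper name `get_azure_maps_key` is hypothetical and not part of SynapseML.

```python
import os


def get_azure_maps_key(env=None, name="AZURE_MAPS_KEY"):
    """Return the Azure Maps key from the environment, or fail with a clear message.

    Hypothetical helper for illustration only; not part of SynapseML.
    """
    env = os.environ if env is None else env
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(
            f"Environment variable {name} is not set; "
            "obtain an Azure Maps key and export it first."
        )
    return key
```

Calling `get_azure_maps_key()` in place of the direct lookup gives a readable error early, before any geocoding request is attempted.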
Load address data
King County (WA) publishes floodplain and tax parcel data. We can use the addresses in the tax parcel data and the Azure Maps geocoder to calculate coordinates. Then, using these coordinates and the floodplain data, we can enrich our dataset with a flag indicating whether the house is in a flood zone or not. The following data has been sourced from King County's Open data portal.
data = spark.read\
    .option("header", "true")\
    .csv("wasbs://publicwasb@mmlspark.blob.core.windows.net/maps/KingCountyAddress.csv")
# Visualize incoming schema
print("Schema:")
data.printSchema()
# Choose a subset of the data for this example
subset_data = data.limit(50)
display(subset_data)
Wire up the Address Geocoder
We will use the address geocoder to enrich the dataset with location coordinates of the addresses.
from pyspark.sql.functions import col
from synapse.ml.cognitive import *
from synapse.ml.stages import FixedMiniBatchTransformer, FlattenBatch
from synapse.ml.geospatial import *
def extract_location_fields(df):
    # Keep all existing columns, add Latitude/Longitude from the first
    # geocoding result, and drop the raw "output" column
    return df.select(col("*"),
        col("output.response.results").getItem(0).getField("position").getField("lat").alias("Latitude"),
        col("output.response.results").getItem(0).getField("position").getField("lon").alias("Longitude")
    ).drop("output")
# Azure Maps geocoder to enhance the dataframe with location data
geocoder = (AddressGeocoder()
    .setSubscriptionKey(azureMapsKey)
    .setAddressCol("FullAddress")
    .setOutputCol("output"))
# Set up a fixed mini batch transformer to geocode addresses
batched_dataframe = geocoder.transform(
    FixedMiniBatchTransformer().setBatchSize(10).transform(subset_data.coalesce(1))
)
geocoded_addresses = extract_location_fields(FlattenBatch().transform(batched_dataframe))
# Display the results
display(geocoded_addresses)
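The pipeline above groups rows into batches of 10 with FixedMiniBatchTransformer, geocodes them, and then restores one row per address with FlattenBatch. As a rough illustration of that batch-then-flatten idea, here is a plain-Python sketch. It is not how SynapseML implements these stages, and the function names `fixed_mini_batches` and `flatten` are made up for this example.

```python
from itertools import islice


def fixed_mini_batches(rows, batch_size):
    """Yield consecutive lists of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def flatten(batches):
    """Undo the batching: yield the rows of every batch in order."""
    for batch in batches:
        yield from batch


# Made-up stand-ins for address rows
addresses = [f"address {i}" for i in range(25)]
batches = list(fixed_mini_batches(addresses, 10))
print([len(b) for b in batches])  # [10, 10, 5]
```

Flattening the batches returns the original rows in their original order, which is why the geocoded DataFrame still has one row per address.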
Set up Check Point In Polygon
Now that we have geocoded the addresses, we can then use the CheckPointInPolygon function to check if a property is in a flood zone or not.
def extract_point_in_polygon_result_fields(df):
    # Keep all existing columns, add the point-in-polygon result fields,
    # and drop the raw "output" column
    return df.select(col("*"),
        col("output.result.pointInPolygons").alias("In Polygon"),
        col("output.result.intersectingGeometries").alias("Intersecting Polygons")
    ).drop("output")
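# Note: atlas_geo_prefix and user_data_id are not defined in this excerpt;
# they must be set beforehand (see the complete tutorial)
check_point_in_polygon = (CheckPointInPolygon()
    .setSubscriptionKey(azureMapsKey)
    .setGeography(atlas_geo_prefix)
    .setUserDataIdentifier(user_data_id)
    .setLatitudeCol("Latitude")
    .setLongitudeCol("Longitude")
    .setOutputCol("output"))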
flood_plain_addresses = extract_point_in_polygon_result_fields(check_point_in_polygon.transform(geocoded_addresses))
# Display the results
display(flood_plain_addresses)
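CheckPointInPolygon delegates the containment test to the Azure Maps service. For intuition, or to spot-check a handful of results locally, the same question can be answered with the textbook ray-casting technique. The sketch below is an illustrative stand-in, not the Azure Maps implementation: it treats coordinates as planar, does not handle points lying exactly on an edge, and uses a made-up rectangle rather than real floodplain data.

```python
def point_in_polygon(lon, lat, ring):
    """Ray-casting test: True if (lon, lat) lies inside the polygon ring.

    ring is a list of (lon, lat) vertices; the last vertex is implicitly
    joined to the first. Points exactly on an edge are not handled specially.
    """
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Does this edge straddle the horizontal line through the point?
        if (y1 > lat) != (y2 > lat):
            # Longitude at which the edge crosses that horizontal line
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


# Made-up rectangular "flood zone" for illustration only
zone = [(-122.4, 47.5), (-122.2, 47.5), (-122.2, 47.7), (-122.4, 47.7)]
print(point_in_polygon(-122.3, 47.6, zone))  # True
print(point_in_polygon(-122.1, 47.6, zone))  # False
```

Each crossing of a polygon edge to the right of the point toggles the result, so an odd number of crossings means the point is inside.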
Thank you for checking out our blog. You can read the complete tutorial on the SynapseML GitHub page.