In the previous post, you learned how to generate vector embeddings for images and text using the multi-modal embeddings APIs of Azure AI Vision, and we computed the embeddings of the images in our dataset. In this post, we will use Azure Blob Storage to store the images and Azure Cosmos DB for PostgreSQL to store our vector embeddings. Then, we will perform a similarity search on our embeddings by utilizing the pgvector extension.
The workflow is illustrated in the following image:
In this tutorial, you will learn how to:

- Upload images to an Azure Blob Storage container.
- Store vector embeddings in an Azure Cosmos DB for PostgreSQL table.
- Perform similarity searches on the embeddings using the pgvector extension.
The complete, working project is available in my GitHub repository. If you want to try it out, just fork the repository and clone it to your machine.
To proceed with this tutorial, ensure that you have the following prerequisites installed and configured:
The code for uploading images to an Azure Blob Storage container can be found at data_upload/upload_images_to_blob.py.
Azure Blob Storage is a cloud storage service that is optimized for storing large amounts of unstructured data, such as images. It offers three types of resources:

- The storage account
- A container in the storage account
- A blob in a container
The following diagram illustrates the relationship between these resources:
Azure Blob Storage resources: storage account, container, and blobs. (Image source: Azure Blob Storage Object model – Microsoft Docs)
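Every blob in the account is addressable through a URL that combines these three resources. Here is a minimal sketch of how such a URL is composed; the account, container, and blob names below are hypothetical:

account_name = "mystorageaccount"  # hypothetical storage account name
container_name = "paintings"       # hypothetical container name
blob_name = "image_01.jpg"         # hypothetical blob name

# Standard Azure Blob Storage URL layout:
# https://<account>.blob.core.windows.net/<container>/<blob>
blob_url = f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}"
print(blob_url)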
The Azure Blob Storage client library for Python provides the following classes to manage blobs and containers:

- BlobServiceClient: We will use the BlobServiceClient class to interact with the Azure Storage account and create a container.
- ContainerClient: We will use the ContainerClient class to interact with our container and the blobs inside the container.
- BlobClient: We will use the BlobClient class to upload a blob to our container.

The process of uploading our images to Azure Blob Storage can be summarized as follows:
1. Create a container in our Azure Storage account using the BlobServiceClient class.
2. Load the image filenames from the dataset's CSV file.
3. Upload the images to the container in parallel using the ThreadPoolExecutor class. Additionally, use the tqdm library to display progress bars, which makes it easier to follow the upload process.
import os
import csv
import sys
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient, ContainerClient, ContentSettings
from azure.core.exceptions import ResourceExistsError
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
# Constants
MAX_WORKERS = 4
IMAGE_FILE_CSV_COLUMN_NAME = "image_file"
# Directories
current_dir = os.path.dirname(os.path.realpath(__file__))
parent_dir = os.path.dirname(current_dir)
# Load environment file
load_dotenv(os.path.join(parent_dir, ".env"), override=True)
# Azure Blob Storage credentials
blob_account_name = os.getenv("BLOB_ACCOUNT_NAME")
blob_account_key = os.getenv("BLOB_ACCOUNT_KEY")
blob_endpoint_suffix = os.getenv("BLOB_ENDPOINT_SUFFIX")
blob_connection_string = (
f"DefaultEndpointsProtocol=https;AccountName={blob_account_name};"
f"AccountKey={blob_account_key};EndpointSuffix={blob_endpoint_suffix}"
)
container_name = os.getenv("CONTAINER_NAME")
# Dataset's folder
dataset_folder = os.path.join(parent_dir, "dataset")
dataset_filepath = os.path.join(dataset_folder, "dataset_embeddings.csv")
# Images' folder
images_folder = os.path.join(parent_dir, "semart_dataset", "images")
# Content-Type for blobs
content_settings = ContentSettings(content_type="image/jpeg")
def main():
    # Create the Azure Blob Storage client
    blob_service_client = BlobServiceClient.from_connection_string(conn_str=blob_connection_string)

    # Create a new container
    try:
        container_client = blob_service_client.create_container(name=container_name, public_access="blob")
    except ResourceExistsError:
        sys.exit(f"A container with name {container_name} already exists.")

    # Load the filenames of the images in the dataset
    images = load_image_filenames()
    print(f"Number of images in the dataset: {len(images)}")
    print(f"Uploading images to container '{container_name}'")

    # Upload images to blob storage
    upload_images(images=images, container_client=container_client)


def load_image_filenames() -> list[str]:
    with open(dataset_filepath, "r") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter="\t", skipinitialspace=True)
        image_filenames = [row[IMAGE_FILE_CSV_COLUMN_NAME] for row in csv_reader]
    return image_filenames


def upload_blob_from_local_file(
    image_filepath: str,
    container_client: ContainerClient,
) -> None:
    blob_name = os.path.basename(image_filepath)
    try:
        blob_client = container_client.get_blob_client(blob=blob_name)
        with open(image_filepath, mode="rb") as data:
            blob_client.upload_blob(data=data, overwrite=True, content_settings=content_settings)
    except Exception as e:
        print(f"Couldn't upload image {blob_name} to Azure Storage Account due to error: {e}")


def upload_images(images: list[str], container_client: ContainerClient) -> None:
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        list(
            tqdm(
                executor.map(
                    lambda x: upload_blob_from_local_file(
                        image_filepath=os.path.join(images_folder, x),
                        container_client=container_client,
                    ),
                    images,
                ),
                total=len(images),
            )
        )


if __name__ == "__main__":
    main()
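Once the upload finishes, you can optionally verify that all images landed in the container, for example by appending a couple of lines at the end of main(); list_blobs comes from the same ContainerClient class:

    # Optional sanity check: count the blobs in the container
    blob_count = sum(1 for _ in container_client.list_blobs())
    print(f"Blobs in container '{container_name}': {blob_count} (expected: {len(images)})")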
The code for inserting vector embeddings into an Azure Cosmos DB for PostgreSQL table can be found at data_upload/upload_data_to_postgresql.py.
The pgvector extension adds vector similarity search capabilities to your PostgreSQL database. To use the extension, you first have to create it in your database. On Azure Cosmos DB for PostgreSQL, you enable it by connecting to your database and running the CREATE_EXTENSION function from the psql command prompt:

SELECT CREATE_EXTENSION('vector');
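To confirm that the extension is active, you can query the standard pg_extension catalog:

SELECT extname, extversion FROM pg_extension WHERE extname = 'vector';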
The pgvector extension introduces a data type called VECTOR that can be used during the creation of a table to indicate that a column will hold vector embeddings. When creating the column, it's essential to specify the dimension of the vectors. In our scenario, Azure AI Vision generates 1024-dimensional vectors.
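For example, a table with a column for 1024-dimensional embeddings could be declared as follows; the table and column names here are illustrative, and the script below creates the full table for our dataset:

CREATE TABLE paintings (
    image_file TEXT PRIMARY KEY,
    vector VECTOR(1024) NOT NULL
);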
To insert data into an Azure Cosmos DB for PostgreSQL table, we will proceed as follows:

1. Connect to the database using a connection pool.
2. Create a table whose columns match the dataset's CSV file, including a VECTOR(1024) column for the embeddings.
3. Load the dataset, including the embeddings, into the table using the COPY command.
import os
import psycopg2
from psycopg2 import pool
from dotenv import load_dotenv
# Constants
IMAGE_FILE_COLUMN_NAME = "image_file"
DESCRIPTION_COLUMN_NAME = "description"
AUTHOR_COLUMN_NAME = "author"
TITLE_COLUMN_NAME = "title"
TECHNIQUE_COLUMN_NAME = "technique"
TYPE_COLUMN_NAME = "type"
TIMEFRAME_COLUMN_NAME = "timeframe"
VECTOR_COLUMN_NAME = "vector"
# Directories
current_dir = os.path.dirname(os.path.realpath(__file__))
parent_dir = os.path.dirname(current_dir)
# Load environment file
load_dotenv(os.path.join(parent_dir, ".env"), override=True)
# Azure Cosmos DB for PostgreSQL credentials
postgres_host = os.getenv("POSTGRES_HOST")
postgres_database_name = os.getenv("POSTGRES_DB_NAME")
postgres_user = os.getenv("POSTGRES_USER")
postgres_password = os.getenv("POSTGRES_PASSWORD")
sslmode = "require"
table_name = os.getenv("POSTGRES_TABLE_NAME")
postgres_connection_string = (
f"host={postgres_host} user={postgres_user} "
f"dbname={postgres_database_name} "
f"password={postgres_password} sslmode={sslmode}"
)
# Dataset's folder
dataset_folder = os.path.join(parent_dir, "dataset")
dataset_filepath = os.path.join(dataset_folder, "dataset_embeddings.csv")
def main():
    postgresql_pool = psycopg2.pool.SimpleConnectionPool(1, 20, postgres_connection_string)
    if postgresql_pool:
        print("Connection pool created successfully")

    # Get a connection from the connection pool
    conn = postgresql_pool.getconn()
    cursor = conn.cursor()

    print("Creating a table...")
    cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
    cursor.execute(
        f"CREATE TABLE {table_name} ("
        f"{IMAGE_FILE_COLUMN_NAME} TEXT PRIMARY KEY,"
        f"{DESCRIPTION_COLUMN_NAME} TEXT NOT NULL,"
        f"{AUTHOR_COLUMN_NAME} TEXT NOT NULL,"
        f"{TITLE_COLUMN_NAME} TEXT NOT NULL,"
        f"{TECHNIQUE_COLUMN_NAME} TEXT,"
        f"{TYPE_COLUMN_NAME} TEXT,"
        f"{TIMEFRAME_COLUMN_NAME} TEXT,"
        f"{VECTOR_COLUMN_NAME} VECTOR(1024) NOT NULL);"
    )

    print("Saving data to table...")
    with open(dataset_filepath) as csv_file:
        cursor.copy_expert(
            f"COPY {table_name} FROM STDIN WITH "
            f"(FORMAT csv, DELIMITER '\t', HEADER MATCH);",
            csv_file,
        )
    conn.commit()

    # Fetch all rows from the table
    cursor.execute(f"SELECT * FROM {table_name};")
    rows = cursor.fetchall()
    print(f"Number of records in the table: {len(rows)}")

    # Close the connection
    cursor.close()
    conn.close()


if __name__ == "__main__":
    main()
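As a quick sanity check after loading, you can confirm the dimensionality of the stored embeddings with pgvector's vector_dims function; replace paintings with your table name:

SELECT vector_dims(vector) FROM paintings LIMIT 1;
-- Expected result: 1024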
The image similarity search workflow that we will follow is summarized as follows:

1. Compute the vector embedding of the search query (a text prompt or a reference image) using the multi-modal embeddings APIs of Azure AI Vision.
2. Retrieve the most similar images using SQL SELECT statements and the built-in vector operators of the PostgreSQL database. Specifically, cosine similarity will be used as the similarity metric.
This workflow is illustrated in the following diagram:
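As a reminder of step 1, here is a minimal sketch of computing a text query's embedding with the Azure AI Vision multi-modal embeddings API covered in the previous post; the endpoint, key, api-version, and example query text below are placeholders that you should adapt to your resource:

import os
import requests

# Hypothetical values; use your own Azure AI Vision endpoint and key
endpoint = os.getenv("VISION_ENDPOINT")  # e.g. https://<resource>.cognitiveservices.azure.com
key = os.getenv("VISION_KEY")

response = requests.post(
    f"{endpoint}/computervision/retrieval:vectorizeText",
    params={"api-version": "2023-02-01-preview"},
    headers={"Ocp-Apim-Subscription-Key": key, "Content-Type": "application/json"},
    json={"text": "a portrait of a young woman"},
)
response.raise_for_status()
query_vector = response.json()["vector"]  # 1024-dimensional embedding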
Given the vector embedding of the query, we can use SQL SELECT statements to search for similar images. Let's understand how a simple SELECT statement works. Consider the following query:
SELECT * FROM table ORDER BY embedding <=> '[0.003, …, 0.034]' LIMIT 5;
This query computes the cosine distance (<=>) between the given vector ('[0.003, …, 0.034]') and the vectors stored in the table, sorts the results by the calculated distance, and returns the five most similar images (LIMIT 5). Additionally, you can obtain the cosine similarity between the query vector and the retrieved vectors by modifying the SELECT statement as follows:
SELECT image_title, 1 - (vector <=> '[0.003, …, 0.034]') AS cosine_similarity FROM table ORDER BY cosine_similarity DESC LIMIT 5;
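Putting the pieces together, here is a minimal sketch of running the similarity query from Python with psycopg2, reusing the connection string and column-name constants from the script above; query_vector is assumed to hold the 1024-dimensional embedding of the query:

import psycopg2

# pgvector accepts vectors as string literals like '[0.003,0.012,...]'
vector_literal = "[" + ",".join(str(x) for x in query_vector) + "]"

conn = psycopg2.connect(postgres_connection_string)
cursor = conn.cursor()
cursor.execute(
    f"SELECT {IMAGE_FILE_COLUMN_NAME}, "
    f"1 - ({VECTOR_COLUMN_NAME} <=> %s::vector) AS cosine_similarity "
    f"FROM {table_name} ORDER BY cosine_similarity DESC LIMIT 5;",
    (vector_literal,),
)
for image_file, cosine_similarity in cursor.fetchall():
    print(f"{image_file}: {cosine_similarity:.4f}")
cursor.close()
conn.close()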
The pgvector extension provides three operators that can be used to calculate similarity:

| Operator | Description |
| --- | --- |
| <-> | Euclidean distance |
| <#> | Negative inner product |
| <=> | Cosine distance |
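For reference, the other two operators can be used in the same way; the table name and vector literal below are illustrative:

-- Euclidean (L2) distance: smaller values mean more similar vectors
SELECT image_file FROM paintings ORDER BY vector <-> '[0.003, …, 0.034]' LIMIT 5;

-- Negative inner product: the inner product is negated,
-- so ascending order still ranks the most similar vectors first
SELECT image_file FROM paintings ORDER BY vector <#> '[0.003, …, 0.034]' LIMIT 5;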
In the Jupyter Notebook provided in my GitHub repository, you'll explore the following scenarios:
In this post, you explored the basic vector similarity search features offered by the pgvector extension. This type of vector search is referred to as exact nearest neighbor search, as it computes the similarity between the query vector and every vector in the database. In the upcoming post, you will explore approximate nearest neighbor search, which trades off result quality for speed.
If you want to explore pgvector's features, check out these learning resources: