Store and query embeddings in Azure Cosmos DB for PostgreSQL with pgvector
Published Mar 05 2024 12:00 AM 3,020 Views
Iron Contributor

part2.png

 

In the previous post, you learned how to generate vector embeddings for images and text using the multi-modal embeddings APIs of Azure AI Vision and computed the embeddings of the images in our dataset. In this post, we will use Azure Blob Storage to store the images and Azure Cosmos DB for PostgreSQL to store our vector embeddings. Then, we will perform a similarity search on our embeddings by utilizing the pgvector extension.

 

The workflow is illustrated in the following image:

 

Image similarity search workflowImage similarity search workflow

 

In this tutorial, you will learn how to:

  • Upload images to an Azure Blob Storage container using the Python SDK.
  • Store vector embeddings on an Azure Cosmos DB for PostgreSQL table.
  • Write SQL queries to detect similar images based on a text prompt or a reference image.

 

The entire functional project is available in my GitHub repository. If you're keen on trying it out, just fork the repository and clone it to have it locally available.

 

Prerequisites

 

To proceed with this tutorial, ensure that you have the following prerequisites installed and configured:

 

Upload images to Azure Blob Storage

 

The code for uploading images to an Azure Blob Storage container can be found at data_upload/upload_images_to_blob.py.

 

Azure Blob Storage is a cloud storage service that is optimized for storing large amounts of unstructured data, such as images. It offers three types of resources:

  • The storage account that contains all your Azure Storage data objects. Every object that is stored in Azure Storage is identified by a unique address.
  • Containers in the storage account which are similar to directories in a file system.
  • Blobs that are organized in the containers.

The following diagram illustrates the relationship between these resources:

 

blob1

 

Azure Blob Storage resources: storage account, container, and blobs. (Image source: Azure Blob Storage Object model – Microsoft Docs)

 

The Azure Blob Storage client library for Python provides the following classes to manage blobs and containers:

  • BlobServiceClient: We will use the BlobServiceClient class to interact with the Azure Storage account and create a container.
  • ContainerClient: We will use the ContainerClient class to interact with our container and the blobs inside the container.
  • BlobClient: We will use the BlobClient class to upload a blob to our container.

The process of uploading our images to Azure Blob Storage can be summarized as follows:

  1. Create a new container to store our images.
  2. Retrieve the filenames of the images in the dataset.
  3. Upload the images in the container, utilizing multiple threads via the ThreadPoolExecutor class. Additionally, use the tqdm library to display progress bars for better visualizing the image uploading process.

 

 

import os
import csv
import sys
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient, ContainerClient, ContentSettings
from azure.core.exceptions import ResourceExistsError
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

# Constants
MAX_WORKERS = 4
IMAGE_FILE_CSV_COLUMN_NAME = "image_file"

# Directories
current_dir = os.path.dirname(os.path.realpath(__file__))
parent_dir = os.path.dirname(current_dir)

# Load environemt file
load_dotenv(os.path.join(parent_dir, ".env"), override=True)
# Azure Blob Storage credentials
blob_account_name = os.getenv("BLOB_ACCOUNT_NAME")
blob_account_key = os.getenv("BLOB_ACCOUNT_KEY")
blob_endpoint_suffix = os.getenv("BLOB_ENDPOINT_SUFFIX")
blob_connection_string = (
    f"DefaultEndpointsProtocol=https;AccountName={blob_account_name};"
    f"AccountKey={blob_account_key};EndpointSuffix={blob_endpoint_suffix}"
)
container_name = os.getenv("CONTAINER_NAME")

# Dataset's folder
dataset_folder = os.path.join(parent_dir, "dataset")
dataset_filepath = os.path.join(dataset_folder, "dataset_embeddings.csv")

# Images' folder
images_folder = os.path.join(parent_dir, "semart_dataset", "images")

# Content-Type for blobs
content_settings = ContentSettings(content_type="image/jpeg")


def main():
    # Create Azure Blob Storage client
    blob_service_client = BlobServiceClient.from_connection_string(conn_str=blob_connection_string)

    # Create a new container
    try:
        container_client = blob_service_client.create_container(name=container_name, public_access="blob")
    except ResourceExistsError:
        sys.exit(f"A container with name {container_name} already exists.")

    # Find the URLs of the images in the dataset
    images = load_image_filenames()

    print(f"Number of images in the dataset: {len(images)}")
    print(f"Uploading images to container '{container_name}'")

    # Upload images to blob storage
    upload_images(images=images, container_client=container_client)


def load_image_filenames() -> list[str]:
    with open(dataset_filepath, "r") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter="\t", skipinitialspace=True)
        image_filenames = [row[IMAGE_FILE_CSV_COLUMN_NAME] for row in csv_reader]

    return image_filenames


def upload_blob_from_local_file(
    image_filepath: str,
    container_client: ContainerClient,
) -> None:
    blob_name = os.path.basename(image_filepath)
    try:
        blob_client = container_client.get_blob_client(blob=blob_name)
        with open(image_filepath, mode="rb") as data:
            blob_client.upload_blob(data=data, overwrite=True, content_settings=content_settings)
    except Exception as e:
        print(f"Couldn't upload image {blob_name} to Azure Storage Account due to error: {e}")


def upload_images(images: list[str], container_client: ContainerClient) -> None:
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        list(
            tqdm(
                executor.map(
                    lambda x: upload_blob_from_local_file(
                        image_filepath=os.path.join(images_folder, x),
                        container_client=container_client,
                    ),
                    images,
                ),
                total=len(images),
            )
        )

 

 

Store embeddings in Azure Cosmos DB for PostgreSQL

 

The code for inserting vector embeddings into an Azure Cosmos DB for PostgreSQL table can be found at data_upload/upload_data_to_postgresql.py.

 

Activate the pgvector extension

 

The pgvector extension adds vector similarity search capabilities to your PostgreSQL database. To use the extension, you have to first create it in your database. You can install the extension, by connecting to your database and running the CREATE EXTENSION command from the psql command prompt:

 

 

SELECT CREATE_EXTENSION('vector');

 

 

The pgvector extension introduces a data type called VECTOR that can be used during the creation of a table to indicate that a column will hold vector embeddings. When creating the column, it’s essential to specify the dimension of the vectors. In our scenario, Azure AI Vision generates 1024-dimensional vectors.

 

Insert data into a PostgreSQL table

 

To insert data into an Azure Cosmos DB for PostgreSQL table, we will proceed as follows:

  1. Create a table to store the filenames of the images, their embeddings, and their associated metadata. All information is saved in a CSV file, as presented in the previous post.
  2. Insert the data from the CSV file into the table using the PostgreSQL COPY command.

 

 

import os
import psycopg2
from psycopg2 import pool
from dotenv import load_dotenv

# Constants
IMAGE_FILE_COLUMN_NAME = "image_file"
DESCRIPTION_COLUMN_NAME = "description"
AUTHOR_COLUMN_NAME = "author"
TITLE_COLUMN_NAME = "title"
TECHNIQUE_COLUMN_NAME = "technique"
TYPE_COLUMN_NAME = "type"
TIMEFRAME_COLUMN_NAME = "timeframe"
VECTOR_COLUMN_NAME = "vector"

# Directories
current_dir = os.path.dirname(os.path.realpath(__file__))
parent_dir = os.path.dirname(current_dir)

# Load environemt file
load_dotenv(os.path.join(parent_dir, ".env"), override=True)
# Azure CosmosDB for PostgreSQL credentials
postgres_host = os.getenv("POSTGRES_HOST")
postgres_database_name = os.getenv("POSTGRES_DB_NAME")
postgres_user = os.getenv("POSTGRES_USER")
postgres_password = os.getenv("POSTGRES_PASSWORD")
sslmode = "require"
table_name = os.getenv("POSTGRES_TABLE_NAME")
postgres_connection_string = (
    f"host={postgres_host} user={postgres_user} "
    f"dbname={postgres_database_name} "
    f"password={postgres_password} sslmode={sslmode}"
)

# Dataset's folder
dataset_folder = os.path.join(parent_dir, "dataset")
dataset_filepath = os.path.join(dataset_folder, "dataset_embeddings.csv")


def main():
    postgresql_pool = psycopg2.pool.SimpleConnectionPool(1, 20, postgres_connection_string)
    if (postgresql_pool):
        print("Connection pool created successfully")

    # Get a connection from the connection pool
    conn = postgresql_pool.getconn()
    cursor = conn.cursor()

    print("Creating a table...")
    cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
    cursor.execute(
        f"CREATE TABLE {table_name} ("
        f"{IMAGE_FILE_COLUMN_NAME} TEXT PRIMARY KEY,"
        f"{DESCRIPTION_COLUMN_NAME} TEXT NOT NULL,"
        f"{AUTHOR_COLUMN_NAME} TEXT NOT NULL,"
        f"{TITLE_COLUMN_NAME} TEXT NOT NULL,"
        f"{TECHNIQUE_COLUMN_NAME} TEXT,"
        f"{TYPE_COLUMN_NAME} TEXT,"
        f"{TIMEFRAME_COLUMN_NAME} TEXT,"
        f"{VECTOR_COLUMN_NAME} VECTOR(1024) NOT NULL);"
    )

    print("Saving data to table...")
    with open(dataset_filepath) as csv_file:
        cursor.copy_expert(
            f"COPY {table_name} FROM STDIN WITH "
            f"(FORMAT csv, DELIMITER '\t', HEADER MATCH);",
            csv_file
        )

    conn.commit()

    # Fetch all rows from table
    cursor.execute(f"SELECT * FROM {table_name};")
    rows = cursor.fetchall()
    print(f"Number of records in the table: {len(rows)}")

    # Close the connection
    cursor.close()
    conn.close()

 

 

Detect similar images using the pgvector extension

 

The image similarity search workflow that we will follow is summarized as follows:

  1. Use the Azure AI Vision Vectorize Image API or the Vectorize Text API to generate the vector embedding of a reference image or text prompt, respectively. It is crucial to employ the same embedding model for queries as the one used to generate embeddings for the images in the dataset.
  2. To calculate similarity and retrieve images, utilize SQL SELECT statements and the built-in vector operators of the PostgreSQL database. Specifically, cosine similarity will be used as the similarity metric.
  3. The similarity search will produce a list of vectors that are most similar to the query vector. The raw data associated with each vector can then be accessed.
  4. Download the images from the Azure Blob Storage container and display them using the matplotlib package.

 

This workflow is illustrated in the following diagram:

 

ipynb-search-flow.drawio.png

 

Nearest neighbor search using pgvector

 

Given the vector embedding of the query, we can use SQL SELECT statements to search for similar images. Let’s understand how a simple SELECT statement works. Consider the following query:

 

 

SELECT * FROM table ORDER BY embedding <=> '[0.003, …, 0.034]' LIMIT 5

 

 

This query computes the cosine distance (<=>) between the given vector ([0.003, …, 0.034]) and the vectors stored in the table, sorts the results by the calculated distance, and returns the five most similar images (LIMIT 5). Additionally, you can obtain the cosine similarity between the query vector and the retrieved vectors by modifying the SELECT statement as follows:

 

 

SELECT image_title, 1 - (vector <=> '[0.003, …, 0.034]') AS cosine_similarity FROM table ORDER BY cosine_similarity DESC LIMIT 5;

 

 

The pgvector extension provides 3 operators that can be used to calculate similarity:

 

Operator Description
<-> Euclidean distance
<#> Negative inner product
<=> Cosine distance

 

Code sample: Image similarity search

 

In the Jupyter Notebook provided on my GitHub repository, you'll explore the following scenarios:

  1. Text-to-image search: You will use a text prompt to search for and identify paintings that are semantically similar, relying solely on the vector embeddings without utilizing image metadata, such as the title or description of the painting.
  2. Image-to-image search: You will use a painting as a reference to search for similar ones by comparing the vector embedding of the reference image with those in the collection.
  3. Metadata filtering: Filtering enables users to narrow down search results, such as searching for paintings by a specific artist. However, implementing accurate and fast metadata filtering in vector search systems is a challenging task. You can read the article The Missing WHERE Clause in Vector Search on the Pinecone blog to learn about the two fundamental approaches for metadata filtering and understand the complexities involved in implementing such filters into vector search applications.

 

Images retrieved by searching for paintings using the text prompt “flowers by Vincent van Gogh”.Images retrieved by searching for paintings using the text prompt “flowers by Vincent van Gogh”.

 

Next steps

 

In this post, you explored the basic vector similarity search features offered by the pgvector extension. This type of vector search is referred to as exact nearest neighbor search, as it computes the similarity between the query vector and every vector in the database. In the upcoming post, you will explore approximate nearest neighbor search, which trades off result quality for speed.

 

If you want to explore pgvector's features, check out these learning resources:

Co-Authors
Version history
Last update:
‎Feb 28 2024 12:42 PM
Updated by: