Draft

Databricks RAG Chatbot Series - Step 3 – Creating a Vector Database

Author

Zohar Kapach

Published

March 7, 2025

Create a Mosaic AI Vector DB in Unity Catalog

To enable efficient retrieval for the RAG chatbot, we need to set up a vector database using Mosaic AI. This includes creating the index and deploying an endpoint.

Install Libraries

This notebook has been tested on Databricks Runtime 16.2 ML and Serverless (Environment version 2).

%load_ext autoreload
%autoreload 2 
# To disable autoreload, run %autoreload 0
%%capture
%pip install databricks-vectorsearch
dbutils.library.restartPython()

Imports and Variables

%run ./00_setup
import os
import sys
# Add the project root to sys.path to make raglib importable
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from databricks.vector_search.client import VectorSearchClient

from raglib.shell.helpers import endpoint_exists, index_exists
from raglib.models import MODEL_CONFIGS
# Three-part (catalog.schema.table) name of the silver table that feeds the index.
# NOTE(review): UC_NAME, SCHEMA_NAME_* and the table/storage names are not defined
# in this notebook; presumably they come from `%run ./00_setup` -- confirm.
VECTOR_SEARCH_SOURCE_TABLE_NAME = f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_SUMMARIZED_CONVERSATIONS_TABLE_NAME}" 
# Three-part name under which the vector search index is created (gold schema).
VECTOR_SEARCH_INDEX_NAME = f"{UC_NAME}.{SCHEMA_NAME_GOLD}.{VECTOR_STORAGE_NAME}"

Create Vector Search Endpoint and Index

# Create the Vector Search client. Called with no arguments, it authenticates with
# the notebook's own token, which the SDK recommends for development only.
vsc = VectorSearchClient()
# TODO: for prod, use a service principal to set up the client.
# The following line authenticates with service principal credentials instead:
# vsc = VectorSearchClient(service_principal_client_id=<CLIENT_ID>, service_principal_client_secret=<CLIENT_SECRET>)
[NOTICE] Using a notebook authentication token. Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True to VectorSearchClient().
# Create the Vector Search endpoint if it doesn't exist.
# NOTE: it can take 10+ minutes for the endpoint to be provisioned.
if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
    # endpoint_type must be "STANDARD" according to the docs.
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")

# Wait even when the endpoint already existed: a previous (interrupted) run may
# have created it without waiting, and an index cannot be created on an endpoint
# that is still provisioning. This is expected to return promptly once the
# endpoint is online -- TODO confirm against the SDK version in use.
vsc.wait_for_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME)
# Create the Vector Search index if it doesn't exist; otherwise trigger a sync.
if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME):
    # The index is created in the gold schema, so make sure that schema exists.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UC_NAME}.{SCHEMA_NAME_GOLD}")
    vsc.create_delta_sync_index_and_wait(
        endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
        index_name=VECTOR_SEARCH_INDEX_NAME,
        source_table_name=VECTOR_SEARCH_SOURCE_TABLE_NAME,
        pipeline_type="TRIGGERED",  # Other option is CONTINUOUS, which will sync every few seconds
        primary_key=MODEL_CONFIGS["embedding"]["params"]["primary_key"],
        embedding_source_column=MODEL_CONFIGS["embedding"]["params"]["source_col"],  # The column containing our text
        embedding_model_endpoint_name=MODEL_CONFIGS["embedding"]["model_name"],
    )
else:
    import time

    # Trigger a sync to update the index content with the new data saved in the
    # source table. The service rejects a sync request with HTTP 400 ("Index is
    # not ready to sync yet") while a previous pipeline run is still RUNNING, so
    # retry for a bounded time instead of failing the notebook on the first try.
    sync_retry_seconds = 30
    sync_max_attempts = 40  # ~20 minutes in total

    vs_index = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME)
    for attempt in range(1, sync_max_attempts + 1):
        try:
            vs_index.sync()
            break
        except Exception as err:
            # The SDK wraps HTTP errors in a plain `Exception`, so the only way
            # to recognise the "pipeline still running" case is by its message.
            # Anything else, or running out of attempts, is re-raised unchanged.
            if "not ready to sync" not in str(err) or attempt == sync_max_attempts:
                raise
            print(
                f"Index is not ready to sync yet (attempt {attempt}/{sync_max_attempts}); "
                f"retrying in {sync_retry_seconds}s..."
            )
            time.sleep(sync_retry_seconds)
---------------------------------------------------------------------------
HTTPError                                 Traceback (most recent call last)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-6ce0e635-215d-4d8a-a321-d38fa7be53c5/lib/python3.11/site-packages/databricks/vector_search/utils.py:126, in RequestUtils.issue_request(url, method, token, params, json, verify, auth, data, headers)
    125 try:
--> 126     response.raise_for_status()
    127 except Exception as e:
File /databricks/python/lib/python3.11/site-packages/requests/models.py:1021, in Response.raise_for_status(self)
   1020 if http_error_msg:
-> 1021     raise HTTPError(http_error_msg, response=self)
HTTPError: 400 Client Error: Bad Request for url: https://dbc-464ba720-0425.cloud.databricks.com/api/2.0/vector-search/endpoints/vs_endpoint/indexes/cma_ragchat.gold.vector_storage/sync
During handling of the above exception, another exception occurred:
Exception                                 Traceback (most recent call last)
File <command-8893851667757136>, line 18
     15     raise e
     16 else:
     17   #Trigger a sync to update our vs content with the new data saved in the table
---> 18   vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME).sync()
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-6ce0e635-215d-4d8a-a321-d38fa7be53c5/lib/python3.11/site-packages/databricks/vector_search/index.py:209, in VectorSearchIndex.sync(self)
    204 def sync(self):
    205     """
    206     Sync the index. This is used to sync the index with the source delta table.
    207     This only works with managed delta sync index with pipeline type="TRIGGERED".
    208     """
--> 209     return RequestUtils.issue_request(
    210         url=f"{self.workspace_url}/api/2.0/vector-search/endpoints/{self.endpoint_name}/indexes/{self.name}/sync",
    211         token=self._get_token_for_request(control_plane=True),
    212         method="POST",
    213     )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-6ce0e635-215d-4d8a-a321-d38fa7be53c5/lib/python3.11/site-packages/databricks/vector_search/utils.py:128, in RequestUtils.issue_request(url, method, token, params, json, verify, auth, data, headers)
    126     response.raise_for_status()
    127 except Exception as e:
--> 128     raise Exception(
    129         f"Response content {response.content}, status_code {response.status_code}"
    130     )
    131 return response.json()
Exception: Response content b'{"error_code":"BAD_REQUEST","message":"Index is not ready to sync yet. Pipeline is in state RUNNING and needs to be in one of the following states to sync: COMPLETED, FAILED, CANCELED."}', status_code 400