%load_ext autoreload
%autoreload 2
# To disable autoreload; run %autoreload 0Create a Mosaic AI Vector DB in Unity Catalog
To enable efficient retrieval for the RAG chatbot, we need to set up a vector database using Mosaic AI. This includes creating the index and deploying an endpoint.
Install Libraries
This notebook has been tested on Databricks Runtime 16.2 ML and Serverless (Environment version 2)
%%capture
%pip install databricks-vectorsearch
dbutils.library.restartPython()Imports and Variables
%run ./00_setupimport os
import sys
# Add the project root to sys.path to make raglib importable
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
from databricks.vector_search.client import VectorSearchClient
from raglib.shell.helpers import endpoint_exists, index_exists
from raglib.models import MODEL_CONFIGSVECTOR_SEARCH_SOURCE_TABLE_NAME = f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_SUMMARIZED_CONVERSATIONS_TABLE_NAME}"
VECTOR_SEARCH_INDEX_NAME = f"{UC_NAME}.{SCHEMA_NAME_GOLD}.{VECTOR_STORAGE_NAME}"Create Vector Search Endpoint and Index
# Create Vector Search Client
vsc = VectorSearchClient()
#TODO: for prod use service principal to set up client
# The following line uses the service principal token for authentication
# client = VectorSearchClient(service_principal_client_id=<CLIENT_ID>,service_principal_client_secret=<CLIENT_SECRET>)[NOTICE] Using a notebook authentication token. Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True to VectorSearchClient().
# Create Vector Search Endpoint if it doesnt exist
# NOTE: it can take 10+ minutes for the endpoint to be provisioned
if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD") # endpoint_type must be Standard according to docs
vsc.wait_for_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME)# Create Vector Search Index if it doesnt exist otherwise sync
if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME):
try:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UC_NAME}.{SCHEMA_NAME_GOLD}")
vsc.create_delta_sync_index_and_wait(
endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
index_name=VECTOR_SEARCH_INDEX_NAME,
source_table_name=VECTOR_SEARCH_SOURCE_TABLE_NAME,
pipeline_type="TRIGGERED", # Other option is CONTINUOUS which will sync every few seconds
primary_key=MODEL_CONFIGS["embedding"]["params"]["primary_key"],
embedding_source_column=MODEL_CONFIGS["embedding"]["params"]["source_col"], #The column containing our text
embedding_model_endpoint_name=MODEL_CONFIGS["embedding"]["model_name"]
)
except Exception as e:
raise e
else:
#Trigger a sync to update our vs content with the new data saved in the table
vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME).sync()--------------------------------------------------------------------------- HTTPError Traceback (most recent call last) File /local_disk0/.ephemeral_nfs/envs/pythonEnv-6ce0e635-215d-4d8a-a321-d38fa7be53c5/lib/python3.11/site-packages/databricks/vector_search/utils.py:126, in RequestUtils.issue_request(url, method, token, params, json, verify, auth, data, headers) 125 try: --> 126 response.raise_for_status() 127 except Exception as e: File /databricks/python/lib/python3.11/site-packages/requests/models.py:1021, in Response.raise_for_status(self) 1020 if http_error_msg: -> 1021 raise HTTPError(http_error_msg, response=self) HTTPError: 400 Client Error: Bad Request for url: https://dbc-464ba720-0425.cloud.databricks.com/api/2.0/vector-search/endpoints/vs_endpoint/indexes/cma_ragchat.gold.vector_storage/sync During handling of the above exception, another exception occurred: Exception Traceback (most recent call last) File <command-8893851667757136>, line 18 15 raise e 16 else: 17 #Trigger a sync to update our vs content with the new data saved in the table ---> 18 vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME).sync() File /local_disk0/.ephemeral_nfs/envs/pythonEnv-6ce0e635-215d-4d8a-a321-d38fa7be53c5/lib/python3.11/site-packages/databricks/vector_search/index.py:209, in VectorSearchIndex.sync(self) 204 def sync(self): 205 """ 206 Sync the index. This is used to sync the index with the source delta table. 207 This only works with managed delta sync index with pipeline type="TRIGGERED". 208 """ --> 209 return RequestUtils.issue_request( 210 url=f"{self.workspace_url}/api/2.0/vector-search/endpoints/{self.endpoint_name}/indexes/{self.name}/sync", 211 token=self._get_token_for_request(control_plane=True), 212 method="POST", 213 ) File /local_disk0/.ephemeral_nfs/envs/pythonEnv-6ce0e635-215d-4d8a-a321-d38fa7be53c5/lib/python3.11/site-packages/databricks/vector_search/utils.py:128, in RequestUtils.issue_request(url, method, token, params, json, verify, auth, data, headers) 126 response.raise_for_status() 127 except Exception as e: --> 128 raise Exception( 129 f"Response content {response.content}, status_code {response.status_code}" 130 ) 131 return response.json() Exception: Response content b'{"error_code":"BAD_REQUEST","message":"Index is not ready to sync yet. Pipeline is in state RUNNING and needs to be in one of the following states to sync: COMPLETED, FAILED, CANCELED."}', status_code 400