%load_ext autoreload
%autoreload 2
# To disable autoreload, run %autoreload 0
Generate Intercom Data
To enable a RAG chatbot over Intercom data, we need a structured schema and synthetic chat data. This notebook defines that schema and builds Bronze and Silver tables, using Faker for synthetic data generation and Pydantic for validation.
Install Libraries
This notebook has been tested on Databricks Runtime 16.2 ML and Serverless (Environment version 2)
%%capture
%pip install faker
Imports and Variables
%run ./00_setup
import os
import pyspark.sql.functions as F
import sys
# Add the project root to sys.path to make raglib importable
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
from raglib.core.data_generation import generate_fake_conversation

NUM_FAKE_BRONZE_CONVERSATIONS = 1
Generate Fake Bronze Layer Entries
The Bronze schema is defined in entropy-exchange/rag-chatbot-databricks/data/prd_bronze_intercom.conversations.csv. We manually created a Pydantic model named Conversations to represent this PySpark schema export; it lives in entropy-exchange/rag-chatbot-databricks/raglib/data_generation.py. We use Faker to populate the model and build a synthetic Bronze layer.
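As a rough sketch of the pattern (the real Conversations model in raglib is more complete; the field names and values below are simplified assumptions, not the exact Intercom schema), the Pydantic-plus-Faker approach looks like this:
# Illustrative sketch only: the actual Conversations model in raglib covers the full
# Intercom export schema. Field names and values here are simplified assumptions.
from faker import Faker
from pydantic import BaseModel

fake = Faker()

class FakeAuthor(BaseModel):
    type: str
    name: str
    email: str

class FakeSource(BaseModel):
    type: str
    delivered_as: str
    subject: str
    body: str
    author: FakeAuthor

class FakeConversation(BaseModel):
    id: str
    created_at: int
    state: str
    source: FakeSource

def generate_fake_conversation_sketch() -> dict:
    """Fill the model with Faker values and return a plain dict for spark.createDataFrame."""
    conversation = FakeConversation(
        id=fake.uuid4(),
        created_at=int(fake.unix_time()),
        state="closed",
        source=FakeSource(
            type="conversation",
            delivered_as="customer_initiated",
            subject=fake.sentence(),
            body=fake.paragraph(),
            author=FakeAuthor(type="user", name=fake.name(), email=fake.email()),
        ),
    )
    return conversation.model_dump()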
# Generate a batch of fake conversations
fake_conversations = [generate_fake_conversation() for _ in range(NUM_FAKE_BRONZE_CONVERSATIONS)]
# Convert the list of fake conversations to a Spark DataFrame
fake_conversations_df = spark.createDataFrame(fake_conversations)
# Write the DataFrame to the Unity Catalog in a new table
if UC_MANAGED_LOCATION:
    spark.sql(f"CREATE CATALOG IF NOT EXISTS {UC_NAME} MANAGED LOCATION '{UC_MANAGED_LOCATION}'")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UC_NAME}.{SCHEMA_NAME_BRONZE}")
fake_conversations_df.write.mode("append").saveAsTable(f"{UC_NAME}.{SCHEMA_NAME_BRONZE}.{BRONZE_FAKE_CONVERSATIONS_TABLE_NAME}")
Create a Silver Layer
The Silver layer combines the source message with all conversation parts so that each row represents a full chat.
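To make the target concrete, here is an illustrative Silver row (the values are invented; the column names match the transformation below):
# Illustrative example of a single silver row; all values are made up.
example_silver_row = {
    "id": "12345",
    "created_at": "2024-05-01 10:15:00",
    "state": "closed",
    "source_type": "conversation",
    "source_author_type": "user",
    "source_delivered_as": "customer_initiated",
    "conversation": [
        {
            "author_email": "customer@example.com",
            "author_name": "Alice Example",
            "author_type": "user",
            "message_type": "conversation",
            "message_subject": "Login issue",
            "message_body": "I can't log in to my account.",
        },
        {
            "author_email": "agent@example.com",
            "author_name": "Bob Agent",
            "author_type": "admin",
            "message_type": "comment",
            "message_subject": None,
            "message_body": "Try resetting your password from the sign-in page.",
        },
    ],
}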
from pyspark.sql.functions import col, from_unixtime, struct, collect_list, explode, array_union, lit, array
bronze_fake_intercom_conversations_df = spark.read.table(f"{UC_NAME}.{SCHEMA_NAME_BRONZE}.{BRONZE_FAKE_CONVERSATIONS_TABLE_NAME}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UC_NAME}.{SCHEMA_NAME_SILVER}")
# Sources Table
sources_df = (
bronze_fake_intercom_conversations_df.select(
col("id"),
from_unixtime(col("created_at")).alias("created_at"),
col("state"),
col("source.type").alias("source_type"),
col("source.author.type").alias("source_author_type"),
col("source.delivered_as").alias("source_delivered_as"),
array(
struct(
col("source.author.email").alias("author_email"),
col("source.author.name").alias("author_name"),
col("source.author.type").alias("author_type"),
col("source.type").alias("message_type"),
col("source.subject").alias("message_subject"),
col("source.body").alias("message_body"),
)
).alias("source_part"),
)
)
# Conversation Parts
conversation_parts_df = (
bronze_fake_intercom_conversations_df.select(
col("id"),
from_unixtime(col("created_at")).alias("created_at"),
col("state"),
col("source.type").alias("source_type"),
col("source.author.type").alias("source_author_type"),
col("source.delivered_as").alias("source_delivered_as"),
explode(col("conversation_parts.conversation_parts")).alias("cp"),
)
.filter(col("cp.body").isNotNull())
.groupBy(
"id", "created_at", "state", "source_type", "source_author_type", "source_delivered_as"
)
.agg(
collect_list(
struct(
col("cp.author.email").alias("author_email"),
col("cp.author.name").alias("author_name"),
col("cp.author.type").alias("author_type"),
col("cp.part_type").alias("message_type"),
lit(None).cast("string").alias("message_subject"),
col("cp.body").alias("message_body"),
)
).alias("conversation_parts")
)
)
# Final Join
final_df = (
sources_df.join(conversation_parts_df, "id")
.withColumn("conversation", array_union(sources_df.source_part, conversation_parts_df.conversation_parts))
.select(
sources_df.id, sources_df.created_at, sources_df.state, sources_df.source_type, sources_df.source_author_type, sources_df.source_delivered_as, "conversation"
)
.orderBy(col("id").desc())
)
final_df.write.mode("overwrite").saveAsTable(f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CONVERSATIONS_TABLE_NAME}")
Preprocess and Clean Conversations
silver_conversations_df = spark.read.table(f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CONVERSATIONS_TABLE_NAME}")
# Concatenate chat messages together -> author_name, message_subject, message_body
silver_cleaned_conversations_df = silver_conversations_df.withColumn(
"cleaned_conversation",
F.concat_ws("\n", F.transform(
F.col("conversation"),
lambda x: F.concat_ws("\n", x.author_name, x.message_subject, x.message_body)
))
)
silver_cleaned_conversations_df.write.mode("overwrite").saveAsTable(f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CLEANED_CONVERSATIONS_TABLE_NAME}")
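As an optional sanity check (a minimal sketch; display is the Databricks notebook helper, and the catalog and table names come from 00_setup), preview a few cleaned conversations:
# Optional sanity check: preview a few rows of the cleaned conversations table.
preview_df = (
    spark.read.table(f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CLEANED_CONVERSATIONS_TABLE_NAME}")
    .select("id", "state", "cleaned_conversation")
    .limit(5)
)
display(preview_df)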