
Databricks RAG Chatbot Series – Step 1: Preparing an Intercom Dataset

Author: Zohar Kapach

Published: March 7, 2025

Generate Intercom Data

To power a RAG chatbot with Intercom data, we first need a structured schema and some synthetic chat data to develop against. In this step we define that schema and build the Bronze and Silver tables, using Faker to generate synthetic conversations and Pydantic to validate them.

Install Libraries

This notebook has been tested on Databricks Runtime 16.2 ML and on Serverless compute (environment version 2).

%load_ext autoreload
%autoreload 2
# To disable autoreload, run: %autoreload 0

%%capture
%pip install faker

Imports and Variables

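The 00_setup notebook is assumed to define the shared configuration used throughout this walkthrough (UC_NAME, UC_MANAGED_LOCATION, the Bronze/Silver schema names, and the table name variables referenced below).
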
%run ./00_setup

import os
import sys

import pyspark.sql.functions as F

# Add the project root to sys.path to make raglib importable
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from raglib.core.data_generation import generate_fake_conversation

NUM_FAKE_BRONZE_CONVERSATIONS = 1

Generate Fake Bronze Layer Entries

The Bronze schema is defined in entropy-exchange/rag-chatbot-databricks/data/prd_bronze_intercom.conversations.csv. We manually created a Pydantic model, Conversations, to mirror this PySpark schema export; it lives in entropy-exchange/rag-chatbot-databricks/raglib/core/data_generation.py. We use Faker to populate the model and build a synthetic Bronze layer.

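For reference, here is a minimal sketch of what a Faker-backed Pydantic generator could look like. The field names, nesting, and Faker providers below are illustrative assumptions; the real Conversations model in raglib mirrors the full Intercom schema export and is considerably richer.

from faker import Faker
from pydantic import BaseModel

fake = Faker()


class Author(BaseModel):
    type: str
    name: str
    email: str


class Source(BaseModel):
    type: str
    delivered_as: str
    subject: str
    body: str
    author: Author


class ConversationSketch(BaseModel):
    id: str
    created_at: int
    state: str
    source: Source


def generate_fake_conversation_sketch() -> dict:
    """Build one synthetic Intercom-style conversation and dump it to a dict."""
    conversation = ConversationSketch(
        id=str(fake.random_number(digits=12)),
        created_at=int(fake.unix_time()),
        state=fake.random_element(["open", "closed", "snoozed"]),
        source=Source(
            type="conversation",
            delivered_as="customer_initiated",
            subject=fake.sentence(),
            body=fake.paragraph(),
            author=Author(type="user", name=fake.name(), email=fake.email()),
        ),
    )
    # model_dump() is Pydantic v2; use .dict() on Pydantic v1
    return conversation.model_dump()

The cell below calls the actual generator from raglib:
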
# Generate a batch of fake conversations
fake_conversations = [generate_fake_conversation() for _ in range(NUM_FAKE_BRONZE_CONVERSATIONS)]

# Convert the list of fake conversations to a Spark DataFrame
fake_conversations_df = spark.createDataFrame(fake_conversations)

# Write the DataFrame to a new Unity Catalog table, creating the catalog and Bronze schema if needed
if UC_MANAGED_LOCATION:
    spark.sql(f"CREATE CATALOG IF NOT EXISTS {UC_NAME} MANAGED LOCATION '{UC_MANAGED_LOCATION}'")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UC_NAME}.{SCHEMA_NAME_BRONZE}")
fake_conversations_df.write.mode("append").saveAsTable(f"{UC_NAME}.{SCHEMA_NAME_BRONZE}.{BRONZE_FAKE_CONVERSATIONS_TABLE_NAME}")
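
As an optional sanity check (not part of the original flow), the snippet below reuses the same table-name variables to confirm the synthetic rows landed and to inspect the inferred schema.

# Optional sanity check on the Bronze write
bronze_table = f"{UC_NAME}.{SCHEMA_NAME_BRONZE}.{BRONZE_FAKE_CONVERSATIONS_TABLE_NAME}"
print(spark.table(bronze_table).count(), "row(s) in", bronze_table)
spark.table(bronze_table).printSchema()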

Create a Silver Layer

The Silver layer combines the source message and all conversation parts so that each row represents a full chat.

from pyspark.sql.functions import col, from_unixtime, struct, collect_list, explode, array_union, lit, array

bronze_fake_intercom_conversations_df = spark.read.table(f"{UC_NAME}.{SCHEMA_NAME_BRONZE}.{BRONZE_FAKE_CONVERSATIONS_TABLE_NAME}")

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UC_NAME}.{SCHEMA_NAME_SILVER}")

# Sources table: one row per conversation, with the initial (source) message wrapped in a single-element array
sources_df = (
    bronze_fake_intercom_conversations_df.select(
        col("id"),
        from_unixtime(col("created_at")).alias("created_at"),
        col("state"),
        col("source.type").alias("source_type"),
        col("source.author.type").alias("source_author_type"),
        col("source.delivered_as").alias("source_delivered_as"),
        array(
            struct(
                col("source.author.email").alias("author_email"),
                col("source.author.name").alias("author_name"),
                col("source.author.type").alias("author_type"),
                col("source.type").alias("message_type"),
                col("source.subject").alias("message_subject"),
                col("source.body").alias("message_body"),
            )
        ).alias("source_part"),
    )
)

# Conversation parts: explode the replies, drop parts with empty bodies, and collect the rest into a list per conversation
conversation_parts_df = (
    bronze_fake_intercom_conversations_df.select(
        col("id"),
        from_unixtime(col("created_at")).alias("created_at"),
        col("state"),
        col("source.type").alias("source_type"),
        col("source.author.type").alias("source_author_type"),
        col("source.delivered_as").alias("source_delivered_as"),
        explode(col("conversation_parts.conversation_parts")).alias("cp"),
    )
    .filter(col("cp.body").isNotNull())
    .groupBy(
        "id", "created_at", "state", "source_type", "source_author_type", "source_delivered_as"
    )
    .agg(
        collect_list(
            struct(
                col("cp.author.email").alias("author_email"),
                col("cp.author.name").alias("author_name"),
                col("cp.author.type").alias("author_type"),
                col("cp.part_type").alias("message_type"),
                lit(None).cast("string").alias("message_subject"),
                col("cp.body").alias("message_body"),
            )
        ).alias("conversation_parts")
    )
)

# Final join: prepend the source message to the conversation parts so each row holds the full chat.
# Note: array_union also removes duplicate elements; use concat if exact duplicate messages must be kept.
final_df = (
    sources_df.join(conversation_parts_df, "id")
    .withColumn("conversation", array_union(sources_df.source_part, conversation_parts_df.conversation_parts))
    .select(
        sources_df.id, sources_df.created_at, sources_df.state, sources_df.source_type, sources_df.source_author_type, sources_df.source_delivered_as, "conversation"
    )
    .orderBy(col("id").desc())
)

final_df.write.mode("overwrite").saveAsTable(f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CONVERSATIONS_TABLE_NAME}")
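
As a quick optional check that the join worked, you can count how many messages each Silver row now carries (this reuses F from the earlier import; it is not part of the original flow).

# Optional: each row should hold the source message plus its conversation parts
spark.read.table(
    f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CONVERSATIONS_TABLE_NAME}"
).select("id", F.size("conversation").alias("num_messages")).show(5, truncate=False)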

Preprocess and Clean Conversations

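To make the conversations easier to chunk and embed later in the series, we flatten each conversation's structured message array into a single text string, with each message contributing its author, subject, and body.
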
silver_conversations_df = spark.read.table(f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CONVERSATIONS_TABLE_NAME}")

# Flatten each conversation: per message, join author_name, message_subject, and message_body, then join all messages with newlines
silver_cleaned_conversations_df = silver_conversations_df.withColumn(
    "cleaned_conversation",
    F.concat_ws("\n", F.transform(
        F.col("conversation"), 
        lambda x: F.concat_ws("\n", x.author_name, x.message_subject, x.message_body)
    ))
)

silver_cleaned_conversations_df.write.mode("overwrite").saveAsTable(f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CLEANED_CONVERSATIONS_TABLE_NAME}")
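
Finally, an optional spot check: pull one row back and eyeball the flattened conversation text.

# Optional: inspect one cleaned conversation string
spark.read.table(
    f"{UC_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_CLEANED_CONVERSATIONS_TABLE_NAME}"
).select("id", "cleaned_conversation").show(1, truncate=False)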