强强联合!LangChain携手CrewAI共创基于RAG的智能问答解答系统,开启智慧查询新篇章

在当今数字化时代,企业和教育机构每天都会收到海量的咨询问题。无论是客户支持、销售团队的提问,还是内部员工的咨询,手动回复这些问题不仅耗时费力,还容易出现回答不一致的情况。而基于人工智能的查询解答系统,能够快速、准确且高效地提供答案,极大地提升了工作效率和用户体验。

今天,我们就来聊聊如何利用LangChain、ChromaDB和CrewAI构建一个基于检索增强生成(RAG)的智能查询解答系统。这个系统不仅能自动处理各种问题,还能生成精准的回答,帮助企业和教育机构更好地服务用户。

为什么我们需要AI驱动的查询解答系统?

在传统的业务流程中,手动回复客户或学员的咨询问题是一个极其低效的过程。客户希望得到即时的回复,而企业则需要快速获取准确的信息来做出决策。AI驱动的查询解答系统通过自动化处理这些问题,不仅减轻了人工负担,还能提供一致且高质量的回答。

在客户服务领域,AI系统可以自动回复常见问题,提升客户满意度;在销售和市场营销中,它可以实时提供产品细节和客户洞察;在金融、医疗、教育和电商等行业,AI驱动的查询处理能够确保操作顺畅,提升用户体验。

深入了解RAG工作流程

在动手构建系统之前,我们先来了解一下检索增强生成(RAG)系统是如何工作的。RAG架构主要分为三个关键阶段:索引、检索和生成。

强强联合!LangChain与CrewAI构建基于RAG的智能查询解答系统-AI.x社区

1. 构建向量存储(文档处理与存储)

系统首先需要处理和存储相关文档,以便能够快速检索。具体步骤如下:

  • 文档切分:将大型文档切分为更小的文本块,以便高效检索。
  • 嵌入模型:利用基于AI的嵌入模型将这些文本块转换为向量表示。
  • 向量存储:将向量化的数据索引并存储在数据库(如ChromaDB)中,以便快速查找。

2. 查询处理与检索

当用户提交问题时,系统会先检索相关数据,然后再生成回答。具体步骤如下:

  • 用户查询输入:用户提交问题或请求。
  • 向量化:利用嵌入模型将查询转换为数值向量。
  • 搜索与检索:系统在向量存储中搜索最相关的文本块并检索出来。

3. 增强与回答生成

为了生成准确的回答,系统会将检索到的数据与原始查询结合。具体步骤如下:

  • 增强查询:将检索到的文档块与原始查询结合。
  • LLM处理:利用大型语言模型(LLM)根据查询和检索到的上下文生成最终回答。
  • 最终回答:系统向用户提供一个准确且富有上下文的回答。

构建基于RAG的查询解答系统

接下来,我们将通过一个实际案例,展示如何构建一个基于RAG的查询解答系统。这个系统将高效地回答学员的问题,帮助他们更好地学习。

选择合适的数据用于查询解答

在构建RAG系统之前,最重要的就是数据。一个结构良好的知识库是关键,因为回答的准确性和相关性完全依赖于数据的质量。以下是一些适合不同类型用途的数据:

  • 客户支持数据:常见问题解答(FAQ)、故障排除指南、产品手册和过去的客户互动记录。
  • 销售与市场数据:产品目录、价格详情、竞争对手分析和客户咨询记录。
  • 内部知识库:公司政策、培训文档和标准操作流程(SOP)。
  • 财务与法律文件:合规指南、财务报告和监管政策。
  • 用户生成内容:论坛讨论、聊天记录和反馈表单,这些都能提供真实的用户问题。

在我们的学员查询解答系统中,我们尝试了多种数据类型,最终发现使用课程视频的字幕是最有效的方法。字幕提供了与学员问题直接相关的结构化和详细内容,能够快速生成相关答案。

构建查询解答系统的架构

在动手编写代码之前,我们需要先规划系统的架构。系统需要完成以下三个主要任务:

  1. 从字幕文件(SRT)中提取并存储课程内容。
  2. 根据学员的查询检索相关的课程材料。
  3. 利用AI驱动的代理生成结构化的回答。

为了实现这些功能,我们将系统分为三个组件:

  • 字幕处理:从SRT文件中提取文本,处理并将其嵌入存储到ChromaDB中。
  • 检索:根据学员的查询搜索并检索相关的课程材料。
  • 查询回答代理:利用CrewAI生成结构化且准确的回答。

实现步骤

现在,我们已经规划好了系统的架构,接下来就是动手实现。

1. 导入必要的库

构建AI驱动的学习支持系统,首先需要导入一些关键的库:

import pysrtfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom langchain.schema import Documentfrom langchain.embeddings import OpenAIEmbeddingsfrom langchain.vectorstores import Chromafrom crewai import Agent, Task, Crewimport pandas as pdimport astimport osimport timefrom tqdm import tqdm

这些库的作用如下:

  • pysrt:用于从SRT字幕文件中提取文本。
  • RecursiveCharacterTextSplitter:将大段文本切分为更小的块,以便更好地检索。
  • Document:表示结构化的文本文档。
  • OpenAIEmbeddings:将文本转换为数值向量,用于相似性搜索。
  • Chroma:将嵌入存储在向量数据库中,便于高效检索。
  • CrewAI(Agent、Task、Crew):定义处理学员查询的AI代理。
  • pandas:以DataFrame形式处理结构化数据。
  • ast:将基于字符串的数据结构解析为Python对象。
  • os:提供系统级操作,如读取环境变量。
  • tqdm:在长时间运行的任务中显示进度条。

2. 设置环境

为了使用OpenAI的API进行嵌入,我们需要加载API密钥并配置模型设置。

步骤1:从本地文件中读取API密钥

with open(/home/janvi/Downloads/openai.txt, r) as file:    openai_api_key = file.read()

步骤2:将API密钥存储为环境变量

os.environ[OPENAI_API_KEY] = openai_api_key

步骤3:指定OpenAI模型

os.environ["OPENAI_MODEL_NAME"] = gpt-4o-mini

通过这些配置,我们可以确保系统能够高效地处理和存储嵌入。

3. 提取并存储字幕数据

字幕文件中包含了视频讲座的宝贵信息,是AI检索系统中结构化内容的丰富来源。有效地提取和处理字幕数据,能够让我们在回答学员问题时快速检索到相关信息。

步骤1:从SRT文件中提取文本

我们使用​​pysrt​​库从SRT文件中提取文本,并将其组织成结构化的形式,以便进一步处理和存储。

def extract_text_from_srt(srt_path):    """从SRT字幕文件中提取文本"""    subs = pysrt.open(srt_path)    text = " ".join(sub.text for sub in subs)    return text

由于课程可能包含多个字幕文件,我们需要系统地组织和迭代这些文件,以便无缝提取文本。

course_folders = {    "深度学习入门(使用PyTorch)": "C:\M\Code\GAI\Learn_queries\Subtitle_Introduction_to_Deep_Learning_Using_Pytorch",    "构建生产级RAG系统(使用LlamaIndex)": "C:\M\Code\GAI\Learn_queries\Subtitle of Building Production-Ready RAG systems using LlamaIndex",    "LangChain入门(构建生成式AI应用与代理)": "C:\M\Code\GAI\Learn_queries\Subtitle_introduction_to_langchain_using_agentic_ai"}course_srt_files = {}for course, folder_path in course_folders.items():    srt_files = []    for root, _, files in os.walk(folder_path):        srt_files.extend(os.path.join(root, file) for file in files if file.endswith(".srt"))    if srt_files:        course_srt_files[course] = srt_files

这些提取的文本将成为我们AI驱动学习支持系统的基础,使我们能够进行高级检索和查询解答。

步骤2:将字幕存储到ChromaDB

接下来,我们将课程字幕存储到ChromaDB中,包括文本切分、嵌入生成、持久化存储和成本估算。

(1)为ChromaDB设置持久化目录

​persist_directory​​是一个文件夹路径,用于保存存储的数据。这样即使程序重新启动,嵌入数据也能保留下来。

persist_directory = "./subtitles_db"
(2)将文本切分为更小的块

大型文档(如整个课程字幕)可能会超出嵌入的标记限制。为了处理这种情况,我们使用​​RecursiveCharacterTextSplitter​​将文本切分为更小的、有重叠的块,以提高搜索精度。

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

每个块的长度为1000个字符,为了在块之间保留上下文,我们将前一个块的200个字符包含在下一个块中。这种重叠有助于保留重要细节,提高检索精度。

(3)初始化OpenAI嵌入和ChromaDB向量存储

我们需要将文本转换为数值向量表示,以便进行相似性搜索。OpenAI的嵌入功能允许我们将课程内容编码为可以高效搜索的格式。

embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

这里,​​OpenAIEmbeddings()​​使用我们的OpenAI API密钥初始化嵌入模型,确保每段文本都能转换为高维向量表示。

(4)初始化ChromaDB

现在,我们将这些向量嵌入存储到ChromaDB中。

vectorstore = Chroma(    collection_name="course_materials",    embedding_functinotallow=embeddings,    persist_directory=persist_directory)

​collection_name="course_materials"​​​在ChromaDB中创建了一个专门的集合,用于组织所有与课程相关的嵌入。​​embedding_functinotallow=embeddings​​​指定了OpenAI嵌入用于将文本转换为数值向量。​​persist_directory=persist_directory​​确保所有存储的嵌入在程序重新启动后仍然可用。

(5)估算存储课程数据的成本

在将文档添加到向量数据库之前,估算标记使用成本是非常重要的。由于OpenAI按每1000个标记收费,我们需要提前估算成本,以便高效管理开支。

COST_PER_1K_TOKENS = 0.0001  # 每1000个标记的成本(使用text-embedding-ada-002模型)TOKENS_PER_CHUNK_ESTIMATE = 750  # 每1000字符块的估计标记数total_tokens = 0total_cost = 0start_time = time.time()

​COST_PER_1K_TOKENS=0.0001​​​定义了使用OpenAI嵌入时每1000个标记的成本。​​TOKENS_PER_CHUNK_ESTIMATE=750​​​估计每个1000字符块包含约750个标记。​​total_tokens​​​和​​total_cost​​​变量用于跟踪整个执行过程中处理的数据量和产生的成本。​​start_time​​变量记录了开始时间,用于测量整个过程的耗时。

(6)检查并添加课程到ChromaDB

我们希望避免重新处理已经存储在向量数据库中的课程。因此,我们先查询ChromaDB,检查课程是否已经存在。如果不存在,我们则提取并存储其字幕数据。

for course, srt_list in course_srt_files.items():    existing_docs = vectorstore._collection.get(where={"course": course})    if not existing_docs[ids]:        srt_texts = [extract_text_from_srt(srt) for srt in srt_list]        course_text = "\n\n\n\n".join(srt_texts)        doc = Document(page_cnotallow=course_text, metadata={"course": course})        chunks = text_splitter.split_documents([doc])

字幕通过​​extract_text_from_srt()​​​函数提取,多个字幕文件通过​​\n\n\n\n​​​连接,以提高可读性。创建了一个​​Document​​​对象,存储完整的字幕文本及其元数据。最后,使用​​text_splitter.split_documents()​​将文本切分为更小的块,以便高效处理和检索。

(7)估算标记使用量和成本

在将块添加到ChromaDB之前,我们估算成本。

chunk_count = len(chunks)batch_tokens = chunk_count * TOKENS_PER_CHUNK_ESTIMATEbatch_cost = (batch_tokens / 1000) * COST_PER_1K_TOKENStotal_tokens += batch_tokenstotal_cost += batch_cost

​chunk_count​​​表示切分后的块数量。​​batch_tokens​​​根据块数量估算总标记数。​​batch_cost​​​计算当前课程的处理成本。​​total_tokens​​​和​​total_cost​​累加每次处理的值,以跟踪整体处理量和开支。

(8)将块添加到ChromaDB

vectorstore.add_documents(chunks)print(f"已添加课程:{course} (块数:{chunk_count}, 成本:${batch_cost:.4f})")

处理后的块被存储到ChromaDB中,以便高效检索。程序会显示添加的块数和估算的处理成本。

如果课程已经存在,则会显示以下信息:

print(f"课程已存在:{course}")

一旦所有课程处理完成,我们计算并显示最终结果。

end_time = time.time()print(f"\n课程嵌入更新完成!🚀")print(f"总处理块数:{total_tokens // TOKENS_PER_CHUNK_ESTIMATE}")print(f"估算总标记数:{total_tokens}")print(f"估算总成本:${total_cost:.4f}")print(f"总耗时:{end_time - start_time:.2f}秒")

​end_time - start_time​​计算总处理时间。系统会显示处理的块数、估算的标记使用量、总成本以及整个嵌入过程的总结。

4. 查询并回答学员问题

一旦字幕存储到ChromaDB中,系统需要一种方式来检索相关内容,以便在学员提交问题时提供答案。这个检索过程通过相似性搜索实现,它能够识别与输入问题最相关的存储文本段。

工作原理

  • 查询输入:学员提交与课程相关的问题。
  • 按课程过滤:系统确保检索仅限于相关课程材料。
  • ChromaDB中的相似性搜索:将查询转换为嵌入,ChromaDB检索最相似的存储文本块。
  • 返回顶部结果:系统选择最相关的三个文本段。
  • 格式化输出:检索到的文本被格式化并呈现为进一步处理的上下文。

def retrieve_course_materials(query: str, course):    """按课程名称检索课程材料"""    filter_dict = {"course": course}    results = vectorstore.similarity_search(query, k=3, filter=filter_dict)    return "\n\n".join([doc.page_content for doc in results])

例如:

course_name = "深度学习入门(使用PyTorch)"question = "什么是梯度下降?"context = retrieve_course_materials(query=question, course=course_name)print(context)

从输出中可以看到,ChromaDB通过相似性搜索,根据课程名称和问题检索到最相关的信息。

为什么使用相似性搜索?

  • 语义理解:与关键词搜索不同,相似性搜索能够找到与查询语义相关的文本。
  • 高效检索:系统无需扫描整个文档,只需检索最相关的部分。
  • 提升答案质量:通过按课程过滤并按相关性排序,学员能够获得高度针对性的内容。

这种机制确保学员提交问题时,能够从存储的课程材料中获得相关且上下文准确的信息。

5. 实现AI查询回答代理

检索到相关课程材料后,下一步是利用AI驱动的代理生成有意义的回答。我们使用CrewAI定义一个智能代理,负责分析查询并生成结构化的回答。

步骤1:定义代理

查询回答代理通过清晰的角色和背景故事来指导其行为,以便更好地回答学员的问题。

query_answer_agent = Agent(    role="学习支持专家",    goal="您需要为学员提供最准确的回答",    backstory="""    您是一家专注于数据科学、机器学习和生成式AI的在线教育公司学员查询解答部门的负责人。您负责回答学员关于课程内容、作业、技术问题和行政问题的咨询。您礼貌、圆滑,并且对可以改进的地方负有责任感。    """,    verbose=False)

在代码块中,我们首先定义了代理的角色为“学习支持专家”,因为它充当虚拟助教的角色,回答学员的问题。然后,我们定义了目标,确保代理在回答时优先考虑准确性和清晰性。最后,我们将​​verbose​​​设置为​​False​​,这样在不需要调试时,执行过程将保持安静。这种清晰定义的代理角色确保回答既有帮助性,又结构化,且符合教育平台的语气。

步骤2:定义任务

定义了代理之后,我们需要为其分配任务。

query_answering_task = Task(    descriptinotallow="""    尽您所能回答学员的问题。尽量保持回答简洁,不超过100个单词。    这是问题:{query}    这是从课程字幕中提取的相关内容,仅在需要时使用:{relevant_content}。    由于这些内容是从课程字幕中提取的,可能存在拼写错误,请在回答中纠正这些错误。    这是与学员之前的对话记录:{thread}。    在对话中,以“学员”开头的是学员的问题,以“支持”开头的是您的回答。请根据之前的对话适当调整您的回答。    这是学员的全名:{learner_name}。    如果不确定学员的名字,直接用“嗨”开头。    在回答的结尾添加一些适当的、鼓励性的安慰语句,例如“希望您觉得有帮助”、“希望这些信息有用。继续努力!”、“很高兴能帮到您!随时联系我。”等。    如果您不确定答案,请注明:“抱歉,我不确定这个问题的答案,我会稍后回复您。”    """,    expected_output="简洁准确的回答",    agent=query_answer_agent)

接下来,我们来分解分配给AI的任务。处理学员的查询时,​​{query}​​​代表学员的问题。回答应简洁(不超过100个单词)且准确。如果需要使用课程内容,​​{relevant_content}​​是从存储在ChromaDB中的字幕中提取的,AI必须在回答中纠正任何拼写错误。

如果存在之前的对话,​​{thread}​​​有助于保持连贯性。学员的问题以“学员”开头,而之前的回答以“支持”开头,这使得代理能够提供与上下文相关的回答。通过​​{learner_name}​​实现个性化——代理会用学员的名字称呼他们,如果不确定名字,就简单地用“嗨”开头。

为了使回答更具吸引力,AI会在结尾添加一句积极的结束语,比如“希望您觉得有帮助!”或者“随时联系我。”如果AI不确定答案,它会明确说明:“抱歉,我不确定这个问题的答案,我会稍后回复您。”这种方法确保了回答的礼貌性、清晰性和结构化,提升了学员的参与度和信任感。

步骤3:初始化CrewAI实例

现在我们已经定义了代理和任务,接下来初始化CrewAI,它能够动态处理学员的查询。

response_crew = Crew(    agents=[query_answer_agent],    tasks=[query_answering_task],    verbose=False)

​agents=[query_answer_agent]​​​将“学习支持专家”代理添加到团队中。​​tasks=[query_answering_task]​​​将查询回答任务分配给这个代理。设置​​verbose=False​​可以保持输出简洁,除非需要调试。CrewAI能够动态处理多个学员的查询,使系统具有可扩展性和高效性,能够动态处理查询。

步骤4:为多个学员的查询生成回答

设置好AI代理后,我们需要动态处理存储在结构化数据集中的学员查询。

以下代码处理存储在CSV文件中的学员查询,并使用AI代理生成回答。它首先加载包含学员查询、课程详情和对话线程的数据集。​​reply_to_query​​函数提取相关细节,如学员姓名、课程名称和当前查询。如果存在之前的对话,它会提取出来以提供上下文。如果查询包含图片,则会跳过。然后,它从ChromaDB中检索相关的课程材料,并将查询、相关内容和之前的对话发送给AI代理,以生成结构化的回答。

df = pd.read_csv(C:\M\Code\GAI\Learn_queries\filtered_data_top3_courses.csv)def reply_to_query(df, index=1):    learner_name = df.iloc[index]["thread_starter"]    course_name = df.iloc[index]["course"]    if df.iloc[index][number_of_replies] > 1:        thread = ast.literal_eval(df.iloc[index]["modified_thread"])    else:        thread = []    question = df.iloc[index]["current_query"]    if df.iloc[index][has_image] == True:        return" "    context = retrieve_course_materials(query=question, course=course_name)    response_result = response_crew.kickoff(inputs={"query": question, "relevant_content": context, "thread": thread, "learner_name": learner_name})    print(Q: , question)    print(\n)    print(A: , response_result)    print(\n\n)

测试该函数时,我们为一个查询(​​index=1​​)执行它:

reply_to_query(df, index=1)

从输出中可以看到,它能够正常工作,仅针对一个索引生成回答。

现在,我们通过所有查询进行迭代,处理每一个查询,同时处理可能出现的错误。这确保了查询解答过程的高效自动化,能够动态处理多个学员的查询。

for i in range(len(df)):    try:        reply_to_query(df, index=i)    except:        print("索引号出错:", i)        continue

为什么这一步很重要?

  • 自动化查询处理:系统能够高效处理多个学员的查询。
  • 确保上下文相关性:回答基于检索到的课程材料和之前的对话生成。
  • 可扩展性:该方法允许AI代理动态处理并回答数千个查询。
  • 提升学习支持体验:学员能够收到个性化且数据驱动的回答。

这一步确保每个学员的查询都能被分析、结合上下文并有效回答,从而提升整体学习体验。

输出示例

从输出中可以看到,回答查询的过程已经实现自动化,首先是问题,然后是回答。

未来改进方向

为了进一步提升基于RAG的查询解答系统,我们可以考虑以下改进方向:

  1. 常见问题及其解答:在查询解答框架内实现一个结构化的FAQ系统,能够即时回答常见问题,减少对人工支持的依赖。
  2. 图像处理能力:增加分析和提取图像(如截图、图表或扫描文档)相关信息的能力,将使系统在教育和客户支持领域更具 versatility。
  3. 改进图像列布尔值:完善图像列检测的逻辑,更准确地识别和处理基于图像的查询。
  4. 语义切块和不同的切块技术:尝试不同的切块策略,如语义切块、固定长度分割和混合方法,可以提高检索精度和回答的上下文理解能力。

总结

这个基于RAG的查询解答系统利用LangChain、ChromaDB和CrewAI,高效地实现了学员支持的自动化。它从课程字幕中提取文本,将其嵌入存储到ChromaDB中,并通过相似性搜索检索相关内容。CrewAI代理处理查询,参考之前的对话,并生成结构化的回答,确保回答的准确性和个性化。

该系统提升了可扩展性、检索效率和回答质量,使自主学习更加互动。未来的改进方向包括多模态支持、更好的检索优化和增强的回答生成。通过自动化查询解答,这个系统简化了学习支持流程,为学员提供了更快、更具上下文意识的回答,提升了整体参与度。

希望这篇文章能帮助你更好地理解如何构建一个基于RAG的智能查询解答系统。如果你有任何问题或想法,欢迎在评论区留言,我们一起探讨!

目前市面上也有很多现成管理平台,如Dify、RagFlow、AnythinLLM等,通过低代码的方式就可以直接实现自主搭建智能问答系统。感兴趣的都可以去看看哈!


本文转载自公众号Halo咯咯    作者:基咯咯

原文链接:​​https://mp.weixin.qq.com/s/vQiL3RjAU7--QE-EyXrarg​​


©著作权归作者所有,如需转载,请注明出处,否则将追究法律责任