# graph_rag_architecture
**Repository Path**: BruceCat/graph_rag_architecture
## Basic Information
- **Project Name**: graph_rag_architecture
- **Description**: AI Agent 今天吃点啥
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-03-25
- **Last Updated**: 2026-04-10
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
```text
code/C8/
├── config.py # 配置管理
├── main.py # 主程序入口
├── requirements.txt # 依赖列表
├── rag_modules/ # 核心模块
│ ├── __init__.py
│ ├── data_preparation.py # 数据准备模块
│ ├── index_construction.py # 索引构建模块
│ ├── retrieval_optimization.py # 检索优化模块
│ └── generation_integration.py # 生成集成模块
└── vector_index/ # 向量索引缓存(自动生成)
```
# data_preparation.py这段代码在干什么,什么叫父子文档,能否用大白话给我举例讲解一下
我来用大白话给你讲解一下这段代码在干什么。
## 这段代码的核心功能
这是一个**数据准备模块**,专门用来处理菜谱文档的。它的主要工作流程是:
1. **加载文档** - 从文件夹里读取所有的 Markdown 格式的菜谱文件
2. **增强元数据** - 自动识别每个菜谱的分类(荤菜、素菜、汤品等)和难度等级(看星星标记)
3. **分割文档** - 把完整的菜谱按照标题结构切成小块
4. **建立父子关系** - 记录每个小块属于哪个完整菜谱
5. **提供查询功能** - 可以按分类、难度筛选菜谱
## 什么是"父子文档"?
让我用个通俗的例子来解释:
### 举个例子 🍳
假设你有一个完整的菜谱文件叫《红烧肉.md》,内容如下:
```markdown
# 红烧肉
## 原料
- 五花肉 500g
- 冰糖 30g
- 生抽 2 勺
## 操作步骤
### 第一步:处理猪肉
把猪肉切成方块...
### 第二步:炒糖色
锅里放油,加入冰糖...
```
**父子文档的关系就是:**
| 文档类型 | 内容 | 作用 |
| ---------- | ------------------------------- | -------------------------------------------- |
| **父文档** | 整个《红烧肉.md》文件 | 完整的菜谱,检索到的时候可以给用户看全部内容 |
| **子文档** | "## 原料" 这个小节 | 检索的时候用,因为更精确,容易匹配到具体问题 |
| **子文档** | "## 操作步骤" 这个小节 | 检索的时候用 |
| **子文档** | "### 第一步:处理猪肉" 这个小节 | 检索的时候用 |
### 为什么需要父子文档? 🤔
想象用户在问:"**红烧肉的原料有哪些?**"
**检索过程是这样的:**
1. **检索阶段**:系统拿着问题去匹配,发现"## 原料"这个**子文档**最相关(因为内容短小精悍,关键词集中)
2. **返回结果阶段**:但是只给用户看"原料"这一段不够友好啊,用户可能还想知道怎么做
3. **通过子找父**:系统通过子文档记录的 `parent_id`,找到对应的父文档《红烧肉.md》
4. **展示给用户**:把完整的红烧肉菜谱(包括原料、步骤等)都展示给用户
### 代码中的关键实现
```python
# 第 44-45 行:定义了两个列表
self.documents: List[Document] = [] # 父文档(完整食谱)
self.chunks: List[Document] = [] # 子文档(按标题分割的小块)
self.parent_child_map: Dict[str, str] = {} # 子块 ID -> 父文档 ID 的映射
# 第 210-226 行:建立父子关系
parent_id = doc.metadata["parent_id"] # 父文档的 ID
for i, chunk in enumerate(md_chunks):
child_id = str(uuid.uuid4()) # 给子块分配新 ID
chunk.metadata.update({
"parent_id": parent_id, # 记录它属于哪个父文档
"doc_type": "child", # 标记为子文档
})
self.parent_child_map[child_id] = parent_id # 建立映射关系
# 第 317-365 行:根据子文档找父文档(智能去重)
def get_parent_documents(self, child_chunks: List[Document]) -> List[Document]:
"""
比如用户搜"怎么做猪肉",可能匹配到:
- 红烧肉的"操作步骤"子块
- 回锅肉的"操作步骤"子块
这个函数就通过这些子块,找到对应的完整菜谱(父文档),
并且按相关性排序(匹配到的子块越多,说明这个菜谱越相关)
"""
```
### 实际使用场景 💡
```python
# 1. 创建数据准备模块
preparation = DataPreparationModule("data/recipes")
# 2. 加载所有菜谱
documents = preparation.load_documents() # 加载了 100 个完整菜谱
# 3. 分割成小块
chunks = preparation.chunk_documents() # 分割成 500 个子块
# 4. 假设检索系统找到了相关的 3 个子块
relevant_chunks = [chunk1, chunk2, chunk3]
# 5. 通过子块找到对应的完整菜谱
parent_docs = preparation.get_parent_documents(relevant_chunks)
# 输出:从 3 个子块中找到 2 个去重父文档:红烧肉 (2 块), 回锅肉 (1 块)
# 6. 把完整的菜谱展示给用户
for doc in parent_docs:
print(f"菜品:{doc.metadata['dish_name']}")
print(f"内容:{doc.page_content}") # 完整的内容,不只是子块
```
### 总结 📝
- **父文档** = 完整的菜谱文件(给用户看的)
- **子文档** = 菜谱里的某个小节(检索用的)
- **父子映射** = 通过子文档快速找到完整菜谱的"导航系统"
这样做的好处是:**检索更精准**(小块容易匹配),**展示更完整**(给用户看全文),两全其美!
# 我能否理解这是倒排索引,用es也能实现这个功能呢
## 一、先说说什么是倒排索引
**倒排索引**就是个"反向查找表",比如:
| 关键词 | 出现在哪些文档 |
| -------- | ---------------------- |
| 红烧肉 | 文档 1, 文档 5, 文档 8 |
| 五花肉 | 文档 1, 文档 3 |
| 操作步骤 | 所有文档 |
当你搜索"红烧肉的做法"时,ES 会:
1. 提取关键词"红烧肉""做法"
2. 查倒排索引,找到包含这些词的文档
3. 返回相关文档
## 二、这段代码 vs ES 倒排索引
### **相同点** ✅
| 功能 | 这段代码 | Elasticsearch |
| -------------- | -------------------------------- | -------------------------- |
| **快速检索** | 通过子块检索,再找父文档 | 通过倒排索引直接定位文档 |
| **元数据过滤** | `filter_documents_by_category()` | term filter / range filter |
| **相关性排序** | 按匹配子块数量排序 | BM25/TF-IDF 评分 |
| **结构化存储** | metadata 存储分类、难度 | _source + fields |
### **核心区别** ❌
| 维度 | 这段代码(父子文档) | ES 倒排索引 |
| ------------ | ------------------------------------- | ----------------------------- |
| **检索粒度** | **两阶段检索**:先查子块 → 再找父文档 | **单阶段检索**:直接查文档 |
| **索引方式** | 内存中的字典映射 (`parent_child_map`) | 磁盘上的倒排索引结构 |
| **适用场景** | RAG 系统,需要完整上下文 | 通用搜索引擎 |
| **扩展性** | 数据量大时内存可能不够 | 支持分布式,海量数据 |
| **模糊匹配** | ❌ 不支持(需要配合向量检索) | ✅ 支持(通配符、正则、fuzzy) |
| **分词能力** | ❌ 无 | ✅ 强大的分词器 |
## 三、用 ES 实现同样的功能
你完全可以用 ES 的 **Parent-Child Join** 实现类似功能:
### ES 的 mapping 设计
```json
PUT /recipes
{
"mappings": {
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word" // 中文分词
},
"dish_name": { "type": "keyword" },
"category": { "type": "keyword" },
"difficulty": { "type": "keyword" },
"doc_type": { "type": "keyword" }, // parent 或 child
"parent_id": { "type": "keyword" }
}
}
}
```
### ES 的 Parent-Child Join 方案
```json
// 1. 更新 mapping,定义 join 关系
PUT /recipes/_mapping
{
"properties": {
"join_field": {
"type": "join",
"relations": {
"parent": "child" // parent 是父文档,child 是子文档
}
}
}
}
// 2. 插入父文档(完整菜谱)
POST /recipes/_doc/1
{
"dish_name": "红烧肉",
"category": "荤菜",
"difficulty": "中等",
"content": "# 红烧肉\n完整的菜谱内容...",
"join_field": "parent" // 标记为父文档
}
// 3. 插入子文档(分割的小块)
POST /recipes/_doc/1_chunk_1?routing=1 // routing 必须指定父文档 ID
{
"section_title": "原料",
"content": "五花肉 500g, 冰糖 30g...",
"join_field": {
"name": "child", // 标记为子文档
"parent": "1" // 关联父文档 ID
}
}
// 4. 检索:先查子文档,再通过 join 找父文档
POST /recipes/_search
{
"query": {
"has_parent": { // 通过子文档找父文档
"parent_type": "parent",
"query": {
"match": {
"content": "红烧肉怎么做"
}
}
}
}
}
```
## 四、两种方案对比
### 方案 A:当前代码的实现(适合 RAG 系统)
```python
# 流程示意
用户问题 → 向量检索子块 → 通过 parent_child_map 找父文档 → 返回完整内容
```
**优势:**
- ✅ 专为 RAG 优化:子块用于向量检索(更精确),父文档用于返回(更完整)
- ✅ 简单轻量:不需要额外的 ES 服务
- ✅ 灵活可控:代码自己掌握,想怎么改就怎么改
**劣势:**
- ❌ 内存限制:所有映射关系在内存中
- ❌ 检索能力弱:没有分词、模糊匹配等高级功能
### 方案 B:使用 ES(适合通用搜索)
```python
# 流程示意
用户问题 → ES 全文检索 → has_parent 查询 → 返回父文档
```
**优势:**
- ✅ 强大的检索能力:分词、模糊匹配、同义词、拼音...
- ✅ 可扩展:支持海量数据、分布式
- ✅ 开箱即用:REST API、各种客户端
**劣势:**
- ❌ 需要额外部署和维护 ES 服务
- ❌ 对于 RAG 场景,需要配合向量检索(ES 的向量检索不如专业向量数据库)
## 五、实际建议 💡
**如果你的场景是 RAG(检索增强生成):**
| 需求 | 推荐方案 |
| ------------------------- | ---------------------------------------------- |
| 小规模数据(< 10 万文档) | **当前方案**就够了 |
| 需要向量检索 | 当前方案 + **向量数据库**(如 Milvus、Chroma) |
| 需要全文检索 | 当前方案 + **ES**(混合检索) |
| 大规模数据 + 复杂查询 | **ES 主导** + 向量数据库 |
**最佳实践(混合检索架构):**
```
用户问题
↓
├─→ 向量检索(子块)──┐
│ ├──→ 相关性融合 → 父文档去重 → 返回结果
└─→ ES 全文检索 ──────┘
```
## 六、总结 📝
1. **你的理解是对的**:这段代码确实实现了类似倒排索引的"快速查找"功能
2. **ES 能实现**:而且功能更强大(分词、模糊匹配、分布式...)
3. **但设计目标不同**:
- 当前代码:为 RAG 优化,重点是**向量检索 + 父子文档恢复**
- ES:通用搜索引擎,重点是**全文检索 + 高性能**
4. **不冲突**:可以一起用,ES 负责关键词检索,当前方案负责向量检索和父子文档管理
**一句话总结**:这是 RAG 场景下的"轻量级倒排索引",简单够用;如果需要更强的检索能力,就上 ES!
# data_preparation.py的工作原理
我来给你详细演示一下这个菜谱在代码里会被处理成什么样子!
## 一、加载阶段(`load_documents()`)
首先,这个 Markdown 文件会被读进来,变成一个**父文档**:
```python
Document(
page_content="""# 微波葱姜黑鳕鱼的做法
这道菜改编自西雅图 Veil 餐厅主厨 Johnny Zhu 的母亲 Margaret Lu 的菜谱...
预估烹饪难度:★★★
## 必备原料和工具
原料:
- 黑鳕鱼,带皮
调味料:
- 青葱
- 姜
...(省略中间内容)...
""", # 完整的菜谱内容
metadata={
"source": "E:\\agent\\llm_agent\\graph_rag_architecture\\resources\\dishes\\aquatic\\微波葱姜黑鳕鱼.md",
"parent_id": "a1b2c3d4e5f6...", # 基于文件路径的 MD5 哈希值
"doc_type": "parent", # 标记为父文档
"category": "水产", # 从'aquatic'文件夹识别出来
"dish_name": "微波葱姜黑鳕鱼", # 文件名
"difficulty": "中等" # 从'★★★'识别出来
}
)
```
**关键点:**
- ✅ 自动识别分类:从 `aquatic` 文件夹 → `"水产"`
- ✅ 自动识别难度:从 `★★★` → `"中等"`
- ✅ 提取菜名:从文件名 → `"微波葱姜黑鳕鱼"`
---
## 二、分割阶段(`chunk_documents()`)
接下来,这个父文档会被按标题分割成多个**子文档**:
### 分割后的子文档列表:
| 子块索引 | 标题层级 | 内容预览 | 元数据 |
| ----------- | -------------- | ------------------------ | ------------------------------------------------------------ |
| **Chunk 0** | `#` 主标题 | 完整的第一段介绍文字 | `{"主标题": "微波葱姜黑鳕鱼的做法"}` |
| **Chunk 1** | `##` 二级标题 | 必备原料和工具的详细内容 | `{"主标题": "...", "二级标题": "必备原料和工具"}` |
| **Chunk 2** | `##` 二级标题 | 计算部分的详细配方 | `{"主标题": "...", "二级标题": "计算"}` |
| **Chunk 3** | `##` 二级标题 | 操作步骤的详细内容 | `{"主标题": "...", "二级标题": "操作"}` |
| **Chunk 4** | `###` 三级标题 | 使用其他鱼类的做法 | `{"主标题": "...", "二级标题": "附加内容", "三级标题": "使用海鲈鱼..."}` |
| **Chunk 5** | `###` 三级标题 | 其他变化技巧 | `{"主标题": "...", "二级标题": "附加内容", "三级标题": "其他变化"}` |
### 具体的子文档示例:
```python
# Chunk 1 - 原料部分
Document(
page_content="""## 必备原料和工具
原料:
- 黑鳕鱼,带皮
调味料:
- 青葱
- 姜
- 料酒
- 酱油
- 芝麻油
- 花生油
工具:
- 密封袋""",
metadata={
"source": "E:\\...\\微波葱姜黑鳕鱼.md",
"parent_id": "a1b2c3d4e5f6...", # 指向父文档
"doc_type": "child", # 标记为子文档
"chunk_id": "uuid-随机生成",
"chunk_index": 1, # 第 1 个子块
"category": "水产",
"dish_name": "微波葱姜黑鳕鱼",
"difficulty": "中等",
"主标题": "微波葱姜黑鳕鱼的做法",
"二级标题": "必备原料和工具",
"chunk_size": 156 # 内容长度
}
)
# Chunk 3 - 操作部分
Document(
page_content="""## 操作
- 鱼片分别放入密封袋,鱼皮向下放在盘子中。
- 取葱白切丝 25g,姜去皮后切丝,10g,混合在一起后分成两半...
- 每个袋子倒入 2.5mL 料酒。
- 封好密封袋,放入微波炉中,中火(800 瓦)微波至不透明且容易散开时...
- 去除青葱和姜。
...(省略后续步骤)...""",
metadata={
"parent_id": "a1b2c3d4e5f6...", # 同一个父文档 ID
"doc_type": "child",
"chunk_id": "uuid-另一个随机值",
"chunk_index": 3,
"category": "水产",
"dish_name": "微波葱姜黑鳕鱼",
"difficulty": "中等",
"主标题": "微波葱姜黑鳕鱼的做法",
"二级标题": "操作",
"chunk_size": 520
}
)
```
---
## 三、父子映射关系(`parent_child_map`)
内存中会建立这样的映射表:
```python
{
"chunk-0-uuid": "a1b2c3d4e5f6...", # Chunk 0 → 父文档
"chunk-1-uuid": "a1b2c3d4e5f6...", # Chunk 1 → 父文档
"chunk-2-uuid": "a1b2c3d4e5f6...", # Chunk 2 → 父文档
"chunk-3-uuid": "a1b2c3d4e5f6...", # Chunk 3 → 父文档
"chunk-4-uuid": "a1b2c3d4e5f6...", # Chunk 4 → 父文档
"chunk-5-uuid": "a1b2c3d4e5f6...", # Chunk 5 → 父文档
}
```
---
## 四、实际使用场景演示
假设用户问:"**鳕鱼的微波炉做法是什么?**"
### 检索过程:
```python
# 1. 向量检索系统找到最相关的子块
relevant_chunks = [
chunk_3, # "操作"部分,包含微波炉步骤
chunk_1, # "必备原料"部分
]
# 2. 通过子块找父文档
parent_docs = preparation.get_parent_documents(relevant_chunks)
# 日志输出:
# "从 2 个子块中找到 1 个去重父文档:微波葱姜黑鳕鱼 (2 块)"
# 3. 返回完整的父文档给用户
print(parent_docs[0].page_content)
# 输出完整的菜谱,包括:介绍、原料、计算、操作、附加内容...
```
---
## 五、可视化对比
| 阶段 | 输入 | 输出 | 数量 |
| ------------ | ------------------- | ------------------ | -------- |
| **原始文件** | `微波葱姜黑鳕鱼.md` | 1 个 Markdown 文件 | 1 个 |
| **加载后** | Markdown 文件 | 1 个父 Document | 1 个 |
| **分割后** | 父 Document | 6 个子 Document | 6 个 |
| **映射关系** | 子 Document IDs | parent_child_map | 6 条记录 |
---
## 六、统计信息(`get_statistics()`)
如果只有这一个菜谱,统计结果会是:
```python
{
'total_documents': 1, # 1 个父文档
'total_chunks': 6, # 6 个子块
'categories': {
'水产': 1
},
'difficulties': {
'中等': 1
},
'avg_chunk_size': 280.5 # 平均每个子块的字符数
}
```
---
## 七、导出的元数据(`export_metadata()`)
```json
[
{
"source": "E:\\agent\\llm_agent\\graph_rag_architecture\\resources\\dishes\\aquatic\\微波葱姜黑鳕鱼.md",
"dish_name": "微波葱姜黑鳕鱼",
"category": "水产",
"difficulty": "中等",
"content_length": 1683
}
]
```
---
## 总结 📊
这个菜谱的处理流程就是:
```
原始 Markdown 文件
↓ (load_documents)
1 个父文档(完整菜谱,带分类/难度等元数据)
↓ (chunk_documents)
6 个子文档(按标题分割的小块,保留父子关系)
↓ (检索时)
通过子块快速匹配问题 → 找到父文档 → 返回完整菜谱
```
**好处:**
- 🔍 **检索精准**:用户问"鳕鱼怎么蒸",直接匹配到"操作"子块
- 📖 **展示完整**:找到相关子块后,把整个菜谱都给用户(不只是操作步骤)
- 🏷️ **过滤方便**:可以按"水产类""中等难度"筛选菜谱
# index_construction解读
我来给你详细讲解这段代码,包括注释、流程图和通俗示例!
## 一、带详细注释的代码
```python
"""
索引构建模块 - 负责把文本块转换成向量,并建立 searchable 的索引
"""
import logging
from typing import List
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
logger = logging.getLogger(__name__)
class IndexConstructionModule:
"""索引构建模块 - 负责向量化和索引构建"""
# 类比:这个模块就像一个"图书管理员"
# 1. 把书籍(文本)转换成数字编码(向量)
# 2. 把这些编码整理成卡片目录(索引)
# 3. 以后查资料时,通过卡片目录快速找到相关书籍
def __init__(self, model_name: str = "BAAI/bge-small-zh-v1.5", index_save_path: str = "./vector_index"):
"""
初始化索引构建模块
Args:
model_name: 嵌入模型名称 - 用来把文字转成数字的 AI 模型
index_save_path: 索引保存路径 - 整理好的卡片目录存到哪里
"""
self.model_name = model_name
self.index_save_path = index_save_path
self.embeddings = None # 嵌入模型实例(文字转数字的工具)
self.vectorstore = None # 向量存储对象(整理好的卡片目录)
self.setup_embeddings() # 启动时就把工具准备好
def setup_embeddings(self):
"""初始化嵌入模型 - 准备文字转数字的工具"""
logger.info(f"正在初始化嵌入模型:{self.model_name}")
self.embeddings = HuggingFaceEmbeddings(
model_name=self.model_name,
model_kwargs={'device': 'cpu'}, # 使用 CPU 运行
encode_kwargs={'normalize_embeddings': True} # 归一化向量,方便计算相似度
)
logger.info("嵌入模型初始化完成")
def build_vector_index(self, chunks: List[Document]) -> FAISS:
"""
构建向量索引 - 把一堆文本块全部转成向量,并建立索引
Args:
chunks: 文档块列表 - 之前 data_preparation 模块分割好的子块
Returns:
FAISS 向量存储对象 - 整理好的 searchable 索引
"""
logger.info("正在构建 FAISS 向量索引...")
if not chunks:
raise ValueError("文档块列表不能为空")
# 构建 FAISS 向量存储
# 这一步会做两件事:
# 1. 调用 embeddings 把每个 chunk 的文字内容转成向量(一串数字)
# 2. 用 FAISS 算法把这些向量组织成高效的索引结构
self.vectorstore = FAISS.from_documents(
documents=chunks,
embedding=self.embeddings
)
logger.info(f"向量索引构建完成,包含 {len(chunks)} 个向量")
return self.vectorstore
def add_documents(self, new_chunks: List[Document]):
"""
向现有索引添加新文档 - 比如新增了几个菜谱,不用重建整个索引
Args:
new_chunks: 新的文档块列表
"""
if not self.vectorstore:
raise ValueError("请先构建向量索引")
logger.info(f"正在添加 {len(new_chunks)} 个新文档到索引...")
self.vectorstore.add_documents(new_chunks)
logger.info("新文档添加完成")
def save_index(self):
"""
保存向量索引到配置的路径 - 把整理好的卡片目录存到硬盘
为什么需要保存?
- 向量索引构建很耗时(要把所有文字转成向量)
- 保存后下次直接用,不用重新计算
"""
if not self.vectorstore:
raise ValueError("请先构建向量索引")
# 确保保存目录存在
Path(self.index_save_path).mkdir(parents=True, exist_ok=True)
self.vectorstore.save_local(self.index_save_path)
logger.info(f"向量索引已保存到:{self.index_save_path}")
def load_index(self):
"""
从配置的路径加载向量索引 - 从硬盘读取之前保存的索引
Returns:
加载的向量存储对象,如果加载失败返回 None
"""
if not self.embeddings:
self.setup_embeddings()
if not Path(self.index_save_path).exists():
logger.info(f"索引路径不存在:{self.index_save_path},将构建新索引")
return None
try:
self.vectorstore = FAISS.load_local(
self.index_save_path,
self.embeddings,
allow_dangerous_deserialization=True # 允许反序列化(加载本地文件)
)
logger.info(f"向量索引已从 {self.index_save_path} 加载")
return self.vectorstore
except Exception as e:
logger.warning(f"加载向量索引失败:{e},将构建新索引")
return None
def similarity_search(self, query: str, k: int = 5) -> List[Document]:
"""
相似度搜索 - 核心功能!根据问题找最相关的文本块
Args:
query: 查询文本 - 用户的问题,比如"鳕鱼怎么做"
k: 返回结果数量 - 返回前 k 个最相关的结果
Returns:
相似文档列表 - 按相关性排序的 chunk 列表
工作原理:
1. 把用户的问题也转成向量(和 chunks 用同一个模型)
2. 在向量空间里计算距离,找最近的 k 个向量
3. 返回对应的原始文本块
"""
if not self.vectorstore:
raise ValueError("请先构建或加载向量索引")
return self.vectorstore.similarity_search(query, k=k)
```
---
## 二、流程图
```mermaid
graph TD
A[开始] --> B[初始化模块]
B --> C[加载嵌入模型
BAAI/bge-small-zh-v1.5]
C --> D{有缓存索引吗?}
D -->|有 | E[加载已有索引]
D -->|没有 | F[构建新索引]
F --> G[输入:chunks 列表
6 个子块]
G --> H[调用 embeddings
把文字转成向量]
H --> I[FAISS 建立索引
组织向量结构]
I --> J[保存到硬盘
./vector_index]
E --> K[准备就绪]
J --> K
K --> L[用户提问:
'鳕鱼怎么做']
L --> M[把问题转成向量]
M --> N[在索引中搜索
最相似的 k 个向量]
N --> O[返回相关 chunks
按相似度排序]
style A fill:#e1f5ff
style K fill:#d4edda
style O fill:#fff3cd
```
---
## 三、大白话讲解 + 完整示例
### **这个模块是干啥的?**
用大白话说:**这个模块就是个"智能翻译官 + 图书管理员"**
它的工作分两步:
1. **翻译**:把所有文字翻译成"数字密码"(向量)
2. **整理**:把这些数字密码整理成可以快速查找的"卡片目录"(索引)
---
### **完整示例:处理"微波葱姜黑鳕鱼"菜谱**
假设我们有之前那个鳕鱼菜谱的 6 个 chunks:
```python
# 第 1 步:初始化模块
index_module = IndexConstructionModule(
model_name="BAAI/bge-small-zh-v1.5",
index_save_path="./vector_index"
)
# 此时发生了什么?
# ✅ 下载并加载了 AI 模型(文字转数字的工具)
# ✅ 准备好了工作环境
```
```python
# 第 2 步:准备数据(来自 data_preparation 模块的输出)
chunks = [
Document(page_content="# 微波葱姜黑鳕鱼的做法\n...", metadata={"dish_name": "微波葱姜黑鳕鱼"}),
Document(page_content="## 必备原料和工具\n...", metadata={"dish_name": "微波葱姜黑鳕鱼"}),
Document(page_content="## 计算\n...", metadata={"dish_name": "微波葱姜黑鳕鱼"}),
Document(page_content="## 操作\n...", metadata={"dish_name": "微波葱姜黑鳕鱼"}),
Document(page_content="### 使用海鲈鱼、罗非鱼...", metadata={"dish_name": "微波葱姜黑鳕鱼"}),
Document(page_content="### 其他变化\n...", metadata={"dish_name": "微波葱姜黑鳕鱼"}),
]
```
```python
# 第 3 步:构建向量索引
index_module.build_vector_index(chunks)
# 🔍 内部发生了什么?看详细过程:
# 对每个 chunk,AI 模型把它转成向量:
chunk_0_vector = [0.12, -0.45, 0.78, ..., 0.33] # 512 维的向量
chunk_1_vector = [-0.23, 0.56, -0.11, ..., 0.89]
chunk_2_vector = [0.67, -0.12, 0.34, ..., -0.45]
# ... (共 6 个向量)
# FAISS 做的事情:
# 把这 6 个向量组织成一个高效的数据结构
# 想象成:把 6 本书的"数字指纹"整理成一本卡片目录
```
```python
# 第 4 步:保存索引(避免下次重复计算)
index_module.save_index()
# 在 ./vector_index/ 目录下生成两个文件:
# - index.faiss (存储向量索引结构)
# - index.pkl (存储元数据和映射关系)
```
---
### **实际使用场景:用户提问**
```python
# 第 5 步:用户问问题
query = "微波炉怎么做鳕鱼?"
# 🔍 检索过程:
# 1. 把问题转成向量(用同一个 AI 模型)
query_vector = [0.15, -0.42, 0.75, ..., 0.31] # 也是 512 维
# 2. 计算向量之间的相似度(余弦相似度)
# 就像比较两个向量的"夹角",夹角越小越相似
similarity_scores = {
chunk_0: 0.72, # 标题介绍部分
chunk_1: 0.65, # 原料部分
chunk_2: 0.58, # 计算部分
chunk_3: 0.91, # 操作部分 ← 最相关!
chunk_4: 0.45, # 其他鱼类做法
chunk_5: 0.38, # 其他变化
}
# 3. 排序,取前 k 个
results = index_module.similarity_search(query, k=3)
# 返回的结果:
# [chunk_3, chunk_0, chunk_1] # 按相似度从高到低
```
---
### **增量更新示例**
```python
# 场景:你又加了 3 个新菜谱
new_recipe_chunks = [
Document(page_content="# 红烧肉的做法\n..."),
Document(page_content="## 原料\n..."),
Document(page_content="## 步骤\n..."),
]
# 不用重建整个索引,直接追加
index_module.add_documents(new_recipe_chunks)
# 现在索引里有:6 + 3 = 9 个 chunks
```
---
## 四、关键概念解释
### 1️⃣ **什么是向量(Vector)?**
**向量就是一串数字,用来表示文字的"语义"**
| 文本 | 向量表示(简化版,实际是 512 维) |
| ------ | --------------------------------- |
| "猫" | `[0.8, -0.2, 0.5, 0.1, ...]` |
| "狗" | `[0.75, -0.25, 0.48, 0.12, ...]` |
| "汽车" | `[-0.3, 0.6, -0.1, 0.8, ...]` |
**神奇的地方:**
- "猫"和"狗"的向量很接近(都是宠物)
- "猫"和"汽车"的向量差很远(完全不同的概念)
### 2️⃣ **FAISS 是什么?**
**FAISS = 超快的向量搜索引擎**(Facebook 开源的)
| 对比项 | 暴力搜索 | FAISS |
| ------------ | --------------------------------- | -------------------------- |
| **原理** | 逐个比较所有向量 | 用特殊数据结构组织向量 |
| **速度** | 慢(100 万个向量要比较 100 万次) | 快(可能只需要比较几万次) |
| **适用场景** | 小数据量 | 海量数据 |
### 3️⃣ **为什么要保存索引?**
```python
# 不保存的情况:
每次启动程序 → 重新计算所有向量 → 耗时 5 分钟 ❌
# 保存的情况:
第一次:计算向量 → 保存索引 → 耗时 5 分钟
第二次:直接加载索引 → 耗时 10 秒 ✅
# 节省时间:290 秒!
```
---
## 五、总结 📊
| 功能 | 作用 | 类比 |
| ---------------------- | ---------------------- | ---------------------------------- |
| `setup_embeddings()` | 加载 AI 模型 | 请个翻译官 |
| `build_vector_index()` | 把文字转成向量并建索引 | 把书翻译成数字密码,整理成卡片目录 |
| `save_index()` | 保存索引到硬盘 | 把卡片目录存到仓库 |
| `load_index()` | 从硬盘加载索引 | 从仓库取出卡片目录 |
| `similarity_search()` | 根据问题找相关文本 | 拿着问题找最匹配的卡片 |
**这个模块的核心价值:**
- 🔢 **翻译**:把文字变成可计算的向量
- 🚀 **加速**:让检索速度提升几个数量级
- 💾 **持久化**:避免重复计算,节省时间
# 意思是FAISS是一个本地的向量库吗,和sqlite是差不多的定位吗。然后mysql就相当于是Milvus 吗
你的理解**基本正确,但有些细节需要澄清**!让我用对比的方式给你讲清楚:
## 一、定位对比表
| 维度 | SQLite | MySQL | FAISS | Milvus |
| ------------ | ------------------ | ------------------ | --------------------- | --------------------- |
| **类型** | 关系型数据库 | 关系型数据库 | **向量搜索库** | 向量数据库 |
| **存储内容** | 结构化数据(表格) | 结构化数据(表格) | **向量 + 元数据** | 向量 + 元数据 |
| **部署方式** | 本地文件 | 客户端 - 服务器 | **本地库** | 客户端 - 服务器 |
| **主要用途** | 轻量级数据存储 | 企业级数据存储 | **向量相似度搜索** | 向量相似度搜索 |
| **并发能力** | 低(文件锁) | 高(行锁) | 中(内存操作) | 高(分布式) |
| **数据规模** | GB 级别 | TB-PB 级别 | **百万 - 千万级向量** | 十亿级向量 |
| **查询类型** | SQL(精确匹配) | SQL(精确匹配) | **向量相似度** | 向量相似度 + 标量过滤 |
---
## 二、详细对比
### 1️⃣ **FAISS vs SQLite**(都是本地库)
#### 相似点 ✅
| 特性 | SQLite | FAISS |
| ------------ | -------------------- | ------------------------- |
| **部署简单** | 无需服务器,一个文件 | 无需服务器,内存/文件索引 |
| **轻量级** | 适合小型项目 | 适合中小规模向量检索 |
| **嵌入式** | 直接链接到程序 | 作为库导入到 Python |
| **免费开源** | ✓ | ✓ |
#### 核心区别 ❌
```python
# SQLite 的使用方式
import sqlite3
conn = sqlite3.connect('data.db') # 创建/打开数据库文件
cursor = conn.cursor()
# 存储的是结构化数据
cursor.execute('''
CREATE TABLE recipes (
id INTEGER PRIMARY KEY,
name TEXT,
category TEXT,
difficulty INTEGER
)
''')
cursor.execute("INSERT INTO recipes VALUES (1, '红烧肉', '荤菜', 3)")
cursor.execute("SELECT * FROM recipes WHERE category = '荤菜'")
# 查询:基于条件的精确匹配
```
```python
# FAISS 的使用方式
from langchain_community.vectorstores import FAISS
# 存储的是向量(数字数组)
vectorstore = FAISS.from_documents(
documents=chunks,
embedding=embeddings
)
# 查询:基于相似度的模糊匹配
results = vectorstore.similarity_search("怎么做鳕鱼?", k=5)
# 返回最相似的 5 个文档(不是精确匹配)
```
**本质区别:**
| 方面 | SQLite | FAISS |
| ------------ | ------------------------- | ------------------------------ |
| **数据模型** | 表格(行×列) | 向量空间(点云) |
| **查询逻辑** | `WHERE category = '荤菜'` | "找和这个向量最近的点" |
| **底层算法** | B+ 树、哈希 | IVF、PQ、HNSW 等近似最近邻算法 |
---
### 2️⃣ **MySQL vs Milvus**(都是客户端 - 服务器架构)
#### 相似点 ✅
| 特性 | MySQL | Milvus |
| -------------- | ---------------------- | ----------------------- |
| **服务器模式** | 需要启动 mysqld 服务 | 需要启动 milvus 服务 |
| **客户端连接** | 通过网络协议连接 | 通过 gRPC/REST API 连接 |
| **高并发** | 支持大量并发连接 | 支持高并发向量检索 |
| **分布式** | 支持主从复制、分库分表 | 支持分布式集群 |
| **企业级** | 适合生产环境 | 适合生产环境 |
#### 核心区别 ❌
```python
# MySQL 的使用方式
import pymysql
conn = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='recipes_db'
)
cursor = conn.cursor()
# 存储结构化数据
cursor.execute('''
INSERT INTO recipes (name, content, category)
VALUES (%s, %s, %s)
''', ('红烧肉', '...', '荤菜'))
# 精确查询
cursor.execute("SELECT * FROM recipes WHERE name = '红烧肉'")
```
```python
# Milvus 的使用方式
from pymilvus import connections, Collection, FieldSchema, CollectionSchema
# 连接到 Milvus 服务器
connections.connect("default", host="localhost", port="19530")
# 定义 schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=512),
FieldSchema(name="dish_name", dtype=DataType.VARCHAR, max_length=256)
]
schema = CollectionSchema(fields)
# 创建集合(类似表)
collection = Collection("recipes", schema)
# 插入向量数据
vectors = [[0.12, -0.45, ...], [0.78, 0.33, ...]] # 512 维向量
collection.insert([
{"id": 1, "embedding": vectors[0], "dish_name": "红烧肉"},
{"id": 2, "embedding": vectors[1], "dish_name": "清蒸鱼"}
])
# 相似度搜索
results = collection.search(
data=[[0.15, -0.42, ...]], # 查询向量
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=5
)
```
---
## 三、更准确的类比
### **正确的对应关系应该是:**
```
传统数据库领域: 向量数据库领域:
┌─────────────┐ ┌─────────────┐
│ SQLite │ ←→→→→→ │ FAISS │
│ (本地 RDBMS) │ │(本地向量库) │
└─────────────┘ └─────────────┘
┌─────────────┐ ┌─────────────┐
│ MySQL │ ←→→→→→ │ Milvus │
│(服务器 RDBMS)│ │(服务器向量库)│
└─────────────┘ └─────────────┘
```
### **为什么这样对应?**
| 维度 | SQLite → FAISS | MySQL → Milvus |
| ------------ | --------------- | -------------- |
| **部署方式** | 都是本地 | 都是服务器 |
| **使用场景** | 轻量级/开发测试 | 生产级/大规模 |
| **并发能力** | 较低 | 较高 |
| **扩展性** | 单机 | 分布式 |
---
## 四、实际选择建议
### **场景 1:个人项目 / 小数据量(< 10 万向量)**
```python
# 推荐方案:FAISS(本地)
# 优点:
# - 简单,不需要额外部署
# - 速度快,内存操作
# - 足够应对小规模数据
vectorstore = FAISS.from_documents(chunks, embeddings)
vectorstore.save_local("./vector_index")
```
### **场景 2:生产环境 / 大数据量(> 100 万向量)**
```python
# 推荐方案:Milvus(服务器)
# 优点:
# - 支持分布式,可扩展到十亿级
# - 高可用,支持故障转移
# - 支持混合查询(向量 + 标量过滤)
from pymilvus import connections, Collection
connections.connect(host="milvus-server", port="19530")
```
### **场景 3:需要同时存储结构化数据 + 向量**
```python
# 混合方案:MySQL + FAISS/Milvus
# MySQL 存储元数据和结构化信息
# 表结构:
# recipes(id, name, category, difficulty, created_at)
# FAISS/Milvus 存储向量和用于检索
# 只存:id + embedding + dish_name(用于展示)
# 查询流程:
# 1. 在向量库中搜索:"找最相似的 5 个菜谱"
# → 返回 ids: [101, 205, 308, 412, 567]
# 2. 去 MySQL 查详细信息:
# SELECT * FROM recipes WHERE id IN (101, 205, 308, 412, 567)
```
---
## 五、完整架构图
```
┌─────────────────────────────────────────────────────┐
│ 你的 RAG 系统架构 │
├─────────────────────────────────────────────────────┤
│ │
│ 用户提问:"怎么做鳕鱼?" │
│ ↓ │
│ ┌─────────────────┐ │
│ │ Embedding 模型 │ │
│ │ (文字转成向量) │ │
│ └────────┬────────┘ │
│ ↓ │
│ ↓ 查询向量 [0.15, -0.42, ...] │
│ ↓ │
│ ┌────────▼────────┐ ┌──────────────────┐ │
│ │ 向量搜索引擎 │ │ 关系型数据库 │ │
│ │ │ │ │ │
│ │ ┌───────────┐ │ │ ┌────────────┐ │ │
│ │ │ FAISS │ │ │ │ SQLite │ │ │
│ │ │ (本地) │ │ 或 │ │ (本地) │ │ │
│ │ └───────────┘ │ │ └────────────┘ │ │
│ │ │ │ │ │
│ │ ┌───────────┐ │ │ ┌────────────┐ │ │
│ │ │ Milvus │ │ │ │ MySQL │ │ │
│ │ │ (服务器) │ │ │ │ (服务器) │ │ │
│ │ └───────────┘ │ │ └────────────┘ │ │
│ └────────┬────────┘ └─────────┬────────┘ │
│ ↓ ↓ │
│ 找到相关 chunks 查元数据信息 │
│ [chunk_3, chunk_1] {category: "水产"} │
│ ↓ ↓ │
│ └──────────┬──────────────┘ │
│ ↓ │
│ 组合结果,返回给用户 │
│ │
└─────────────────────────────────────────────────────┘
```
---
## 六、总结 📊
### **你的理解对在哪里?**
✅ **对的部分:**
- FAISS 是本地库 ← 正确!
- SQLite 也是本地库 ← 正确!
- Milvus 是服务器 ← 正确!
- MySQL 也是服务器 ← 正确!
### **需要澄清的地方:**
❌ **不完全准确:**
- FAISS 和 SQLite **功能完全不同**(一个搜向量,一个存表格)
- Milvus 和 MySQL **数据类型不同**(一个处理向量,一个处理结构化数据)
- 它们**不能互相替代**,而是**互补关系**
### **最佳实践:**
```python
# 小型项目(你的菜谱 RAG)
SQLite (存用户信息、收藏记录) + FAISS (搜菜谱向量)
# 大型项目(商业 RAG 系统)
MySQL (存业务数据) + Milvus (海量向量检索) + Redis (缓存热点数据)
```
**一句话总结:**
FAISS = 本地的向量搜索引擎(和 SQLite 一样简单好用)
Milvus = 分布式的向量数据库(和 MySQL 一样强大可靠)
# retrieval_optimization做了啥
### 核心概念对比表
| 检索方式 | 原理 | 优点 | 缺点 |
| --------------------- | ---------------------------------------- | ---------------- | ---------------------- |
| **向量检索** | 理解语义("怎么做菜"能匹配到"制作方法") | 聪明,懂意思 | 可能错过关键词精准匹配 |
| **BM25 检索** | 数关键词("宫保鸡丁"必须出现这个词) | 精准,传统可靠 | 不懂同义词、近义词 |
| **混合检索 (hybrid)** | 两个都用,然后 RRF 重排 | 取长补短,最准确 | 稍微慢一点点 |
### RRF 重排是啥意思?
**通俗理解**:就像考试排名
- 向量检索考了前 5 名:小明第 2 名,小红第 4 名
- BM25 检索考了前 5 名:小明第 3 名,小刚第 1 名,小红第 5 名
**RRF 算法**会给每个人算个综合分:
- 小明:1/(60+1) + 1/(60+2) = 0.0164 + 0.0159 = **0.0323**
- 小红:1/(60+3) + 1/(60+4) = 0.0159 + 0.0154 = **0.0313**
- 小刚:1/(60+0) = **0.0167**(只在 BM25 中出现)
**最后排名**:小明 > 小红 > 小刚
**核心思想**:**在两种检索中都表现好的文档,才是真正的优质文档!**
这样解释够清楚不?
# BM25的原理是什么
## BM25 检索原理 - 大白话讲解
### 核心思想
BM25 就是个"**关键词计分器**",它的任务就是:**给文档按相关性打分,分数高的排前面**。
### 三个关键因素
BM25 主要看三个方面来打分:
| 因素 | 通俗解释 | 举例 |
| -------------------- | ------------------------ | ------------------------------------------------------------ |
| **TF (词频)** | 这个词在文档里出现了几次 | "宫保鸡丁"在文档 A 出现 5 次,在文档 B 出现 1 次 → A 的 TF 分更高 |
| **IDF (逆文档频率)** | 这个词有多稀有 | "的"出现在所有文档→IDF 很低;"宫保鸡丁"只出现在少数文档→IDF 很高 |
| **文档长度** | 文档长短要公平 | 长文档天然占优势?不行!BM25 会惩罚过长的文档 |
### BM25 公式(带注释版)
```python
BM25 分数 = Σ [ IDF(qi) × ( f(qi, D) × (k1 + 1) ) / ( f(qi, D) + k1 × (1 - b + b × |D|/avgdl) ) ]
其中:
- qi: 查询中的每个词(比如"宫保"、"鸡丁")
- f(qi, D): 词 qi 在文档 D 中出现的次数(词频 TF)
- |D|: 文档 D 的长度(通常用词数计算)
- avgdl: 所有文档的平均长度
- IDF(qi): 词 qi 的逆文档频率(越稀有的词 IDF 越高)
- k1: 调节词频饱和度的参数(一般取 1.2~2.0)
- b: 调节长度惩罚的参数(一般取 0.75)
```
### 用大白话翻译公式
**BM25 分数 = 每个词的贡献累加**
每个词的贡献 = **稀有度 × 词频得分**
其中:
- **稀有度 (IDF)**:越稀有的词权重越高
- "的"、"了"这种烂大街的词 → IDF 接近 0
- "宫保鸡丁"、"鱼香肉丝"这种专业词 → IDF 很高
- **词频得分**:不是简单的出现次数,而是有饱和效应
- 出现 1 次:得分快速上升
- 出现 5 次:得分继续上升,但没那么快了
- 出现 10 次:几乎不涨了(防止有人刷词频)
- **长度惩罚**:防止长文档占便宜
- 短文档:稍微加分
- 长文档:稍微减分
- 中等长度:不罚不奖
### 实际例子
假设用户搜索:**"宫保鸡丁怎么做"**
现在有两个文档:
**文档 A**(300 字):
> 宫保鸡丁的制作方法:准备鸡胸肉、花生、干辣椒。宫保鸡丁需要先把鸡肉切丁,然后腌制。炒宫保鸡丁时要大火快炒...("宫保"出现 3 次,"鸡丁"出现 4 次)
**文档 B**(800 字):
> 川菜有很多经典菜品,比如宫保鸡丁、鱼香肉丝、麻婆豆腐...宫保鸡丁是一道有名的菜...("宫保"出现 1 次,"鸡丁"出现 1 次)
**BM25 计算过程**:
1. **计算 IDF**(假设语料库有 1000 个文档):
- "宫保":只在 50 个文档出现 → IDF = log(1000/50) = 2.99
- "鸡丁":只在 80 个文档出现 → IDF = log(1000/80) = 2.53
- "怎么":在 500 个文档出现 → IDF = log(1000/500) = 0.69
2. **计算每个词的得分**(简化版,假设 k1=1.5, b=0.75, avgdl=500):
**文档 A**:
- "宫保":TF=3, 长度因子=0.85 → 得分 = 2.99 × (3×2.5)/(3+1.5×0.85) = **5.24**
- "鸡丁":TF=4, 长度因子=0.85 → 得分 = 2.53 × (4×2.5)/(4+1.5×0.85) = **6.38**
- "怎么":TF=0 → 得分 = 0
- **总分 = 5.24 + 6.38 = 11.62**
**文档 B**:
- "宫保":TF=1, 长度因子=1.3 → 得分 = 2.99 × (1×2.5)/(1+1.5×1.3) = **2.53**
- "鸡丁":TF=1, 长度因子=1.3 → 得分 = 2.53 × (1×2.5)/(1+1.5×1.3) = **2.14**
- "怎么":TF=0 → 得分 = 0
- **总分 = 2.53 + 2.14 = 4.67**
3. **排名**:
- 🥇 文档 A:11.62 分
- 🥈 文档 B:4.67 分
### 为什么 BM25 比简单的关键词匹配强?
| 对比项 | 简单关键词匹配 | BM25 |
| -------- | --------------------------------------- | ------------------------ |
| 词频处理 | 线性增长(出现 10 次就是 1 次的 10 倍) | 有饱和效应(防止刷词频) |
| 文档长度 | 不公平(长文档占优势) | 有长度惩罚(公平) |
| 词权重 | 所有词一样重要 | 稀有词权重高(更智能) |
| 效果 | 容易找到一堆垃圾结果 | 排序更合理 |
### 代码示例(Python 实现 BM25 核心逻辑)
```python
import math
def calculate_idf(query_terms, all_docs, docs_containing_term):
"""计算 IDF:log(总文档数 / 包含该词的文档数)"""
return math.log((all_docs - docs_containing_term + 0.5) / (docs_containing_term + 0.5) + 1)
def calculate_bm25_score(query, doc_terms, doc_length, avg_doc_length, idf_scores, k1=1.5, b=0.75):
"""计算 BM25 分数"""
score = 0.0
for term in query:
if term not in idf_scores:
continue
# 词频
tf = doc_terms.count(term)
# 长度归一化因子
length_factor = 1 - b + b * (doc_length / avg_doc_length)
# 计算该项的得分
term_score = idf_scores[term] * (tf * (k1 + 1)) / (tf + k1 * length_factor)
score += term_score
return score
```
### 总结
**BM25 的核心优势**:
1. ✅ **防作弊**:词频有饱和效应,不能靠堆砌关键词刷排名
2. ✅ **公平**:长度惩罚让短文档也有机会排前面
3. ✅ **智能**:稀有词权重高,常见词权重低
4. ✅ **高效**:计算简单,速度快,适合大规模检索
**适用场景**:
- 搜索引擎(Google、百度早期都用这个)
- 文档检索系统
- 配合向量检索做混合检索(就像你的代码里那样)
这样解释够清楚不?还有哪里不明白的随时问!
# 是怎么命中索引的?
我来详细解释一下用户从提问到命中索引的完整流程,用大白话给你讲清楚。
让我先看看整个系统的架构,然后给你举例说明:
**已查看文件** *main.py*
**已查看文件** *retrieval_optimization.py*
**已查看文件** *generation_integration.py*
好的!现在我用大白话 + 具体例子,给你讲清楚用户提问是如何命中索引的。
## 📍 索引使用完整流程举例
假设用户问了这样一句话:
> **"宫保鸡丁怎么做?"**
我来一步步拆解这句话是怎么命中索引的:
---
### 🔹 **第 1 步:查询重写(让问题更容易被索引理解)**
用户的问题先送到 `GenerationIntegrationModule` 的 `query_rewrite()` 方法:
```python
# 原始问题:"宫保鸡丁怎么做?"
# AI 分析后发现这个问题已经很明确了,不需要重写
# 返回:"宫保鸡丁怎么做?"
```
**为什么要这一步?**
如果用户问的是模糊的词,比如 `"做菜"`,AI 会把它重写成 `"简单易做的家常菜谱"`,这样更容易在索引里找到东西。
---
### 🔹 **第 2 步:提取过滤条件(可选)**
系统会检查问题里有没有分类、难度等关键词:
```python
# 检查 "宫保鸡丁怎么做?"
# - 没有分类词(如 "川菜"、"粤菜")
# - 没有难度词(如 "简单"、"困难")
# 返回:filters = {} (空字典,不过滤)
```
如果用户问的是 `"川菜里的简单菜怎么做"`,就会提取出:
```python
filters = {
"category": "川菜",
"difficulty": "简单"
}
```
---
### 🔹 **第 3 步:混合检索(核心!真正命中索引的地方)**
这里是你最关心的部分 —— **如何命中索引**。系统会同时用两种方式找:
#### **方式 A:向量检索(语义相似度)**
```python
# 把 "宫保鸡丁怎么做?" 转成向量(一串数字)
# 这个向量和索引里的所有向量进行相似度计算
# 找到语义最接近的 chunks
# 工作原理:
# 1. 用同样的嵌入模型(BAAI/bge-small-zh-v1.5)把问题转成向量
# 2. 在 FAISS 索引里计算这个向量和所有 chunks 向量的距离
# 3. 返回距离最近的几个 chunks
# 可能命中的结果:
# - "宫保鸡丁" 的完整食谱文档(语义最相关)
# - "宫保鸡丁的制作方法" 片段
# - "川菜经典菜品" 相关片段
```
**关键点**:即使用户问的不是原文的原话,只要意思相近就能命中!
#### **方式 B:BM25 检索(关键词匹配)**
```python
# 从 "宫保鸡丁怎么做?" 中提取关键词:
# - "宫保鸡丁"(最重要)
# - "做"(次要)
# - "怎么"(一般)
# 在索引里找这些词出现频率最高的 chunks
# 命中结果:
# - 标题包含 "宫保鸡丁" 的文档
# - 内容多次出现 "宫保鸡丁" 的文档
```
**关键点**:这种方式更传统,就是看关键词出现次数和位置。
---
### 🔹 **第 4 步:RRF 重排序(合并两种检索结果)**
两种检索各有优劣,系统用 RRF 算法把它们的结果合并:
```python
# 假设向量检索返回的前 3 名:
# 1. "宫保鸡丁完整食谱" (排名 1, RRF 分数 = 1/(60+0+1) = 0.0164)
# 2. "宫保鸡丁制作方法" (排名 2, RRF 分数 = 1/(60+1+1) = 0.0161)
# 3. "川菜经典做法" (排名 3, RRF 分数 = 1/(60+2+1) = 0.0159)
# 假设 BM25 检索返回的前 3 名:
# 1. "宫保鸡丁完整食谱" (排名 1, RRF 分数 = 1/(60+0+1) = 0.0164)
# 2. "宫保鸡丁简介" (排名 2, RRF 分数 = 1/(60+1+1) = 0.0161)
# 3. "宫保鸡丁制作方法" (排名 3, RRF 分数 = 1/(60+2+1) = 0.0159)
# 合并后(分数累加):
# "宫保鸡丁完整食谱": 0.0164 + 0.0164 = 0.0328 ← 双料冠军!
# "宫保鸡丁制作方法": 0.0161 + 0.0159 = 0.0320
# "宫保鸡丁简介": 0.0161 (只在 BM25 中出现)
# "川菜经典做法": 0.0159 (只在向量检索中出现)
# 最终按总分排序,返回前 3 个给下一步
```
**为什么这么做?**
两边都排名靠前的,才是真正相关的结果!
---
### 🔹 **第 5 步:如果有过滤条件,进一步筛选**
如果之前提取到了过滤条件(比如 `category="川菜"`),这时候会检查每个 chunk 的元数据:
```python
# 检查每个 chunk 的 metadata:
for chunk in retrieved_chunks:
if chunk.metadata['category'] == '川菜': # 符合条件,保留
filtered_docs.append(chunk)
```
但在我们的例子里没有过滤条件,所以跳过这一步。
---
### 🔹 **第 6 步:获取完整文档**
检索到的是小片段(chunks),系统会根据这些片段找到它们所属的完整文档:
```python
# 假设检索到的 chunks 属于以下文档:
# - chunk1: 来自 "宫保鸡丁.md" 的 "制作步骤" 部分
# - chunk2: 来自 "宫保鸡丁.md" 的 "所需食材" 部分
# - chunk3: 来自 "宫保鸡丁.md" 的 "菜品介绍" 部分
# 系统会找到完整的 "宫保鸡丁.md" 文档
relevant_docs = [Document("宫保鸡丁完整食谱")]
```
---
### 🔹 **第 7 步:生成回答**
最后,把检索到的完整文档作为上下文,送给 LLM 生成回答:
```python
# 构建提示词:
"""
你是一位专业的烹饪导师。请根据食谱信息,为用户提供详细的分步骤指导。
用户问题:宫保鸡丁怎么做?
相关食谱信息:
【食谱 1】宫保鸡丁 | 分类:川菜 | 难度:中等
[完整的宫保鸡丁食谱内容...]
请灵活组织回答...
"""
# LLM 根据这些信息生成详细回答
```
---
## 📊 **总结:索引命中的关键环节**
| 步骤 | 使用的索引类型 | 命中方式 | 对应代码位置 |
| -------------- | -------------- | --------------- | --------------------------------------- |
| **查询重写** | 无 | AI 理解问题意图 | `generation_integration.py:164-227` |
| **向量检索** | FAISS 向量索引 | 语义相似度计算 | `retrieval_optimization.py:64` |
| **BM25 检索** | BM25 倒排索引 | 关键词匹配 | `retrieval_optimization.py:65` |
| **RRF 重排序** | 无 | 分数融合算法 | `retrieval_optimization.py:68, 118-173` |
| **元数据过滤** | 文档 metadata | 字段匹配 | `retrieval_optimization.py:73-116` |
---
## 🎯 **核心命中原理**
你的系统用了 **双重保险** 来命中索引:
1. **向量索引(FAISS)**:理解问题的"意思",即使用户的表达和原文不一样也能命中
2. **BM25 索引**:匹配问题的"关键词",确保原文中高频出现的文档能被找到
3. **RRF 融合**:把两边的结果合并,确保最相关的排在前面
这种设计比单一检索更靠谱,既不会漏掉语义相关但关键词不匹配的文档,也不会漏掉关键词匹配但语义有偏差的文档!
# 究竟是怎么建立索引的?
好的!我来详细给你讲清楚索引是怎么建立的,用大白话 + 具体例子说明。
让我先看看数据准备模块,了解整个流程:
**已查看文件** *data_preparation.py*
完美!现在我用一个具体的例子,给你详细讲解索引是怎么建立的。
## 📚 索引建立完整流程举例
假设你的系统里有这么一道菜:**`宫保鸡丁.md`**
---
### 🔹 **第 1 步:加载原始文档(DataPreparationModule)**
```python
# 文件路径:E:\agent\llm_agent\graph_rag_architecture\resources\meat_dish\宫保鸡丁.md
# 文件内容:
"""
# 宫保鸡丁
川菜经典代表作之一,口味麻辣鲜香。
## 所需食材
- 鸡胸肉 500g
- 花生米 100g
- 干辣椒 20g
- 花椒 10g
## 制作步骤
1. 鸡肉切丁,用料酒腌制
2. 热锅凉油,爆香干辣椒和花椒
3. 下鸡丁翻炒至变色
4. 加入花生米,调味出锅
## 附加内容
- 火候控制:全程大火快炒
- 关键技巧:先腌制鸡肉更入味
"""
```
**这一步做了什么?**
- 读取 Markdown 文件内容
- 提取元数据(分类、难度、菜品名):
```python
metadata = {
"source": "meat_dish/宫保鸡丁.md",
"category": "荤菜", # 从路径识别
"dish_name": "宫保鸡丁", # 从文件名识别
"difficulty": "★★★", # 从内容识别
"parent_id": "abc123..." # 文件的 MD5 哈希值
}
```
---
### 🔹 **第 2 步:按标题结构分割成小块(chunking)**
系统不会把整个文档直接转成向量,而是**按 Markdown 标题切成更小的块**:
```python
# 使用 MarkdownHeaderTextSplitter 分割后,得到 4 个 chunks:
# Chunk 1: 菜品介绍
{
"page_content": "# 宫保鸡丁\n\n川菜经典代表作之一,口味麻辣鲜香。",
"metadata": {
"chunk_id": "uuid_001",
"parent_id": "abc123...",
"主标题": "宫保鸡丁",
"doc_type": "child"
}
}
# Chunk 2: 所需食材
{
"page_content": "## 所需食材\n- 鸡胸肉 500g\n- 花生米 100g\n- 干辣椒 20g\n- 花椒 10g",
"metadata": {
"chunk_id": "uuid_002",
"parent_id": "abc123...",
"主标题": "宫保鸡丁",
"二级标题": "所需食材",
"doc_type": "child"
}
}
# Chunk 3: 制作步骤
{
"page_content": "## 制作步骤\n1. 鸡肉切丁...\n2. 热锅凉油...\n3. 下鸡丁翻炒...\n4. 加入花生米...",
"metadata": {
"chunk_id": "uuid_003",
"parent_id": "abc123...",
"主标题": "宫保鸡丁",
"二级标题": "制作步骤",
"doc_type": "child"
}
}
# Chunk 4: 附加内容
{
"page_content": "## 附加内容\n- 火候控制:全程大火快炒\n- 关键技巧:先腌制鸡肉更入味",
"metadata": {
"chunk_id": "uuid_004",
"parent_id": "abc123...",
"主标题": "宫保鸡丁",
"二级标题": "附加内容",
"doc_type": "child"
}
}
```
**为什么要分割?**
- 如果把整篇文档转成一个向量,检索时会丢失细节
- 分成小块后,用户问"宫保鸡丁需要什么食材"时,能精准命中"所需食材"这个 chunk,而不是返回整篇文章
---
### 🔹 **第 3 步:向量化(Embedding)- 核心!**
现在有了很多个 chunks,接下来要把每个 chunk 的文字内容转成**向量(一串数字)**:
```python
# 使用 HuggingFaceEmbeddings 模型(BAAI/bge-small-zh-v1.5)
# 对 Chunk 1 进行向量化:
chunk_1_text = "# 宫保鸡丁\n\n川菜经典代表作之一,口味麻辣鲜香。"
vector_1 = embeddings.embed_query(chunk_1_text)
# vector_1 长什么样?这是一个 768 维的向量(数组):
vector_1 = [
0.023, -0.045, 0.089, -0.012, 0.067, ..., # 共 768 个数字
-0.034, 0.091, -0.078, 0.045, -0.023
]
# 同样地,对其他 chunks 也做同样的事:
vector_2 = embeddings.embed_query("## 所需食材\n- 鸡胸肉 500g...") # 食材 chunk 的向量
vector_3 = embeddings.embed_query("## 制作步骤\n1. 鸡肉切丁...") # 步骤 chunk 的向量
vector_4 = embeddings.embed_query("## 附加内容\n- 火候控制...") # 技巧 chunk 的向量
```
**向量的本质是什么?**
- 把文字的含义编码成一串数字
- 语义相近的内容,在向量空间里的距离也近
- 比如"宫保鸡丁怎么做"和"制作步骤"这两个 chunk 的向量会很接近
---
### 🔹 **第 4 步:构建 FAISS 索引结构**
拿到所有向量后,要用 FAISS 算法把它们组织成高效的索引:
```python
# 假设有 3 道菜,每道菜 4 个 chunks,总共 12 个向量:
all_vectors = [
vector_1, # 宫保鸡丁 - 介绍
vector_2, # 宫保鸡丁 - 食材
vector_3, # 宫保鸡丁 - 步骤
vector_4, # 宫保鸡丁 - 技巧
vector_5, # 鱼香肉丝 - 介绍
vector_6, # 鱼香肉丝 - 食材
vector_7, # 鱼香肉丝 - 步骤
vector_8, # 鱼香肉丝 - 技巧
...
]
# FAISS 做的事情:
# 1. 把所有向量组成一个巨大的矩阵(12 行 × 768 列)
# 2. 建立索引结构,方便快速查找
# 代码实现:
from langchain_community.vectorstores import FAISS
vectorstore = FAISS.from_documents(
documents=all_chunks, # 原始的 chunk 对象(带内容和 metadata)
embedding=embeddings_model # 嵌入模型
)
# 内部发生了什么?
# FAISS 会做两件事:
# 1. 自动调用 embeddings_model 把所有文本转成向量
# 2. 用高效的数据结构存储这些向量,并建立索引
```
**FAISS 索引的核心原理:**
FAISS 用了叫 **"倒排文件索引" (Inverted File Index, IVF)** 的技术:
```python
# 想象一下图书馆的图书分类:
# 1. 先把所有向量聚类成几个"簇"(类似图书分类)
# - 簇 1: 介绍类(宫保鸡丁介绍、鱼香肉丝介绍...)
# - 簇 2: 食材类(宫保鸡丁食材、鱼香肉丝食材...)
# - 簇 3: 步骤类(宫保鸡丁步骤、鱼香肉丝步骤...)
# - 簇 4: 技巧类(宫保鸡丁技巧、鱼香肉丝技巧...)
# 2. 每个簇有一个"中心向量"(质心)
# centroid_1 = [0.01, -0.03, 0.05, ...] # 介绍类的平均向量
# centroid_2 = [-0.02, 0.04, -0.01, ...] # 食材类的平均向量
# 3. 检索时:
# - 先用问题向量找到最近的几个簇
# - 只在这些簇里搜索,不用遍历所有向量
# - 速度提升几十倍!
```
---
### 🔹 **第 5 步:保存索引到硬盘**
建好的索引要保存起来,下次不用重新计算:
```python
# 保存到本地
vectorstore.save_local("./vector_index")
# 实际保存了什么文件?
# ./vector_index/
# ├── faiss.index # FAISS 索引结构(聚类信息、向量矩阵等)
# └── index.pkl # 用 pickle 序列化存储的 chunks 和 metadata
```
**为什么需要保存?**
- 向量化很耗时(1000 个菜谱可能要跑几分钟)
- 保存后下次直接加载,秒开!
---
### 🔹 **完整代码示例**
这是索引建立的完整流程代码:
```python
from rag_modules.data_preparation import DataPreparationModule
from rag_modules.index_construction import IndexConstructionModule
# 1. 初始化模块
data_module = DataPreparationModule(data_path="./resources")
index_module = IndexConstructionModule(
model_name="BAAI/bge-small-zh-v1.5",
index_save_path="./vector_index"
)
# 2. 加载文档
documents = data_module.load_documents()
# 输出:成功加载 500 个文档
# 3. 分割成 chunks
chunks = data_module.chunk_documents()
# 输出:Markdown 分块完成,共生成 2000 个 chunk
# (平均每个文档切成 4 块)
# 4. 构建向量索引
vectorstore = index_module.build_vector_index(chunks)
# 输出:正在构建 FAISS 向量索引...
# 输出:向量索引构建完成,包含 2000 个向量
# 5. 保存索引
index_module.save_index()
# 输出:向量索引已保存到:./vector_index
```
---
### 🔹 **可视化流程**
```mermaid
graph TD
A["宫保鸡丁.md
原始 Markdown 文件"] --> B[加载文档]
B --> C["Document 对象
page_content + metadata"]
C --> D[按标题分割]
D --> E["Chunk 1: 菜品介绍"]
D --> F["Chunk 2: 所需食材"]
D --> G["Chunk 3: 制作步骤"]
D --> H["Chunk 4: 附加内容"]
E --> I[嵌入模型]
F --> I
G --> I
H --> I
I --> J["向量 1:
[0.023, -0.045, ...]"]
I --> K["向量 2:
[-0.012, 0.067, ...]"]
I --> L["向量 3:
[0.089, -0.034, ...]"]
I --> M["向量 4:
[0.045, 0.091, ...]"]
J --> N[FAISS 索引构建]
K --> N
L --> N
M --> N
N --> O["FAISS 索引结构
聚类 + 倒排文件"]
O --> P[保存到硬盘]
P --> Q["./vector_index/
faiss.index + index.pkl"]
```
---
## 🎯 **总结:索引建立的关键步骤**
| 步骤 | 输入 | 输出 | 核心操作 |
| --------------- | ------------- | ------------- | ------------------------ |
| **1. 加载文档** | Markdown 文件 | Document 对象 | 读取内容 + 提取 metadata |
| **2. 分割成块** | Document | List[Chunk] | 按 Markdown 标题切割 |
| **3. 向量化** | Chunk 文本 | 768 维向量 | 用嵌入模型转成数字 |
| **4. 建索引** | 所有向量 | FAISS 对象 | 聚类 + 建立倒排索引 |
| **5. 保存** | FAISS 对象 | 硬盘文件 | 序列化存储 |
**核心思想**:
把非结构化的文本 → 切成小块 → 转成向量 → 用 FAISS 组织成高效索引 → 保存起来供后续检索使用!
# BM25是怎么知道哪些是关键词的,我有在哪里告诉他吗
好问题!让我告诉你 BM25 是怎么自动识别关键词的,以及你有没有配置过。
**已检索仓库** *BM25Retriever* 0 个结果
让我看看你的 BM25 初始化代码,以及它内部是怎么工作的:
**已查看文件** *retrieval_optimization.py 30-50*
好的!现在让我详细解释 BM25 是怎么自动识别关键词的:
## 🎯 **答案:你没有手动配置,BM25 是全自动的!**
看你的代码第 45-48 行:
```python
self.bm25_retriever = BM25Retriever.from_documents(
self.chunks, # ← 你只传入了原始文档
k=5
)
```
**你没有告诉它哪些是关键词**,BM25Retriever 会**自动分析所有文档**,自己统计出关键词!
---
## 🔍 **BM25 的自动关键词识别过程**
### **第 1 步:分词(Tokenization)**
当你传入这些文档时:
```python
chunks = [
Document("宫保鸡丁\n川菜经典,麻辣鲜香"),
Document("鱼香肉丝\n川菜名菜,酸甜可口"),
Document("清蒸鲈鱼\n粤菜经典,清淡鲜美")
]
```
BM25 会自动把文字切成一个个"词":
```python
# 中文分词效果(实际用 jieba 或其他分词器):
{
"文档 1": ["宫保鸡丁", "川菜", "经典", "麻辣", "鲜香"],
"文档 2": ["鱼香肉丝", "川菜", "名菜", "酸甜", "可口"],
"文档 3": ["清蒸", "鲈鱼", "粤菜", "经典", "清淡", "鲜美"]
}
```
---
### **第 2 步:建立倒排索引(Inverted Index)**
BM25 会创建一个巨大的统计表:
```python
# 倒排索引表(记录每个词出现在哪些文档里)
{
"宫保鸡丁": {
"出现文档": [文档 1],
"出现次数": [1],
"文档频率": 1 # 只有 1 个文档有这个词
},
"川菜": {
"出现文档": [文档 1, 文档 2],
"出现次数": [1, 1],
"文档频率": 2 # 有 2 个文档有这个词
},
"经典": {
"出现文档": [文档 1, 文档 3],
"出现次数": [1, 1],
"文档频率": 2
},
"清蒸": {
"出现文档": [文档 3],
"出现次数": [1],
"文档频率": 1 # 只在 1 个文档出现
},
...
}
```
---
### **第 3 步:计算 IDF(逆文档频率)**
这是 BM25 的核心!**越稀有的词,权重越高**:
```python
# IDF 公式:IDF(qi) = log((N - n + 0.5) / (n + 0.5))
# N = 总文档数
# n = 包含这个词的文档数
# 例子:
# "宫保鸡丁": IDF = log((3 - 1 + 0.5) / (1 + 0.5)) = log(2.5/1.5) = 0.22
# "川菜": IDF = log((3 - 2 + 0.5) / (2 + 0.5)) = log(1.5/2.5) = -0.22
# "清蒸": IDF = log((3 - 1 + 0.5) / (1 + 0.5)) = 0.22
# 看到了吗?
# - "宫保鸡丁" 只在 1 个文档出现 → IDF 高 → 更重要!
# - "川菜" 在 2 个文档出现 → IDF 低 → 不那么重要
```
**核心思想:**
- ✅ 如果一个词在很多文档都出现(如"的"、"了"、"制作"),说明它不具区分度
- ✅ 如果一个词只在少数文档出现(如"宫保鸡丁"、"佛跳墙"),说明它是关键词!
---
### **第 4 步:用户查询时的处理**
当用户搜索 **"宫保鸡丁怎么做"**:
```python
# 1. 对查询也进行分词
query_tokens = ["宫保鸡丁", "怎么", "做"]
# 2. 对每个词计算得分
for doc in all_docs:
score = 0
# 对查询中的每个词
for token in query_tokens:
# 如果这个词在这个文档里出现过
if token in doc:
# 计算 TF(词频)× IDF(稀有度)
tf = 词在文档中出现次数
idf = 之前算好的 IDF 值
# BM25 公式(简化版)
score += (tf × (k1 + 1)) / (tf + k1 × (1 - b + b × doc_len/avg_len)) × idf
# 记录这个文档的总分
doc_scores[doc] = score
# 3. 按分数排序,返回最高的几个
return top_k_docs
```
---
## 📊 **实际例子:完整流程**
假设你有这 3 个菜谱 chunk:
```python
chunks = [
"宫保鸡丁的制作方法\n鸡肉切丁,加花生米爆炒",
"鱼香肉丝的做法\n猪肉切丝,配木耳炒制",
"宫保鸡丁的食材准备\n鸡胸肉 500g,花生米 100g"
]
```
### **BM25 自动建立的索引:**
| 词语 | 文档频率 | IDF 值 | 为什么是关键词 |
| ------------ | -------- | ------ | -------------------------------- |
| **宫保鸡丁** | 2/3 | 0.18 | 虽然不是最稀有,但能精准定位菜品 |
| **鱼香肉丝** | 1/3 | 0.69 | 很稀有!是这道菜的专属词 |
| **制作方法** | 1/3 | 0.69 | 较稀有 |
| **做法** | 1/3 | 0.69 | 较稀有 |
| **鸡肉** | 1/3 | 0.69 | 食材词,有区分度 |
| **猪肉** | 1/3 | 0.69 | 食材词,有区分度 |
| **切** | 3/3 | -0.51 | 太常见了,不是关键词 |
| **炒** | 2/3 | -0.18 | 常见动词,区分度低 |
---
## 🎯 **回答你的问题**
> **"我有在哪里告诉他(BM25)哪些是关键词吗?"**
**答案:没有!你什么都没配置!**
BM25Retriever 在 `from_documents()` 初始化时,自动做了以下事情:
```python
# retrieval_optimization.py 第 45 行
self.bm25_retriever = BM25Retriever.from_documents(
self.chunks, # ← 就这一句,其他的都不用管
k=5
)
# 内部发生了什么?
# 1. 自动对所有 chunks 进行分词
# 2. 自动统计每个词的词频(TF)
# 3. 自动计算每个词的逆文档频率(IDF)
# 4. 自动建立倒排索引表
# 5. 准备好 BM25 评分所需的所有数据结构
```
**你完全不用告诉它哪些是关键词**,BM25 会根据统计学原理自动判断:
- ✅ 出现次数多的词 → 可能是关键词(但不是绝对)
- ✅ 在少数文档中集中出现的词 → 很可能是关键词
- ❌ 在所有文档都出现的词 → 肯定不是关键词(停用词)
---
## 💡 如果你想自定义(可选功能)
虽然你现在的代码没配置,但 BM25Retriever 其实支持一些自定义参数:
```python
# 可选参数(你当前用的是默认值)
bm25_retriever = BM25Retriever.from_documents(
documents=self.chunks,
k=5, # ← 返回结果数量(你已配置)
bm25_params={
'k1': 1.5, # ← 调节词频饱和度的参数(默认 1.5)
'b': 0.75, # ← 调节文档长度惩罚的参数(默认 0.75)
}
)
# 但你一般不需要改这些,默认值就很好用!
```
---
## 📋 **总结对比**
| 特性 | **向量检索(FAISS)** | **BM25 检索** |
| ------------------ | ---------------------- | --------------------- |
| **关键词识别** | ❌ 不需要,直接转成向量 | ✅ 自动分词 + 统计 IDF |
| **需要配置** | ❌ 不需要,自动向量化 | ❌ 不需要,自动建索引 |
| **工作原理** | 语义相似度 | 词频统计 + 稀有度 |
| **谁来判断关键词** | AI 模型(嵌入模型) | 统计学算法(IDF) |
**一句话概括**:
BM25 就像一个自动化的图书管理员,你给它一堆书(文档),它自己就能统计出哪些词是关键词,完全不用你操心!
# 那如果我有上万篇文档呢,还用BM25在内存里构建索引?是不是要考虑其他技术方案了?
非常好的问题!你说到了 BM25 的一个关键瓶颈。让我详细分析一下上万篇文档的场景,以及应该用什么技术方案。
**查看文件失败** *requirements.txt*
**已查看文件** *requirements.txt*
好的!现在让我详细分析上万篇文档的场景,以及 BM25 的瓶颈和替代方案。
## 🚨 **BM25 在大规模数据下的问题**
### **1️⃣ 内存占用爆炸**
假设你有 **10,000 篇文档**,每篇平均 500 字:
```python
# 当前方案(LangChain + rank_bm25)
# 内存里要存什么?
倒排索引表 = {
"宫保鸡丁": {
"doc_1": {"tf": 3, "positions": [10, 50, 120]},
"doc_15": {"tf": 1, "positions": [200]},
"doc_89": {"tf": 5, "positions": [5, 30, 60, 90, 150]},
... # 假设有 500 个文档提到宫保鸡丁
},
"鱼香肉丝": {...},
"制作": {...}, # 这个词可能在 8000 个文档出现
"方法": {...}, # 这个词可能在 7500 个文档出现
... # 中文词汇量可能有 5-10 万个词
}
# 内存占用估算:
# - 唯一词数:约 50,000 个
# - 每个词的倒排表:平均 200 个文档引用 × 16 字节/记录 = 3.2KB
# - 总内存:50,000 × 3.2KB ≈ 160MB
# 看起来还行?但实际远不止!
# - Python 字典开销大(hash 表 + 对象头)
# - 嵌套结构(dict of dict of dict)
# - 字符串对象本身
# 实际可能吃掉 1-2GB 内存!
```
### **2️⃣ 初始化速度慢**
```python
# 每次启动程序都要重建索引
self.bm25_retriever = BM25Retriever.from_documents(
self.chunks, # 10,000 篇文档
k=5
)
# 内部流程:
# 1. 对所有文档分词 → 10,000 × 500 字 = 5,000,000 字
# 2. 统计词频 → O(N) 复杂度
# 3. 计算 IDF → O(词汇量) 复杂度
# 4. 构建倒排索引 → O(N × 平均词数) 复杂度
# 预估时间:
# - 分词:约 30 秒
# - 统计 + 建索引:约 60 秒
# 总共:1.5 分钟才能启动!
```
### **3️⃣ 查询性能下降**
```python
# BM25 查询时要做什么?
query = "宫保鸡丁怎么做"
# 1. 分词
tokens = ["宫保鸡丁", "怎么", "做"]
# 2. 对每个 token,查倒排表
for token in tokens:
posting_list = inverted_index[token]
# "宫保鸡丁" → 500 个文档
# "怎么" → 3000 个文档
# "做" → 8000 个文档
# 3. 合并结果并打分
# 要处理:500 + 3000 + 8000 = 11,500 条记录
# 然后排序取 top_k
# 单次查询耗时:
# - 小规模(100 文档):~10ms
# - 中规模(1000 文档):~50ms
# - 大规模(10000 文档):~200-500ms ← 还能接受?
# 但如果并发查询呢?
# 10 个用户同时问 → 2-5 秒延迟!
```
---
## 📊 **规模对比表**
| 文档数量 | BM25 内存占用 | 初始化时间 | 单次查询 | 推荐方案 |
| ---------------- | ------------- | ---------- | -------- | ---------------- |
| **100 篇** | ~10MB | <1 秒 | <5ms | ✅ 当前方案 |
| **1,000 篇** | ~100MB | ~10 秒 | ~20ms | ✅ 当前方案 |
| **10,000 篇** | ~1-2GB | ~1.5 分钟 | ~200ms | ⚠️ 需要优化 |
| **100,000 篇** | ~10-20GB | ~15 分钟 | ~2 秒 | ❌ 必须换方案 |
| **1,000,000 篇** | ~100GB+ | >2 小时 | >10 秒 | ❌ 必须用专业引擎 |
---
## 🛠️ **大规模场景的技术方案**
### **方案 A:Elasticsearch(推荐用于 1 万 -100 万文档)**
```python
# Elasticsearch 是什么?
# - 专业的搜索引擎(基于 Lucene)
# - 支持 BM25、TF-IDF 等算法
# - 分布式、可水平扩展
# - 持久化存储(不占内存)
# 怎么用?
from elasticsearch import Elasticsearch
# 1. 连接 ES 集群
es = Elasticsearch([
"http://node1:9200",
"http://node2:9200",
])
# 2. 创建索引(类似数据库的表)
es.indices.create(index="recipes")
# 3. 批量导入文档
for i, chunk in enumerate(chunks):
es.index(
index="recipes",
id=i,
document={
"content": chunk.page_content,
"metadata": chunk.metadata
}
)
# 4. 查询(自动用 BM25)
response = es.search(
index="recipes",
query={
"match": {
"content": "宫保鸡丁怎么做"
}
},
size=5
)
# 优势:
# ✅ 支持海量数据(亿级文档)
# ✅ 查询速度快(毫秒级)
# ✅ 内存占用小(数据存在硬盘,只缓存热点)
# ✅ 支持分布式、高可用
# ✅ 自带分词器(支持中文)
# 劣势:
# ❌ 需要额外部署(Docker 或独立服务器)
# ❌ 运维成本高
# ❌ 学习曲线陡峭
```
**架构示意:**
```mermaid
graph TB
A[你的 Python 程序] --> B[Elasticsearch 集群]
B --> C[Node1: 存储部分数据
硬盘 + 内存缓存]
B --> D[Node2: 存储部分数据
硬盘 + 内存缓存]
B --> E[Node3: 存储部分数据
硬盘 + 内存缓存]
style B fill:#f9f,stroke:#333
style C fill:#bbf,stroke:#333
style D fill:#bbf,stroke:#333
style E fill:#bbf,stroke:#333
```
---
### **方案 B:Whoosh(轻量级,适合 1 万 -10 万文档)**
```python
# Whoosh 是什么?
# - 纯 Python 实现的搜索引擎
# - 类似简化版 Elasticsearch
# - 数据存在硬盘上
from whoosh.index import create_in, open_dir
from whoosh.fields import SchemaClass, TEXT, ID
from whoosh.qparser import QueryParser
# 1. 定义索引结构
class RecipeSchema(SchemaClass):
id = ID(stored=True)
content = TEXT(analyzer=whoosh.analysis.StemmingAnalyzer())
dish_name = TEXT(stored=True)
# 2. 创建索引(存在硬盘)
ix = create_in("./index_storage", RecipeSchema)
# 3. 写入文档
writer = ix.writer()
for i, chunk in enumerate(chunks):
writer.add_document(
id=str(i),
content=chunk.page_content,
dish_name=chunk.metadata['dish_name']
)
writer.commit()
# 4. 查询
with ix.searcher() as searcher:
query = QueryParser("content", ix.schema).parse("宫保鸡丁")
results = searcher.search(query, limit=5)
# 优势:
# ✅ 纯 Python,无需额外部署
# ✅ 数据持久化到硬盘
# ✅ 内存占用小
# ✅ API 简单
# 劣势:
# ❌ 性能不如 ES(单机上限约 10 万文档)
# ❌ 不支持分布式
# ❌ 中文分词需要额外配置
```
---
### **方案 C:FAISS + 关键词过滤(折中方案)**
```python
# 思路:只用 FAISS,不用 BM25
# 但检索后对结果进行关键词匹配度排序
from faiss import IndexFlatIP
import numpy as np
# 1. 正常用语义检索
vector_results = faiss_index.search(query_vector, k=50) # 先找 50 个
# 2. 对结果做关键词匹配度打分
def keyword_score(doc_text, query_tokens):
score = 0
for token in query_tokens:
if token in doc_text:
score += 1 # 简单的关键词匹配
return score
# 3. 重排序
reranked = sorted(
vector_results,
key=lambda doc: (
0.7 * semantic_similarity(doc, query) + # 语义分
0.3 * keyword_score(doc.content, query_tokens) # 关键词分
),
reverse=True
)[:5] # 取前 5 个
# 优势:
# ✅ 不需要 BM25 索引
# ✅ 内存占用可控
# ✅ 实现简单
# 劣势:
# ❌ 效果不如真正的 BM25
# ❌ 关键词匹配较粗糙
```
---
### **方案 D:Milvus / Chroma(向量数据库,适合大规模)**
```python
# 这些是专业的向量数据库
# 除了存向量,也支持元数据过滤
from pymilvus import connections, Collection, FieldSchema, CollectionSchema
# 1. 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 2. 定义 schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="dish_name", dtype=DataType.VARCHAR, max_length=256),
]
# 3. 创建集合
schema = CollectionSchema(fields)
collection = Collection("recipes", schema)
# 4. 插入数据
collection.insert([
ids=[1, 2, 3],
embeddings=[vec1, vec2, vec3],
contents=["宫保鸡丁...", "鱼香肉丝...", ...],
dish_names=["宫保鸡丁", "鱼香肉丝", ...]
)
# 5. 创建索引(向量 + 标量)
collection.create_index("embedding", {"index_type": "IVF_FLAT"})
collection.create_index("dish_name") # 标量字段也能建索引
# 6. 混合查询(向量 + 关键词过滤)
results = collection.search(
data=[query_vector],
anns_field="embedding",
param={"metric_type": "COSINE"},
limit=5,
expr="dish_name == '宫保鸡丁'" # 可以用标量字段过滤
)
# 优势:
# ✅ 专为大规模设计(千万级向量)
# ✅ 向量 + 标量混合查询
# ✅ 分布式、高可用
# ✅ 持久化存储
# 劣势:
# ❌ 需要额外部署
# ❌ 不支持全文搜索(只能用标量字段精确过滤)
# ❌ 不能像 BM25 那样模糊匹配关键词
```
---
## 🎯 **针对你的情况的建议**
### **如果你的文档规模:**
#### **1 万篇以下:**
✅ **继续用当前的 BM25 方案**,完全没问题!
#### **1 万 -10 万篇:**
🔧 **方案选择:**
- 不想折腾 → **Whoosh**(纯 Python,简单)
- 追求性能 → **Elasticsearch**(专业,但要部署)
- 折中方案 → **FAISS + 关键词过滤**(去掉 BM25)
#### **10 万 -100 万篇:**
🚀 **强烈推荐 Elasticsearch**
- 性能好
- 可扩展
- 生态成熟
#### **100 万篇以上:**
🌟 **Elasticsearch 分布式集群 + FAISS/Milvus**
- 向量检索用 FAISS/Milvus
- 关键词检索用 Elasticsearch
- 两者结果融合
---
## 📋 **完整改造示例(如果用 Elasticsearch)**
这是你当前代码的改造版本:
```python
"""
检索优化模块 - 负责从数据库里找资料,并用多种方法提高查找准确率
"""
import logging
from typing import List, Dict, Any
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from elasticsearch import Elasticsearch
logger = logging.getLogger(__name__)
class RetrievalOptimizationModule:
"""检索优化模块 - 负责混合检索和过滤"""
def __init__(self, vectorstore: FAISS, chunks: List[Document], use_es: bool = False, es_hosts: list = None):
"""
初始化检索优化模块
Args:
vectorstore: FAISS 向量存储
chunks: 文档块列表
use_es: 是否使用 Elasticsearch(默认 False,保持向后兼容)
es_hosts: ES 集群地址列表(如 ["http://localhost:9200"])
"""
self.vectorstore = vectorstore
self.chunks = chunks
self.use_es = use_es
if use_es and es_hosts:
# 使用 Elasticsearch 作为 BM25 引擎
self.es = Elasticsearch(es_hosts)
self._setup_es_index()
else:
# 继续使用内存版 BM25(适合小规模数据)
from langchain_community.retrievers import BM25Retriever
self.bm25_retriever = BM25Retriever.from_documents(self.chunks, k=5)
self.setup_retrievers()
def _setup_es_index(self):
"""设置 ES 索引并导入数据"""
# 创建索引
if not self.es.indices.exists(index="recipe_chunks"):
self.es.indices.create(
index="recipe_chunks",
mappings={
"properties": {
"content": {"type": "text", "analyzer": "ik_max_word"}, # 中文分词
"metadata": {"type": "object"},
"chunk_id": {"type": "keyword"}
}
}
)
# 批量导入文档
from elasticsearch.helpers import bulk
def doc_generator():
for i, chunk in enumerate(self.chunks):
yield {
"_index": "recipe_chunks",
"_id": chunk.metadata.get('chunk_id', str(i)),
"_source": {
"content": chunk.page_content,
"metadata": chunk.metadata,
"chunk_id": chunk.metadata.get('chunk_id', str(i))
}
}
bulk(self.es, doc_generator())
logger.info(f"已向 ES 导入 {len(self.chunks)} 个文档")
def setup_retrievers(self):
"""设置向量检索器和 BM25 检索器"""
logger.info("正在设置检索器...")
# 向量检索器 - 基于语义相似度
self.vector_retriever = self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
# ES 模式下不需要单独的 BM25 检索器,查询时直接用 ES
if not self.use_es:
# BM25 检索器 - 基于关键词匹配
from langchain_community.retrievers import BM25Retriever
self.bm25_retriever = BM25Retriever.from_documents(
self.chunks,
k=5
)
logger.info("检索器设置完成")
def hybrid_search(self, query: str, top_k: int = 3) -> List[Document]:
"""
混合检索 - 结合向量检索和 BM25 检索
Args:
query: 用户搜索的词
top_k: 最终返回几条结果
Returns:
检索到的文档列表
"""
# 向量检索
vector_docs = self.vector_retriever.invoke(query)
# BM25 检索(根据模式选择)
if self.use_es:
bm25_docs = self._es_search(query)
else:
bm25_docs = self.bm25_retriever.invoke(query)
# RRF 重排
reranked_docs = self._rrf_rerank(vector_docs, bm25_docs)
return reranked_docs[:top_k]
def _es_search(self, query: str, top_k: int = 10) -> List[Document]:
"""使用 Elasticsearch 进行 BM25 检索"""
response = self.es.search(
index="recipe_chunks",
query={
"match": {
"content": query
}
},
size=top_k
)
# 转换成 Document 对象
docs = []
for hit in response['hits']['hits']:
doc = Document(
page_content=hit['_source']['content'],
metadata=hit['_source']['metadata']
)
docs.append(doc)
return docs
# ... existing agent ...
```
---
## 💡 **总结建议**
对于你的情况(菜谱 RAG 系统):
| 现状 | 建议 |
| --------------- | ------------------------------------------ |
| **<1 万篇文档** | ✅ 继续用当前的 BM25 方案,简单好用 |
| **1-10 万篇** | 🔧 考虑换成 **Whoosh** 或 **Elasticsearch** |
| **>10 万篇** | 🚀 必须上 **Elasticsearch** 或 **Milvus** |
**关键判断标准:**
- 如果程序启动时间超过 1 分钟 → 该优化了
- 如果内存占用超过 2GB → 该换方案了
- 如果查询延迟超过 500ms → 该上专业引擎了
**最稳妥的升级路径:**
1. 先用当前方案,等业务增长
2. 遇到瓶颈时 → 上 Elasticsearch
3. 如果需要更好的向量检索 → 加 Milvus/FAISS 分布式版
# 所以BM25是不涉及向量转换的是吧?
**完全正确!BM25 完全不涉及向量转换!**
这是 BM25 和 FAISS 最本质的区别。让我给你详细对比一下:
---
## 🎯 **核心区别**
### **BM25:纯文本统计方法**
```python
# BM25 的工作流程(完全不用向量)
原始文本 = "宫保鸡丁的制作方法"
# ↓ 分词(切成词语)
["宫保鸡丁", "的", "制作", "方法"]
# ↓ 统计词频(纯数字统计)
{
"宫保鸡丁": 出现 1 次,
"制作": 出现 1 次,
"方法": 出现 1 次
}
# ↓ 计算 IDF(逆文档频率,还是纯数字)
IDF("宫保鸡丁") = log(总文档数 / 包含这个词的文档数)
# ↓ 最终得分(数学公式计算)
BM25 分数 = TF × IDF × 长度惩罚因子
# 全程都是数字运算,没有把文字转成向量!
```
**关键点:**
- ✅ 处理对象:**原始文本**(字符串)
- ✅ 核心技术:**统计学**(词频、文档频率)
- ✅ 不涉及:❌ 嵌入模型、❌ 向量空间、❌ 神经网络
- ✅ 本质:**关键词匹配算法**(和搜索引擎一样)
---
### **FAISS:必须向量化**
```python
# FAISS 的工作流程(必须先转成向量)
原始文本 = "宫保鸡丁的制作方法"
# ↓ 调用嵌入模型(AI 模型)
from langchain_huggingface import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5")
# ↓ 向量化(转成 768 维的数字数组)
vector = embeddings.embed_query("宫保鸡丁的制作方法")
# vector = [0.023, -0.045, 0.089, -0.012, ..., 0.067] ← 768 个数字
# ↓ 存入 FAISS 索引
faiss_index.add(vector)
# ↓ 查询时也要转成向量
query_vector = embeddings.embed_query("宫保鸡丁怎么做")
# ↓ 计算向量相似度(余弦相似度)
similarity = cosine_similarity(query_vector, faiss_index[0])
# 没有向量化就没法用 FAISS!
```
**关键点:**
- ✅ 处理对象:**向量**(数字数组)
- ✅ 核心技术:**线性代数**(向量乘法、距离计算)
- ✅ 必须有:✅ 嵌入模型、✅ 向量空间、✅ 归一化
- ✅ 本质:**语义相似度检索**(理解意思,不是匹配字眼)
---
## 📊 **对比表格**
| 特性 | **BM25** | **FAISS(向量检索)** |
| -------------------- | ---------------------------- | --------------------------------------- |
| **是否需要向量转换** | ❌ **不需要** | ✅ **必须需要** |
| **输入数据** | 原始文本(字符串) | 文本 → 向量(数字数组) |
| **依赖模型** | ❌ 不需要 AI 模型 | ✅ 需要嵌入模型(如 BGE、Sentence-BERT) |
| **工作原理** | 统计词频 + IDF | 向量空间中的距离计算 |
| **匹配方式** | 关键词匹配(死板) | 语义理解(灵活) |
| **计算类型** | 算术运算(加减乘除) | 线性代数(向量点积、余弦相似度) |
| **内存占用** | 倒排索引表(词典) | 向量矩阵(N×768 的浮点数) |
| **典型应用** | 传统搜索引擎(Google、百度) | 语义搜索、推荐系统 |
---
## 🔍 **实际例子对比**
假设用户搜索:**"西红柿炒鸡蛋"**
### **BM25 的处理方式:**
```python
# 1. 分词
query_tokens = ["西红柿", "炒", "鸡蛋"]
# 2. 在倒排索引里找包含这些词的文档
# (直接字符串匹配,不转弯子)
matching_docs = []
for doc in all_docs:
if "西红柿" in doc or "番茄" in doc: # ← 注意:找不到"番茄",因为不是同一个词
matching_docs.append(doc)
# 3. 按词频打分
score = count("西红柿") * IDF("西红柿") + count("鸡蛋") * IDF("鸡蛋")
# 结果:
# ✅ 找到:"西红柿炒鸡蛋的做法"(有完整匹配)
# ❌ 找不到:"番茄炒蛋的做法"(虽然意思一样,但词不一样)
```
---
### **FAISS 的处理方式:**
```python
# 1. 把查询转成向量
query_vector = embedding_model.embed_query("西红柿炒鸡蛋")
# query_vector = [0.123, -0.456, 0.789, ...]
# 2. 在向量空间里找最近的向量
# (计算欧氏距离或余弦相似度)
similar_vectors = faiss_index.search(query_vector, k=5)
# 结果:
# ✅ 找到:"西红柿炒鸡蛋的做法"(语义相近)
# ✅ 找到:"番茄炒蛋的做法"(虽然词不一样,但向量距离近!)
# ✅ 找到:"家常炒蛋的做法"(语义相关)
# 为什么能找到"番茄炒蛋"?
# 因为在训练时,嵌入模型就学过:
# "西红柿" 和 "番茄" 的向量很接近!
```
---
## 💡 **通俗类比**
### **BM25 就像一个"图书管理员"**
```
你:我要找关于"人工智能"的书
管理员:好的,我去查卡片目录...
(看哪些书的标题/摘要里有"人工智能"这几个字)
找到了:
✅ 《人工智能导论》(标题有"人工智能")
✅ 《人工智能技术应用》(标题有"人工智能")
❌ 《机器学习实战》(虽然没有"人工智能"这几个字,但内容是相关的)
管理员:抱歉,我只会匹配关键词,不懂含义...
```
---
### **FAISS 就像一个"懂人话的 AI 助手"**
```
你:我要找关于"人工智能"的书
AI 助手:我理解你的意思!(把问题转成向量)
(然后去向量空间里找语义相近的书)
找到了:
✅ 《人工智能导论》(语义相近)
✅ 《机器学习实战》(虽然没说"人工智能",但我懂这是相关的)
✅ 《深度学习入门》(语义相关)
AI 助手:这些都是你想要的,对吧?
```
---
## 🎯 **总结**
> **"所以 BM25 是不涉及向量转换的是吧?"**
**答案:是的!完全不涉及!**
**关键区别:**
- **BM25** = 统计关键词出现频率 → **算术题**
- **FAISS** = 把文字转成向量再算距离 → **几何题**(需要先向量化)
**为什么你的系统要两者结合?**
- BM25:精准匹配专有名词(如"宫保鸡丁"、"鱼香肉丝")
- FAISS:理解语义(如"西红柿"="番茄"、"土豆"="马铃薯")
- 合在一起 = 既精准又智能!
# 那用户在提问的时候,FAISS又是依据什么去命中索引的呢,FAISS建立索引的过程又是怎么样的呢,能否通俗易懂地举例说明
好的!我用最通俗易懂的方式给你讲清楚 FAISS 建立索引和命中索引的全过程。
## 🏗️ **FAISS 建立索引的完整过程**
让我用一个超级形象的比喻来解释:
---
### **比喻:把菜谱变成"数字指纹"**
想象一下,FAISS 就像一个**"数字指纹鉴定中心"**。
---
### **第 1 步:准备原材料(你的文档块)**
假设你有这 3 个菜谱块:
```python
chunks = [
"宫保鸡丁:鸡肉切丁,加花生米、干辣椒爆炒,麻辣鲜香",
"鱼香肉丝:猪肉切丝,配木耳、胡萝卜,酸甜口味",
"清蒸鲈鱼:整条鲈鱼清蒸,清淡鲜美,粤菜经典"
]
```
---
### **第 2 步:请"翻译官"把文字翻译成数字(向量化)**
```python
# 嵌入模型就像一个"翻译官"
# 它能把任何一段文字翻译成一串固定长度的数字
from langchain_huggingface import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5")
# 开始翻译!
vector_1 = embeddings.embed_query("宫保鸡丁:鸡肉切丁,加花生米、干辣椒爆炒,麻辣鲜香")
vector_2 = embeddings.embed_query("鱼香肉丝:猪肉切丝,配木耳、胡萝卜,酸甜口味")
vector_3 = embeddings.embed_query("清蒸鲈鱼:整条鲈鱼清蒸,清淡鲜美,粤菜经典")
# 翻译出来的结果长什么样?
# (假设简化成 5 维向量,实际是 768 维)
vector_1 = [0.82, -0.45, 0.91, -0.12, 0.33] # ← "宫保鸡丁"的数字指纹
vector_2 = [-0.23, 0.67, -0.55, 0.88, -0.41] # ← "鱼香肉丝"的数字指纹
vector_3 = [0.15, 0.22, -0.18, 0.45, 0.79] # ← "清蒸鲈鱼"的数字指纹
```
**关键点:**
- ✅ 语义相近的内容,数字指纹也相近
- ✅ 语义不同的内容,数字指纹差别大
---
### **第 3 步:把"数字指纹"整理成卡片目录(建索引)**
```python
from langchain_community.vectorstores import FAISS
# 创建 FAISS 索引
vectorstore = FAISS.from_documents(
documents=chunks, # 原始文档
embedding=embeddings # 翻译官
)
# 内部发生了什么?
# FAISS 做了一个巨大的表格:
"""
┌─────────────────────────────────────────────┐
│ FAISS 向量索引表 │
├─────────────────────────────────────────────┤
│ ID │ 数字指纹 (向量) │ 原文内容 │
├─────────────────────────────────────────────┤
│ 0 │ [0.82, -0.45, 0.91...] │ 宫保鸡丁... │
│ 1 │ [-0.23, 0.67, -0.55...]│ 鱼香肉丝... │
│ 2 │ [0.15, 0.22, -0.18...] │ 清蒸鲈鱼... │
│ ... │ ... │ ... │
└─────────────────────────────────────────────┘
"""
# 然后 FAISS 用一种聪明的算法(IVF 倒排索引)
# 把这些向量分组,方便快速查找
# 比如分成 3 个簇:
"""
簇 A(川菜系):
- vector_1 [0.82, -0.45, 0.91...] → 宫保鸡丁
- vector_2 [-0.23, 0.67, -0.55...] → 鱼香肉丝
簇 B(粤菜系):
- vector_3 [0.15, 0.22, -0.18...] → 清蒸鲈鱼
"""
```
**为什么要分组?**
- 就像图书馆把书按类别摆放
- 找菜谱时,不用跑遍整个图书馆,去对应区域就行
- **检索速度提升几十倍!**
---
### **第 4 步:保存索引到硬盘**
```python
vectorstore.save_local("./vector_index")
# 保存的文件:
# ./vector_index/
# ├── faiss.index ← 向量索引结构(那个大表格)
# └── index.pkl ← 原始文档 + metadata
```
---
## 🔍 **用户提问时,FAISS 如何命中索引?**
现在到了你最关心的部分!
---
### **场景:用户问"宫保鸡丁怎么做?"**
#### **第 1 步:把问题也转成数字指纹**
```python
# 同样的"翻译官",把用户的问题也转成向量
query = "宫保鸡丁怎么做?"
query_vector = embeddings.embed_query(query)
# query_vector = [0.80, -0.43, 0.89, -0.15, 0.31]
# ↑ 发现和 vector_1 非常接近!
```
**神奇的地方来了:**
- "宫保鸡丁:鸡肉切丁..." 的向量 = `[0.82, -0.45, 0.91, -0.12, 0.33]`
- "宫保鸡丁怎么做?" 的向量 = `[0.80, -0.43, 0.89, -0.15, 0.31]`
- **这两个向量在空间里几乎挨在一起!**
---
#### **第 2 步:计算距离,找最近的邻居**
```python
# FAISS 开始计算 query_vector 和所有向量的距离
# 方法 1:欧氏距离(直线距离)
distance_1 = sqrt((0.80-0.82)² + (-0.43+0.45)² + (0.89-0.91)² + ...)
# distance_1 ≈ 0.03 ← 非常近!
distance_2 = sqrt((0.80+0.23)² + (-0.43-0.67)² + (0.89+0.55)² + ...)
# distance_2 ≈ 2.15 ← 很远!
distance_3 = sqrt((0.80-0.15)² + (-0.43-0.22)² + (0.89+0.18)² + ...)
# distance_3 ≈ 1.87 ← 也比较远
# 方法 2:余弦相似度(夹角余弦值,越接近 1 越相似)
similarity_1 = cos(θ1) ≈ 0.99 ← 几乎一样!
similarity_2 = cos(θ2) ≈ 0.45 ← 不太像
similarity_3 = cos(θ3) ≈ 0.52 ← 有点相关
```
**可视化理解:**
```
向量空间(简化成 2D 平面)
● vector_2 (鱼香肉丝)
● vector_3 (清蒸鲈鱼)
★ query_vector (宫保鸡丁怎么做?)
●
● vector_1 (宫保鸡丁制作方法)
看到了吗?
- query_vector 和 vector_1 几乎贴在一起
- 离 vector_2 和 vector_3 都很远
```
---
#### **第 3 步:返回最近的结果**
```python
# FAISS 按距离排序,返回最近的 k 个
results = vectorstore.similarity_search(query, k=3)
# 返回结果:
# 第 1 名:vector_1 (宫保鸡丁制作方法) - 距离 0.03 ⭐⭐⭐
# 第 2 名:vector_3 (清蒸鲈鱼) - 距离 1.87 ⭐
# 第 3 名:vector_2 (鱼香肉丝) - 距离 2.15 ⭐
# 为什么清蒸鲈鱼也能排上?
# 因为都是"做菜"、"制作方法",语义上有一定相关性
```
---
## 🎯 **为什么能命中?核心原理!**
### **关键:嵌入模型学过"语义相近的词,向量也要相近"**
```python
# 嵌入模型在训练时见过海量数据,它学会了:
# "宫保鸡丁" 和 "宫保鸡丁怎么做" 应该很接近
# "宫保鸡丁" 和 "鱼香肉丝" 也应该比较接近(都是川菜)
# "宫保鸡丁" 和 "清蒸鲈鱼" 就稍远一些(一个是川菜,一个是粤菜)
# 所以当你问"宫保鸡丁怎么做?"时:
# - 模型把它转成向量
# - 这个向量和"宫保鸡丁"相关的向量天然就很接近
# - FAISS 一找一个准!
```
---
## 📊 **完整流程可视化**
```mermaid
graph TB
A["用户提问:
宫保鸡丁怎么做?"] --> B[嵌入模型翻译官]
B --> C["query_vector
[0.80, -0.43, 0.89...]
问题的数字指纹"]
D["FAISS 索引库
已存储的向量"] --> E["vector_1: [0.82, -0.45, 0.91...]
→ 宫保鸡丁制作方法"]
D --> F["vector_2: [-0.23, 0.67, -0.55...]
→ 鱼香肉丝做法"]
D --> G["vector_3: [0.15, 0.22, -0.18...]
→ 清蒸鲈鱼做法"]
C --> H{FAISS 距离计算}
E --> H
F --> H
G --> H
H --> I["距离排序:
1. vector_1 (0.03)
2. vector_3 (1.87)
3. vector_2 (2.15)"]
I --> J["返回结果:
宫保鸡丁制作方法
(命中!⭐⭐⭐)"]
style C fill:#f9f,stroke:#333
style E fill:#bbf,stroke:#333
style J fill:#bfb,stroke:#333
```
---
## 🌰 **再举一个更神奇的例子**
### **用户问:"土豆的做法"**
```python
# 向量化
query = "土豆的做法"
query_vector = embeddings.embed_query(query)
# 数据库里有这些菜谱:
# 1. "土豆烧牛肉:土豆切块,牛肉炖煮..."
# 2. "马铃薯泥:马铃薯蒸熟,压成泥..."
# 3. "番茄炒蛋:番茄切块,鸡蛋炒制..."
# 虽然原文没有出现"土豆"这个词:
# - "马铃薯泥"的向量会和"土豆的做法"非常接近!
# 为什么?
# 因为嵌入模型学过:土豆 = 马铃薯(是同一种东西)
# 所以即使你搜"土豆",也能找到"马铃薯"的菜!
# 这就是语义检索的神奇之处!
```
---
## 💡 **总结:FAISS 命中索引的本质**
| 步骤 | 做了什么 | 类比 |
| --------------- | ------------------ | ---------------------------- |
| **1. 建立索引** | 把所有文档转成向量 | 给每本书按内容生成数字指纹 |
| **2. 组织索引** | 用 FAISS 算法分组 | 把指纹做成卡片目录,分类摆放 |
| **3. 用户提问** | 把问题也转成向量 | 给问题也生成数字指纹 |
| **4. 匹配命中** | 计算向量距离 | 看哪个指纹和问题指纹最像 |
| **5. 返回结果** | 返回最近的 k 个 | 把最相关的书找出来给你 |
**核心思想:**
- ✅ **物以类聚**:语义相近的内容,向量空间里距离也近
- ✅ **人以群分**:FAISS 把相近的向量组织在一起,方便快速查找
- ✅ **一问一答**:用户的问题转成向量后,自然能找到相近的答案
**通俗理解:**
FAISS 就像一个"语义雷达",用户的问题发射出去(向量),最近的几个目标(文档)就会被锁定(命中)!
# 意思是这里建索引的时候,是把一个文档分成了好几个切片,然后再计算向量,然后再缓存索引的?
**完全正确!你理解得非常到位!**
让我用大白话给你确认一下整个流程:
---
## ✅ **是的!就是你说的那样**
### **完整流程:**
```python
# 第 1 步:有一个完整的文档(比如一篇菜谱)
原始文档 = """
# 宫保鸡丁
川菜经典代表作之一,口味麻辣鲜香。
## 所需食材
- 鸡胸肉 500g
- 花生米 100g
- 干辣椒 20g
- 花椒 10g
## 制作步骤
1. 鸡肉切丁,用料酒腌制
2. 热锅凉油,爆香干辣椒和花椒
3. 下鸡丁翻炒至变色
4. 加入花生米,调味出锅
## 附加内容
- 火候控制:全程大火快炒
- 关键技巧:先腌制鸡肉更入味
"""
# ↓ 分块(chunking)
# 第 2 步:切成几个小块(按 Markdown 标题分割)
chunks = [
"宫保鸡丁\n川菜经典代表作之一...", # chunk_1: 菜品介绍
"所需食材\n- 鸡胸肉 500g...", # chunk_2: 食材列表
"制作步骤\n1. 鸡肉切丁...", # chunk_3: 做法步骤
"附加内容\n- 火候控制..." # chunk_4: 烹饪技巧
]
# ↓ 向量化
# 第 3 步:每个小块单独转成向量
vector_1 = embeddings.embed_query(chunks[0]) # 介绍部分的向量
vector_2 = embeddings.embed_query(chunks[1]) # 食材部分的向量
vector_3 = embeddings.embed_query(chunks[2]) # 步骤部分的向量
vector_4 = embeddings.embed_query(chunks[3]) # 技巧部分的向量
# ↓ 建索引
# 第 4 步:把这些向量整理成索引
FAISS 索引 = {
vector_1 → chunks[0],
vector_2 → chunks[1],
vector_3 → chunks[2],
vector_4 → chunks[3]
}
# ↓ 保存
# 第 5 步:存到硬盘
FAISS 索引.save_local("./vector_index")
```
---
## 📊 **可视化对比**
### **不切块 vs 切块**
| 方式 | 处理方式 | 检索效果 | 你的系统用的哪种? |
| ---------- | ---------------------------------- | ---------------------------------------------------- | ------------------ |
| **不切块** | 整篇文档转成 1 个向量 | ❌ 用户问"需要什么食材"时,可能返回整篇文章,不够精准 | ❌ 不用这种 |
| **切块** | 每篇文档切成多个小块,每块转成向量 | ✅ 用户问"需要什么食材"时,精准返回"食材"那个小块 | ✅ 你用这种! |
---
## 🔍 **为什么要切块?**
看你的代码 `index_construction.py` 第 69-72 行:
```python
self.vectorstore = FAISS.from_documents(
documents=chunks, # ← 注意!这里传入的是 chunks(已经切好的小块)
embedding=self.embeddings
)
```
**这些 chunks 从哪里来?**
看 `data_preparation.py` 的 `chunk_documents()` 方法(第 141-166 行):
```python
def chunk_documents(self) -> List[Document]:
"""Markdown 结构感知分块"""
if not self.documents:
raise ValueError("请先加载文档")
# 使用 Markdown 标题分割器
chunks = self._markdown_header_split()
# 为每个 chunk 添加元数据
for i, chunk in enumerate(chunks):
chunk.metadata['batch_index'] = i
chunk.metadata['chunk_size'] = len(chunk.page_content)
self.chunks = chunks # ← 这里!把切好的小块存起来
return chunks
```
---
## 🌰 **举例说明好处**
假设你有 100 篇菜谱,每篇平均 4 个小块:
### **如果不切块:**
```python
# 只有 100 个向量
FAISS 索引 = {
vector_宫保鸡丁全文 → 整篇宫保鸡丁.md,
vector_鱼香肉丝全文 → 整篇鱼香肉丝.md,
...
}
# 用户问:"宫保鸡丁需要什么食材?"
query_vector = embeddings.embed_query("宫保鸡丁需要什么食材?")
# FAISS 找到最接近的向量 → 返回整篇文章
# ❌ 用户要自己从文章里找食材信息
```
---
### **切块后:**
```python
# 有 400 个向量(100 篇 × 4 块/篇)
FAISS 索引 = {
vector_宫保鸡丁_介绍 → "# 宫保鸡丁\n川菜经典...",
vector_宫保鸡丁_食材 → "## 所需食材\n- 鸡胸肉 500g...",
vector_宫保鸡丁_步骤 → "## 制作步骤\n1. 鸡肉切丁...",
vector_宫保鸡丁_技巧 → "## 附加内容\n- 火候控制...",
vector_鱼香肉丝_介绍 → ...,
vector_鱼香肉丝_食材 → ...,
vector_鱼香肉丝_步骤 → ...,
vector_鱼香肉丝_技巧 → ...,
...
}
# 用户问:"宫保鸡丁需要什么食材?"
query_vector = embeddings.embed_query("宫保鸡丁需要什么食材?")
# FAISS 计算距离,发现:
# - vector_宫保鸡丁_食材 离 query_vector 最近!⭐
# - vector_宫保鸡丁_步骤 也比较近
# - vector_宫保鸡丁_介绍 稍远
# 返回结果:
results = [
"## 所需食材\n- 鸡胸肉 500g\n- 花生米 100g...", # ← 精准命中!
"## 制作步骤\n1. 鸡肉切丁...",
"## 附加内容\n- 火候控制..."
]
# ✅ 直接把食材部分放在最前面,用户一眼就能看到!
```
---
## 💡 **切块的本质**
**就是把"粗粒度"变成"细粒度":**
```
粗粒度:
┌─────────────────────────────┐
│ 整篇宫保鸡丁 (1 个向量) │
└─────────────────────────────┘
细粒度(切块后):
┌──────────┬──────────┬──────────┬──────────┐
│ 介绍部分 │ 食材部分 │ 步骤部分 │ 技巧部分 │
│ (1 个向量) │ (1 个向量) │ (1 个向量) │ (1 个向量) │
└──────────┴──────────┴──────────┴──────────┘
```
**好处:**
1. ✅ **检索更精准**:问什么答什么,不返回无关内容
2. ✅ **节省 token**:只返回相关的小块,不用返回整篇文章
3. ✅ **提高命中率**:小块更容易和问题匹配上
---
## 🎯 **总结你的理解**
> **"意思是建索引的时候,是把一个文档分成了好几个切片,然后再计算向量,然后再缓存索引的?"**
**答案:100% 正确!**
**完整流程回顾:**
```mermaid
graph LR
A["原始文档
宫保鸡丁.md"] --> B["按 Markdown
标题切块"]
B --> C["4 个小块
chunks"]
C --> D["嵌入模型
向量化"]
D --> E["4 个向量
vector_1~4"]
E --> F["FAISS 建立索引
组织成卡片目录"]
F --> G["保存到硬盘
./vector_index/"]
style A fill:#f9f,stroke:#333
style C fill:#bbf,stroke:#333
style E fill:#bfb,stroke:#333
style G fill:#ff9,stroke:#333
```
**每一步的作用:**
1. **切块**:把大文档拆成小片段(按标题结构)
2. **向量化**:每个片段转成数字指纹
3. **建索引**:把指纹整理成高效的查找结构
4. **缓存**:存到硬盘,下次直接用
你理解得非常透彻!👍
# 意思是在匹配向量索引的时候,是不会涉及分词的,而是直接把用户的全部问题转成向量去查询索引的?
**完全正确!你又说对了!**
在 FAISS 向量检索的时候,**完全不分词**,直接把用户的整个问题当成一个整体转成向量去匹配!
---
## ✅ **是的!FAISS 检索时不分词**
### **完整流程对比:**
```python
# 用户提问:"宫保鸡丁需要什么食材?"
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# ❌ BM25 的做法(需要分词)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
query = "宫保鸡丁需要什么食材?"
# 第 1 步:分词
tokens = ["宫保鸡丁", "需要", "什么", "食材"] ← 切成一个个词
# 第 2 步:对每个词查倒排索引
for token in tokens:
docs = inverted_index[token] # 找包含这个词的文档
# 第 3 步:统计词频,计算 BM25 分数
score = TF("宫保鸡丁") × IDF("宫保鸡丁") +
TF("食材") × IDF("食材") + ...
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# ✅ FAISS 的做法(不分词,整体处理)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
query = "宫保鸡丁需要什么食材?"
# 第 1 步:直接整个句子转成向量(不用分词!)
query_vector = embeddings.embed_query("宫保鸡丁需要什么食材?")
# query_vector = [0.81, -0.44, 0.90, -0.13, 0.32, ...]
# 第 2 步:拿着这个向量去 FAISS 索引里找最近的邻居
results = faiss_index.search(query_vector, k=5)
# 第 3 步:返回距离最近的几个文档块
# 完成!全程没有分词!
```
---
## 📊 **核心区别对比表**
| 特性 | **BM25(关键词检索)** | **FAISS(向量检索)** |
| ------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
| **是否分词** | ✅ **必须分词** | ❌ **不分词** |
| **处理单位** | 词语(token) | 整句/整段(sentence) |
| **查询过程** | 宫保鸡丁 → 查倒排表
需要 → 查倒排表
什么 → 查倒排表
食材 → 查倒排表 | "宫保鸡丁需要什么食材?"
→ 一整句话转成向量
→ 一次查询搞定 |
| **匹配原理** | 看词语是否出现在文档中 | 看整个句子的语义是否相近 |
| **优势** | 精准匹配专有名词 | 理解整体语义 |
---
## 🔍 **为什么 FAISS 不需要分词?**
因为**嵌入模型(Embedding Model)已经帮你理解了整体语义**!
```python
# 嵌入模型就像一个"语义理解大师"
# 它见过海量数据,学会了:
"宫保鸡丁需要什么食材?"
和
"宫保鸡丁:所需食材\n- 鸡胸肉 500g\n- 花生米 100g..."
# 这两段话虽然用词不完全一样,但语义很接近
# 所以它们的向量也会很接近!
# 嵌入模型内部已经考虑了:
# - 词语顺序
# - 上下文关系
# - 语义关联
# - 同义词映射(如"土豆"="马铃薯")
# 所以你不需要手动分词,直接把整句话丢给它就行!
```
---
## 🌰 **实际例子说明**
### **场景 1:用户问"宫保鸡丁需要什么食材?"**
```python
# FAISS 的处理方式(不分词):
query = "宫保鸡丁需要什么食材?"
query_vector = embed_query(query)
# 直接整句转成向量:[0.81, -0.44, 0.90, ...]
# 索引里的向量:
chunks_vectors = {
"宫保鸡丁:川菜经典...": [0.82, -0.45, 0.91, ...], # 介绍部分
"所需食材:鸡胸肉 500g...": [0.79, -0.42, 0.88, ...], # 食材部分 ← 最接近!
"制作步骤:鸡肉切丁...": [0.75, -0.38, 0.85, ...], # 步骤部分
"火候控制:大火快炒...": [0.60, -0.30, 0.70, ...], # 技巧部分
}
# 计算距离后发现:
# "所需食材" 的向量和 query_vector 最接近!
# 直接命中!✅
# 整个过程没有分词,纯靠语义相似度!
```
---
### **场景 2:用户换种问法"做宫保鸡丁要准备哪些东西?"**
```python
# BM25 可能会懵:
query_tokens = ["做", "宫保鸡丁", "要", "准备", "哪些", "东西"]
# ❌ 原文中是"所需食材",没有"准备"、"东西"这些词,可能匹配不上
# FAISS 完全不受影响:
query = "做宫保鸡丁要准备哪些东西?"
query_vector = embed_query(query)
# 虽然用词不一样,但语义和"宫保鸡丁需要什么食材?"几乎一样
# 所以 query_vector 也非常接近"所需食材"那个 chunk 的向量
# 照样命中!✅
```
**这就是语义检索的强大之处:**
- ✅ 不依赖具体用词
- ✅ 理解整体意思
- ✅ 能处理同义表达
---
## 💡 **通俗类比**
### **BM25 像一个"查字典的人"**
```
你:我想找关于"苹果"的信息
BM25:好的,我先把你说的话切成词...
"我"、"想"、"找"、"关于"、"苹果"、"的"、"信息"
然后我去查字典,看哪些文档包含这些词...
找到了!这篇文档出现了 3 次"苹果"!
(但它不理解"苹果"是指水果还是手机公司)
```
---
### **FAISS 像一个"懂人话的人"**
```
你:我想找关于"苹果"的信息
FAISS:我明白了!(把整句话转成向量)
根据我的语义理解,你应该是指水果苹果...
我去向量空间里看看哪个内容和你的问题最像...
找到了!这个是讲"苹果的营养价值"的,应该就是你想要的!
(它理解整体语义,不是死板匹配关键词)
```
---
## 🎯 **总结**
> **"意思是在匹配向量索引的时候,是不会涉及分词的,而是直接把用户的全部问题转成向量去查询索引的?"**
**答案:100% 正确!**
**关键要点:**
| 步骤 | BM25 | FAISS |
| ------------ | -------------------------------- | ------------------------ |
| **用户输入** | "宫保鸡丁需要什么食材?" | "宫保鸡丁需要什么食材?" |
| **第 1 步** | ✅ 分词 → ["宫保鸡丁", "需要"...] | ❌ 不分词,整句处理 |
| **第 2 步** | 查每个词的倒排表 | 整句转成向量 |
| **第 3 步** | 统计词频,算 BM25 分 | 计算向量距离 |
| **本质** | 关键词匹配 | 语义理解 |
**为什么可以不分词?**
- 因为嵌入模型是个"语义编码器"
- 它能把整句话的含义压缩成一个向量
- 语义相近的句子,向量也相近
- 所以不需要分词,直接整体匹配就行!
你理解得非常到位!👍
# 那如果用户的提问很长很抽象呢,那是不是要考虑使用一些手段来实现问题重写了?
**你说得太对了!这确实是 RAG 系统的一个关键问题!**
长而抽象的问题直接向量化,效果会很差。让我详细解释为什么,以及怎么解决。
---
## 🚨 **长而抽象的问题有什么问题?**
### **问题 1:语义被稀释**
```python
# 用户问了一个很长很啰嗦的问题:
query = """
我最近想买一台笔记本电脑,预算大概在 5000-8000 元之间,
主要用于办公和偶尔玩一些轻度游戏,比如英雄联盟这种,
希望续航时间长一点,重量轻一点,方便携带,
屏幕大小无所谓,但分辨率最好高一些,
有什么推荐的吗?
"""
# 直接转成向量:
query_vector = embed_query(query) # 768 维
# 问题在哪里?
# 这个向量里包含了太多信息:
# - "笔记本电脑"(核心主题)✅
# - "5000-8000 元"(预算)
# - "办公"(用途)
# - "英雄联盟"(游戏需求)
# - "续航时间"(电池要求)
# - "重量轻"(便携性)
# - "分辨率"(屏幕要求)
# - ... 还有很多修饰词
# 结果:核心语义被一大堆细节稀释了!
# 向量可能偏离了"笔记本推荐"这个核心,跑到了"电子产品评测"或者"游戏攻略"的方向!
```
---
### **问题 2:噪声干扰**
```python
# 再看一个更抽象的例子:
query = """
我觉得最近状态不太好,每天早上起来都很累,
工作也没精神,注意力不集中,
是不是身体出了什么问题?有没有什么调理的方法?
"""
# 直接转成向量:
query_vector = embed_query(query)
# 这个向量会指向哪里?
# 可能是"健康问题"
# 可能是"职场压力"
# 可能是"心理咨询"
# 可能是"养生保健"
# 太模糊了!向量空间里这一片区域什么内容都有!
# 检索出来的东西可能五花八门,什么都不精准!
```
---
### **问题 3:向量偏离中心**
用图像来理解:
```
向量空间可视化(简化成 2D)
● 健康咨询
● 养生建议
● 心理问题 ★ query_vector (太长了,跑到中间去了)
● 职场压力
● 工作效率
看到了吗?
- 问题太长太杂 → 向量跑到几个簇的中间位置
- 离每个具体领域都不远不近
- 检索结果:每个领域都沾点边,但都不够精准!
```
---
## ✅ **解决方案:问题重写(Query Rewriting)**
你的系统里已经有这个功能了!看 `generation_integration.py`:
**已查看文件** *generation_integration.py 164-227*
看到了!你的系统已经有 `query_rewrite()` 方法了。但现在的版本比较简单,主要针对模糊查询。让我给你展示**针对长而抽象问题的完整解决方案**。
---
## 🛠️ **问题重写的 4 种核心技术**
### **技术 1:简化提炼(针对啰嗦问题)**
```python
# 原始问题(太长太啰嗦):
query = """
我最近想买一台笔记本电脑,预算大概在 5000-8000 元之间,
主要用于办公和偶尔玩一些轻度游戏,比如英雄联盟这种,
希望续航时间长一点,重量轻一点,方便携带,
屏幕大小无所谓,但分辨率最好高一些,
有什么推荐的吗?
"""
# ❌ 直接向量化 → 语义被稀释
# ✅ 先简化提炼:
重写后 = "5000-8000 元 轻薄笔记本电脑推荐 长续航 可玩英雄联盟"
# 好处:
# - 保留了核心信息(预算、用途、关键需求)
# - 去掉了修饰词和语气词
# - 向量更集中、更精准
```
---
### **技术 2:结构化拆分(针对复合问题)**
```python
# 原始问题(包含多个子问题):
query = """
我想减肥,应该吃什么食物比较好?
还有运动方面有什么建议?
另外每天睡几个小时合适?
"""
# ❌ 直接向量化 → 一个向量承载太多主题
# ✅ 拆分成多个独立查询:
queries = [
"减肥期间适合吃的食物推荐", # 饮食方面
"减肥有效的运动方式", # 运动方面
"减肥期间充足睡眠时间" # 睡眠方面
]
# 分别检索,然后合并结果
# 每个查询的向量都很聚焦,命中率更高!
```
---
### **技术 3:添加上下文(针对模糊问题)**
```python
# 原始问题(太抽象):
query = "感觉状态不好,怎么办?"
# ❌ 直接向量化 → 不知道指什么
# ✅ 结合对话历史添加上下文:
# 如果之前用户说过工作压力大...
重写后 = "工作压力大导致状态不好的调理方法"
# 或者根据当前场景(比如在美食 RAG 里):
重写后 = "缓解压力改善心情的食物推荐"
# 通过添加具体领域,让向量指向明确的区域!
```
---
### **技术 4:关键词增强(针对口语化问题)**
```python
# 原始问题(太口语化):
query = "有没有什么好吃的容易做的菜?"
# ❌ 直接向量化 → 向量空间里这一片很泛
# ✅ 添加专业术语:
重写后 = "简单易做的家常菜菜谱推荐 新手友好 快速烹饪"
# 加入了:
# - "家常菜"(菜品分类术语)
# - "新手友好"(难度描述)
# - "快速烹饪"(烹饪方式)
# 这些词能让向量更接近专业的食谱内容!
```
---
## 📊 **你的系统中的实际应用**
看你的代码 `main.py` 里是这么用的:
```python
def ask_question(self, question: str, stream: bool = False):
# 1. 查询路由(判断问题类型)
route_type = self.generation_module.query_router(question)
# 2. 智能查询重写(根据路由类型)
if route_type == 'list':
# 列表查询保持原查询
rewritten_query = question
else:
# 详细查询和一般查询使用智能重写
rewritten_query = self.generation_module.query_rewrite(question)
# 3. 用重写后的问题去检索
relevant_chunks = self.retrieval_module.hybrid_search(rewritten_query, top_k=self.config.top_k)
```
**这个流程是对的!但 `query_rewrite` 可以更强!**
---
## 🚀 **升级版 Query Rewrite 实现**
让我给你一个更强大的版本,专门处理长而抽象的问题:
```python
def query_rewrite(self, query: str) -> str:
"""
智能查询重写 - 优化用户提问以提高检索效果
Args:
query: 用户原始问题
Returns:
重写后的问题或原问题
"""
# 定义查询分析的提示词(增强版)
prompt = PromptTemplate(
template="""
你是一个专业的查询优化专家。请分析用户的查询,并重写以提高检索准确率。
原始查询:{query}
请按以下步骤处理:
**第 1 步:判断问题类型**
- 如果问题已经具体明确(如"宫保鸡丁怎么做"),直接返回原查询
- 如果问题过于冗长(超过 50 个字),提取核心信息
- 如果问题模糊抽象(如"有什么好吃的"),添加具体限定词
- 如果问题包含多个子问题,保留最核心的一个
**第 2 步:重写原则**
- 长度控制:20-40 字为佳,不要超过 60 字
- 保留核心:确保原意不变
- 增加术语:添加相关领域的专业词汇
- 去除噪声:删除语气词、修饰词等无关内容
**第 3 步:输出格式**
只输出重写后的查询,不要任何解释。
示例:
输入:"我最近想买台笔记本电脑,预算 5000-8000 元,主要用来办公和玩游戏,有什么推荐?"
输出:"5000-8000 元笔记本电脑推荐 办公游戏两用"
输入:"做菜"
输出:"简单易做的家常菜谱"
输入:"宫保鸡丁怎么做"
输出:"宫保鸡丁的制作方法"
输入:"感觉最近状态不太好,身体疲惫,精神不佳,是不是健康问题?怎么调理?"
输出:"身体疲惫精神不佳的调理方法"
现在请处理:
输入:"{query}"
输出:""",
input_variables=["query"]
)
try:
# 构建重写流水线
chain = (
{"query": RunnablePassthrough()}
| prompt
| self.llm
| StrOutputParser()
)
# 执行重写
rewritten_query = chain.invoke(query).strip()
# 安全检查:如果重写后为空或明显不合理,使用原查询
if not rewritten_query or len(rewritten_query) < 3:
logger.warning(f"查询重写失败,使用原查询:'{query}'")
return query
# 记录日志
if rewritten_query != query:
logger.info(f"查询已优化:'{query}' → '{rewritten_query}'")
else:
logger.info(f"查询无需优化:'{query}'")
return rewritten_query
except Exception as e:
logger.error(f"查询重写过程出错:{e},使用原查询")
return query # 出问题时返回原查询
# ... existing agent ...
```
---
## 💡 **更多实用技巧**
### **技巧 1:多轮重写(逐步优化)**
```python
def advanced_query_rewrite(self, query: str, max_iterations: int = 2):
"""多轮重写,逐步优化"""
current_query = query
for i in range(max_iterations):
# 第 1 轮:简化(去掉冗余)
if i == 0:
prompt = "请简化这个问题,保留核心意思,去掉多余修饰..."
# 第 2 轮:增强(添加术语)
elif i == 1:
prompt = "请添加相关专业术语,使问题更具体..."
rewritten = self._rewrite_with_prompt(current_query, prompt)
# 如果质量变好了,就停止
if self._is_better(rewritten, query):
return rewritten
current_query = rewritten
return current_query
```
---
### **技巧 2:假设性问题(HyDE 技术)**
```python
def hypothetical_answer_rewrite(self, query: str) -> str:
"""
HyDE (Hypothetical Document Embeddings) 技术
思路:先让 AI 编一个假设性答案,然后用答案去检索
为什么有效?因为答案和问题的向量天然接近!
"""
# 第 1 步:生成假设性答案
answer_prompt = f"""
请为以下问题写一个简短的答案(100 字以内):
{query}
"""
hypothetical_answer = self.llm.invoke(answer_prompt).content
# 第 2 步:用假设答案作为检索查询
# 这样比直接用问题去检索更精准!
return hypothetical_answer
```
---
### **技巧 3:反向转换(Sub-Query Decomposition)**
```python
def decompose_complex_query(self, query: str) -> List[str]:
"""
把复杂问题拆成多个简单子问题
例子:
输入:"我想做一顿晚餐,有荤有素,还要有汤,怎么做?"
输出:[
"简单易做的荤菜菜谱",
"简单易做的素菜菜谱",
"家常汤品做法"
]
"""
prompt = PromptTemplate(
template="""
请把以下复杂问题拆分成 2-4 个简单的子问题:
原始问题:{query}
子问题列表(每行一个):
"""
)
response = (prompt | self.llm | StrOutputParser()).invoke({"query": query})
# 解析响应,得到子问题列表
sub_queries = [q.strip() for q in response.split('\n') if q.strip()]
return sub_queries
# 使用方式:
# 对每个子问题分别检索,然后合并结果!
```
---
## 🎯 **总结**
> **"那如果用户的提问很长很抽象呢,那是不是要考虑使用一些手段来实现问题重写了?"**
**答案:是的!而且你的系统已经在用了!**
**关键要点:**
| 问题类型 | 特征 | 解决技术 | 重写效果 |
| ------------ | ---------------------- | ---------- | ------------------ |
| **冗长啰嗦** | 超过 50 字,一堆修饰词 | 简化提炼 | 保留核心,去掉废话 |
| **模糊抽象** | "有什么好的推荐" | 添加上下文 | 加上具体领域限定 |
| **复合问题** | 包含多个子问题 | 结构化拆分 | 拆成多个独立查询 |
| **口语化** | "想吃点啥" | 关键词增强 | 加入专业术语 |
**为什么要重写?**
- ✅ 让向量更聚焦(不被冗余信息稀释)
- ✅ 提高命中率(更接近索引内容的表达方式)
- ✅ 减少噪声(去掉无关词汇的干扰)
**最佳实践:**
1. 短小精悍(20-40 字)
2. 保留核心(原意不变)
3. 添加术语(靠近专业内容)
4. 去除噪声(删掉语气词、修饰语)
你的直觉非常准!这确实是提升 RAG 检索质量的关键技术!👍
# graph_indexing工作流
输入数据(从 Neo4j 读取):
┌──────────────┐
│ 宫保鸡丁 │ (Recipe)
│ (recipe_001) │
└──────────────┘
│
│ REQUIRES
↓
┌──────────────┐
│ 鸡胸肉 │ (Ingredient)
│ (ing_123) │
└──────────────┘
处理流程:
1️⃣ 创建实体键值对
├─ 宫保鸡丁 → {
│ index_keys: ["宫保鸡丁"],
│ value_content: "菜品名称:宫保鸡丁\n分类:川菜\n难度:★★★",
│ entity_type: "Recipe"
│ }
│
└─ 鸡胸肉 → {
index_keys: ["鸡胸肉"],
value_content: "食材名称:鸡胸肉\n类别:肉类",
entity_type: "Ingredient"
}
2️⃣ 创建关系键值对
└─ rel_001 → {
index_keys: ["REQUIRES", "食材搭配", "宫保鸡丁_食材", "鸡胸肉"],
value_content: "关系类型:REQUIRES\n源实体:宫保鸡丁\n目标实体:鸡胸肉",
source_entity: "recipe_001",
target_entity: "ing_123"
}
3️⃣ 去重优化
└─ 检查有没有重复的实体和关系,有就合并
4️⃣ 输出结果
└─ entity_kv_store: 所有实体的键值对
└─ relation_kv_store: 所有关系的键值对
└─ key_to_entities: 反向索引(词→实体 IDs)
└─ key_to_relations: 反向索引(词→关系 IDs)
关键设计点:
✅ 一键多词:一个实体/关系可以有多个索引键,提高命中率
✅ 双向索引:既能通过 ID 找信息,也能通过关键词找 ID
✅ 自动去重:避免重复数据影响检索质量
✅ LLM 增强:可选 AI 生成智能关键词(更贴近用户搜索习惯)
# graph_rag_retrieval.py工作流程图
我来为这个图 RAG 检索模块添加详细注释,并绘制工作流程图。
## 一、工作流程图
```mermaid
graph TD
A[用户查询] --> B[查询意图理解
understand_graph_query]
B --> C{识别查询类型}
C -->|实体关系查询 | D1[执行实体关系遍历]
C -->|多跳查询 | D2[执行多跳图遍历
multi_hop_traversal]
C -->|子图查询 | D3[提取知识子图
extract_knowledge_subgraph]
C -->|路径查找 | D4[执行最短路径查找
_find_shortest_paths]
C -->|聚类查询 | D5[执行聚类分析]
D1 --> E[路径转换为文档
_paths_to_documents]
D2 --> E
D3 --> F[图结构推理
graph_structure_reasoning]
F --> G[子图转换为文档
_subgraph_to_documents]
D4 --> E
D5 --> E
E --> H[基于图相关性排序
_rank_by_graph_relevance]
G --> H
H --> I[返回 Top-K 结果]
J[Neo4j 图数据库] -.-> D1
J -.-> D2
J -.-> D3
J -.-> D4
J -.-> D5
K[LLM 客户端] -.-> B
K -.-> F
style A fill:#e1f5ff
style I fill:#d4edda
style J fill:#fff3cd
style K fill:#f8d7da
```
## 二、添加详细注释的代码
## 三、核心要点总结
### 1. **图 RAG vs 传统向量检索**
| 特性 | 向量检索 | 图 RAG |
| -------- | ---------- | -------------- |
| 匹配方式 | 语义相似度 | 图结构遍历 |
| 关联性 | 直接匹配 | 多跳推理 |
| 可解释性 | 低 | 高(路径可见) |
| 适用场景 | 模糊查询 | 关系推理 |
### 2. **五大查询类型**
- **实体关系查询**:A 和 B 有什么关系?
- **多跳查询**:A 通过什么连接到 C?(核心优势)
- **子图查询**:A 相关的所有信息
- **路径查找**:从 A 到 B 的最佳路径
- **聚类查询**:和 A 相似的都有什么?
### 3. **工作流程**
```
用户查询 → 意图理解 → 选择策略 → 图遍历/子图提取 → 推理 → 转文档 → 排序 → 返回
```
### 4. **性能优化**
- 实体/关系缓存(预加载高频节点)
- 索引构建(度数排序)
- 降级方案(Neo4j 不可用时)
# hybrid_retrieval.py工作流程
我来为这个混合检索模块添加详细注释,并绘制工作流程图。
## 一、工作流程图
```mermaid
graph TB
A[用户查询] --> B[关键词提取
extract_query_keywords]
B --> C{双层检索}
C -->|实体级关键词 | D[实体级检索
entity_level_retrieval]
C -->|主题级关键词 | E[主题级检索
topic_level_retrieval]
D --> D1[图索引匹配
get_entities_by_key]
D1 --> D2[获取邻居节点
_get_node_neighbors]
D2 --> D3[Neo4j 补充检索
_neo4j_entity_level_search]
D3 --> F[结果合并去重]
E --> E1[图索引关系匹配
get_relations_by_key]
E1 --> E2[分类信息匹配
get_entities_by_key]
E2 --> E3[Neo4j 主题检索
_neo4j_topic_level_search]
E3 --> F
F --> G[双层检索结果
dual_level_retrieval]
H[并行执行] --> G
H --> I[增强向量检索
vector_search_enhanced]
I --> I1[Milvus 向量检索
similarity_search]
I1 --> I2[图信息增强
_get_node_neighbors]
I2 --> J[Round-robin 轮询合并]
G --> J
J --> K[去重 & 重排序]
K --> L[返回 Top-K 结果]
M[图索引模块
GraphIndexing] -.-> D1
M -.-> E1
N[Milvus 模块] -.-> I1
O[Neo4j 数据库] -.-> D3
O -.-> E3
O -.-> I2
P[LLM 客户端] -.-> B
style A fill:#e1f5ff
style L fill:#d4edda
style M fill:#fff3cd
style N fill:#fff3cd
style O fill:#f8d7da
style P fill:#f8d7da
style J fill:#ffe6cc
```
## 二、添加详细注释的代码
```python
```
## 三、核心要点总结
### 1. **双层检索范式对比**
| 层级 | 检索对象 | 适用场景 | 示例 |
| ------ | -------- | ----------------- | -------------- |
| 实体级 | 具体实体 | 明确提及食材/菜品 | "鸡肉怎么做?" |
| 主题级 | 抽象概念 | 模糊需求/风格偏好 | "减肥菜推荐" |
### 2. **三种检索源**
- **图索引**:内存键值对,速度最快(优先使用)
- **Neo4j**:图数据库,支持复杂查询(补充检索)
- **Milvus**:向量检索,语义相似度(并行执行)
### 3. **Round-robin 轮询合并**
```
双 层:[A1, A2, A3, ...]
向量:[B1, B2, B3, ...]
合并:[A1, B1, A2, B2, A3, B3, ...]
```
优势:避免单一检索源的偏见,保证结果多样性
### 4. **工作流程**
```
查询 → 关键词提取 → [实体级检索 + 主题级检索] || 向量检索 → Round-robin 合并 → 去重排序 → 返回
```
# 可否给我通俗易懂地举例说明,从用户提问“有什么好吃的鱼肉的菜吗”到输出回答,中间究竟经历了什么,每一步的中间结果又是什么,最后模型是怎么基于milvus的检索和neo4j的检索汇总答案的
好的!我用大白话+详细例子给你讲清楚整个过程。咱们从用户提问"有什么好吃的鱼肉的菜吗"开始,一步步拆解:
## 📝 完整流程图解
```mermaid
graph TB
A["用户提问:
有什么好吃的鱼肉的菜吗"] --> B[第 1 步:关键词提取]
B --> C{LLM 分析查询}
C --> D["实体关键词:
['鱼', '鱼肉', '鲫鱼', '鲈鱼']"]
C --> E["主题关键词:
['好吃', '美味', '下饭菜']"]
D --> F[第 2 步:双层检索
并行执行]
E --> F
F --> G["实体级检索
entity_level_retrieval"]
F --> H["主题级检索
topic_level_retrieval"]
G --> G1["图索引匹配:
get_entities_by_key('鱼')"]
G1 --> G2["找到实体:
nodeId=200000001
name='清蒸鲈鱼'"]
G2 --> G3["获取邻居:
['鲈鱼', '葱', '姜', '蒸']"]
G3 --> G4["增强内容:
清蒸鲈鱼 + 相关信息:鲈鱼,葱,姜"]
H --> H1["关系匹配:
get_relations_by_key('好吃')"]
H1 --> H2["找到关系:
'清蒸鲈鱼'-[:HAS_RATING]->'5 星'"]
H2 --> H3["分类匹配:
category CONTAINS '好吃'"]
G4 --> I["双层检索结果合并
去重 + 排序"]
H3 --> I
I --> J["dual_docs = [doc1, doc2, doc3]"]
K["同时并行执行
向量检索"] --> L[Milvus 向量搜索]
L --> M["找到相似文档:
cosine_distance=0.15"]
M --> N["图增强:
添加邻居信息"]
N --> O["vector_docs = [doc4, doc5]"]
J --> P[第 3 步:Round-robin 合并]
O --> P
P --> P1["i=0: 取 dual_docs[0]
→ doc1(清蒸鲈鱼)"]
P1 --> P2["i=0: 取 vector_docs[0]
→ doc4(水煮鱼)"]
P2 --> P3["i=1: 取 dual_docs[1]
→ doc2(红烧鱼)"]
P3 --> P4["i=1: 取 vector_docs[1]
→ doc5(酸菜鱼)"]
P4 --> Q["merged_docs =
[清蒸鲈鱼,水煮鱼,
红烧鱼,酸菜鱼]"]
Q --> R["取 top_k=5
返回最终结果"]
R --> S["最终输出:
[清蒸鲈鱼,水煮鱼,
红烧鱼,酸菜鱼,... ]"]
style A fill:#e1f5ff
style S fill:#d4edda
style G fill:#fff3cd
style H fill:#fff3cd
style K fill:#fff3cd
style P fill:#ffe6cc
```
## 🔍 详细步骤拆解(带真实中间结果)
### **第 1 步:关键词提取**
用户问:"有什么好吃的鱼肉的菜吗"
系统调用 LLM 分析这个查询:
```python
# 实际执行的 prompt
prompt = """
作为烹饪知识助手,请分析以下查询并提取关键词:
查询:有什么好吃的鱼肉的菜吗
提取规则:
1. 实体级关键词:具体的食材、菜品名称
2. 主题级关键词:抽象概念、烹饪主题
"""
```
**LLM 返回的中间结果:**
```json
{
"entity_keywords": ["鱼", "鱼肉", "鲫鱼", "鲈鱼", "鲤鱼"],
"topic_keywords": ["好吃", "美味", "下饭菜", "家常菜"]
}
```
---
### **第 2 步:双层检索(并行执行)**
#### **2.1 实体级检索 `entity_level_retrieval`**
拿着 `["鱼", "鱼肉", "鲫鱼", "鲈鱼"]` 去检索
**具体过程:**
1️⃣ **遍历每个关键词**,在图索引中查找:
```python
for keyword in ["鱼", "鱼肉", "鲫鱼", "鲈鱼"]:
entities = graph_indexing.get_entities_by_key(keyword)
```
2️⃣ **图索引匹配结果**(假设):
```python
# keyword = "鱼"
entities = [
{
"node_id": "200000001",
"entity_name": "清蒸鲈鱼",
"entity_type": "Recipe",
"value_content": "清蒸鲈鱼\n主料:鲈鱼 1 条\n配料:葱、姜、料酒\n做法:蒸制 8 分钟",
"index_keys": ["鱼", "鲈鱼", "蒸菜"]
},
{
"node_id": "200000002",
"entity_name": "红烧鲤鱼",
"entity_type": "Recipe",
"value_content": "红烧鲤鱼\n主料:鲤鱼 1 条\n配料:酱油、糖、醋\n做法:红烧 15 分钟",
"index_keys": ["鱼", "鲤鱼", "红烧"]
}
]
```
3️⃣ **获取邻居节点**(丰富上下文):
```python
neighbors = _get_node_neighbors("200000001")
# 返回:["鲈鱼", "葱", "姜", "蒸", "清淡"]
```
4️⃣ **构建增强内容**:
```python
enhanced_content = """清蒸鲈鱼
主料:鲈鱼 1 条
配料:葱、姜、料酒
做法:蒸制 8 分钟
相关信息:鲈鱼,葱,姜,蒸,清淡"""
```
5️⃣ **生成 RetrievalResult**:
```python
RetrievalResult(
content="清蒸鲈鱼\n相关信息:鲈鱼,葱,姜...",
node_id="200000001",
node_type="Recipe",
relevance_score=0.9, # 精确匹配得分高
retrieval_level="entity",
metadata={
"entity_name": "清蒸鲈鱼",
"matched_keyword": "鱼"
}
)
```
6️⃣ **如果图索引结果不够**,用 Neo4j 补充检索:
```cypher
UNWIND ['鱼', '鱼肉'] as keyword
CALL db.index.fulltext.queryNodes('recipe_fulltext_index', '鱼*')
YIELD node, score
WHERE node:Recipe
RETURN node.nodeId, node.name, node.description, score
```
**Neo4j 返回结果示例:**
```
node_id: "200000003", name: "水煮鱼", description: "麻辣鲜香的水煮鱼", score: 0.85
node_id: "200000004", name: "酸菜鱼", description: "酸爽开胃的酸菜鱼", score: 0.78
```
**实体级检索最终结果:**
```python
entity_results = [
RetrievalResult(content="清蒸鲈鱼...", node_id="200000001", relevance_score=0.9),
RetrievalResult(content="红烧鲤鱼...", node_id="200000002", relevance_score=0.9),
RetrievalResult(content="水煮鱼...", node_id="200000003", relevance_score=0.85 * 0.7),
RetrievalResult(content="酸菜鱼...", node_id="200000004", relevance_score=0.78 * 0.7),
]
```
---
#### **2.2 主题级检索 `topic_level_retrieval`**
拿着 `["好吃", "美味", "下饭菜", "家常菜"]` 去检索
**具体过程:**
1️⃣ **使用图索引的关系键值对**:
```python
for keyword in ["好吃", "美味", "下饭菜"]:
relations = graph_indexing.get_relations_by_key(keyword)
```
2️⃣ **关系匹配结果**(假设):
```python
# keyword = "好吃"
relations = [
{
"relation_id": "rel_001",
"source_entity": "200000001", # 清蒸鲈鱼
"target_entity": "300000001", # 5 星 rating
"relation_type": "HAS_RATING",
"value_content": "评分:5 星(非常好吃)"
}
]
```
3️⃣ **构建主题内容**:
```python
content_parts = [
"主题:好吃",
"评分:5 星(非常好吃)",
"相关菜品:清蒸鲈鱼",
"相关信息:5 星 rating"
]
```
4️⃣ **使用分类信息匹配**:
```python
# 查询 category/cuisineType/tags 包含"好吃"的菜品
cypher_query = """
MATCH (r:Recipe)
WHERE r.category CONTAINS '好吃'
OR r.cuisineType CONTAINS '好吃'
OR r.tags CONTAINS '好吃'
RETURN r.nodeId, r.name, r.category, r.difficulty
"""
```
**Neo4j 返回结果示例:**
```
node_id: "200000005", name: "宫保鸡丁", category: "好吃下饭菜", difficulty: "简单"
node_id: "200000006", name: "麻婆豆腐", category: "经典好吃", difficulty: "简单"
```
5️⃣ **生成主题级检索结果**:
```python
topic_results = [
RetrievalResult(
content="主题:好吃\n评分:5 星\n相关菜品:清蒸鲈鱼",
node_id="200000001",
relevance_score=0.95, # 主题匹配得分
retrieval_level="topic",
metadata={"matched_keyword": "好吃"}
),
RetrievalResult(
content="菜品:宫保鸡丁\n分类:好吃下饭菜",
node_id="200000005",
relevance_score=0.75,
retrieval_level="topic",
metadata={"source": "neo4j_fallback"}
)
]
```
---
#### **2.3 合并双层检索结果**
```python
# 合并 entity_results 和 topic_results
all_results = entity_results + topic_results
# 去重(避免同一道菜出现多次)
seen_nodes = set()
unique_results = []
for result in sorted(all_results, key=lambda x: x.relevance_score, reverse=True):
if result.node_id not in seen_nodes:
seen_nodes.add(result.node_id)
unique_results.append(result)
# 转换为 Document 格式
dual_docs = [
Document(
page_content="清蒸鲈鱼\n主料:鲈鱼 1 条\n相关信息:鲈鱼,葱,姜",
metadata={
"node_id": "200000001",
"recipe_name": "清蒸鲈鱼",
"relevance_score": 0.9,
"search_type": "dual_level"
}
),
Document(
page_content="红烧鲤鱼\n主料:鲤鱼 1 条",
metadata={
"node_id": "200000002",
"recipe_name": "红烧鲤鱼",
"relevance_score": 0.9,
"search_type": "dual_level"
}
),
Document(page_content="水煮鱼...", metadata={"node_id": "200000003", ...}),
Document(page_content="酸菜鱼...", metadata={"node_id": "200000004", ...}),
]
```
---
### **第 3 步:向量检索(与双层检索并行)**
与此同时,另一条线程在执行 Milvus 向量检索:
```python
# 将用户查询转换为向量
query_vector = llm_embedding.encode("有什么好吃的鱼肉的菜吗")
# Milvus 向量相似度搜索
vector_docs = milvus_module.similarity_search(query, k=10) # 先扩大范围
```
**Milvus 返回结果示例:**
```python
[
{
"text": "豆瓣鱼\n主料:草鱼 1 条\n配料:郫县豆瓣酱、姜、蒜\n特点:色泽红亮,味道鲜美",
"metadata": {"node_id": "200000007", "recipe_name": "豆瓣鱼"},
"score": 0.12 # COSINE 距离(越小越相似)
},
{
"text": "剁椒鱼头\n主料:大鱼头 1 个\n配料:剁椒、豆豉\n特点:鲜辣适口,肉质细嫩",
"metadata": {"node_id": "200000008", "recipe_name": "剁椒鱼头"},
"score": 0.18
},
{
"text": "孔雀开屏鱼\n主料:武昌鱼 1 条\n摆盘精美如孔雀开屏",
"metadata": {"node_id": "200000009", "recipe_name": "孔雀开屏鱼"},
"score": 0.22
}
]
```
**图信息增强**(给每个结果添加邻居):
```python
for doc in vector_docs:
neighbors = _get_node_neighbors(doc["metadata"]["node_id"])
# 例如:["草鱼", "豆瓣酱", "川菜", "辣"]
doc["text"] += f"\n相关信息:{', '.join(neighbors[:3])}"
```
**转换后的 vector_docs:**
```python
vector_docs = [
Document(
page_content="豆瓣鱼\n主料:草鱼 1 条\n相关信息:草鱼,豆瓣酱,川菜",
metadata={
"node_id": "200000007",
"recipe_name": "豆瓣鱼",
"score": 0.12, # COSINE 距离
"search_type": "vector_enhanced"
}
),
Document(
page_content="剁椒鱼头\n相关信息:鱼头,剁椒,湘菜",
metadata={
"node_id": "200000008",
"recipe_name": "剁椒鱼头",
"score": 0.18,
"search_type": "vector_enhanced"
}
),
Document(...)
]
```
---
### **第 4 步:Round-robin 轮询合并(关键!)**
现在有两份结果:
- `dual_docs` = [清蒸鲈鱼,红烧鲤鱼,水煮鱼,酸菜鱼]
- `vector_docs` = [豆瓣鱼,剁椒鱼头,孔雀开屏鱼]
**Round-robin 合并过程:**
```python
merged_docs = []
seen_doc_ids = set()
# 第 1 轮:i=0
# 先从 dual_docs 取
doc = dual_docs[0] # 清蒸鲈鱼
if doc.node_id not in seen_doc_ids:
merged_docs.append(doc)
seen_doc_ids.add("200000001")
# 再从 vector_docs 取
doc = vector_docs[0] # 豆瓣鱼
if doc.node_id not in seen_doc_ids:
merged_docs.append(doc)
seen_doc_ids.add("200000007")
# 第 2 轮:i=1
# 从 dual_docs 取
doc = dual_docs[1] # 红烧鲤鱼
merged_docs.append(doc)
# 从 vector_docs 取
doc = vector_docs[1] # 剁椒鱼头
merged_docs.append(doc)
# 第 3 轮:i=2
doc = dual_docs[2] # 水煮鱼
merged_docs.append(doc)
doc = vector_docs[2] # 孔雀开屏鱼
merged_docs.append(doc)
# 第 4 轮:i=3
doc = dual_docs[3] # 酸菜鱼
merged_docs.append(doc)
# vector_docs 已经取完了,跳过
```
**合并后的结果:**
```python
merged_docs = [
清蒸鲈鱼 (dual),
豆瓣鱼 (vector),
红烧鲤鱼 (dual),
剁椒鱼头 (vector),
水煮鱼 (dual),
孔雀开屏鱼 (vector),
酸菜鱼 (dual)
]
```
**为什么要 Round-robin?**
- 避免单一检索源的偏见
- 保证多样性(既有精确匹配的,也有语义相似的)
- 双源互补:dual_level 擅长精确匹配,vector 擅长语义扩展
---
### **第 5 步:取 Top-K 并返回**
```python
# 假设 top_k=5
final_docs = merged_docs[:5]
# 最终返回:
[
清蒸鲈鱼, # 来自双层检索
豆瓣鱼, # 来自向量检索
红烧鲤鱼, # 来自双层检索
剁椒鱼头, # 来自向量检索
水煮鱼 # 来自双层检索
]
```
---
## 🎯 总结:三种检索方式的分工
| 检索方式 | 作用 | 优势 | 举例 |
| -------------- | ------------------ | -------------- | ---------------------------------------- |
| **实体级检索** | 找具体鱼肉相关的菜 | 精确匹配关键词 | 用户说"鱼",直接找到"清蒸鲈鱼" |
| **主题级检索** | 找"好吃"的菜 | 理解抽象概念 | 找到评分高、标记为"好吃"的菜 |
| **向量检索** | 找语义相似的菜 | 发现隐含关联 | 虽然没提"豆瓣",但知道豆瓣鱼也是鱼的做法 |
**最终答案的特点:**
1. ✅ 有精确匹配的(清蒸鲈鱼、红烧鲤鱼)
2. ✅ 有语义扩展的(豆瓣鱼、剁椒鱼头)
3. ✅ 兼顾多样性和相关性
4. ✅ 每道菜都附带了丰富的上下文信息(邻居节点)
这就是为什么混合检索比单一检索更强大!🎉
# intelligent_query_router原理
我将为这个智能查询路由器添加详细注释,并绘制工作流程图。
首先让我分析一下代码结构,然后添加通俗易懂的详细注释:
现在我为您绘制工作流程图:
```mermaid
graph TD
Start([用户查询开始]) --> AnalyzeQuery[1. 查询特征分析]
AnalyzeQuery --> LLMAnalysis{LLM 智能分析}
LLMAnalysis -->|成功 | ParseJSON[解析 JSON 分析结果]
LLMAnalysis -->|失败 | RuleBased[2. 基于规则的降级分析]
ParseJSON --> GetAnalysis[获取查询分析结果
复杂度/关系密集度/推理需求/实体数量]
RuleBased --> GetAnalysis
GetAnalysis --> UpdateStats[3. 更新路由统计]
UpdateStats --> StrategyDecision{4. 策略决策}
StrategyDecision -->|HYBRID_TRADITIONAL
简单查询 | TraditionalSearch[传统混合检索
关键词 + 向量检索]
StrategyDecision -->|GRAPH_RAG
复杂推理 | GraphSearch[图 RAG 检索
知识图谱多跳推理]
StrategyDecision -->|COMBINED
全面结果 | CombinedSearch[组合检索
并行执行两种检索]
CombinedSearch --> ThreadTraditional[传统检索线程]
CombinedSearch --> ThreadGraph[图检索线程]
ThreadTraditional --> MergeResults[合并结果]
ThreadGraph --> MergeResults
TraditionalSearch --> PostProcess[5. 结果后处理
添加元数据/去重/排序]
GraphSearch --> PostProcess
MergeResults --> PostProcess
PostProcess --> ReturnResult([返回结果 + 分析报告])
%% 样式定义
classDef process fill:#e1f5ff,stroke:#0066cc;
classDef decision fill:#fff4e1,stroke:#ff9900;
classDef startend fill:#e8f5e9,stroke:#2e7d32;
classDef parallel fill:#fce4ec,stroke:#c2185b;
class Start,ReturnResult startend
class AnalyzeQuery,ParseJSON,GetAnalysis,UpdateStats,TraditionalSearch,GraphSearch,PostProcess process
class LLMAnalysis,StrategyDecision,RuleBased decision
class CombinedSearch,ThreadTraditional,ThreadGraph,MergeResults parallel
```
**工作流程说明表:**
| 步骤 | 环节名称 | 核心任务 | 技术要点 |
| ---- | ---------------- | -------------------- | --------------------------------------------------- |
| 1 | 查询特征分析 | 理解用户意图 | 调用 LLM 分析复杂度、关系密集度、推理需求、实体数量 |
| 2 | 降级分析(可选) | LLM 失败时的保底方案 | 基于关键词规则快速估算 |
| 3 | 更新统计 | 记录路由决策 | 计数器累加,用于监控和优化 |
| 4 | 策略决策 | 选择检索引擎 | 根据分析结果三选一:传统/图 RAG/组合 |
| 4.1 | 传统检索 | 快速匹配 | 关键词 BM25 + 向量相似度 |
| 4.2 | 图 RAG 检索 | 深度推理 | 知识图谱上的多跳路径查找 |
| 4.3 | 组合检索 | 并行执行 | 线程池并发,Round-robin 合并 |
| 5 | 结果后处理 | 优化输出 | 添加元数据、去重、截断 |
| 6 | 返回结果 | 交付给用户 | 文档列表 + 分析报告 |
这个流程图展示了从用户查询到最终返回结果的完整过程,包括:
- **智能分析**:用 LLM 深度理解查询特征
- **灵活路由**:根据查询特点自动选择最佳策略
- **容错机制**:LLM 失败时用规则方法兜底
- **并行优化**:组合策略时并发执行提升速度
- **可解释性**:返回结果时附带决策理由
# session_cache_manager原理
我来为这个会话缓存管理模块添加详细注释,并绘制工作流程图。
```mermaid
flowchart TD
Start([用户发起查询]) --> Init{SessionCacheManager
初始化}
Init --> |首次使用| CreateSession[创建会话结构
session_caches
session_embeddings
session_contexts]
Init --> |已存在| CheckCache{check_semantic_cache
检查语义缓存}
CreateSession --> CheckCache
CheckCache --> |session_id无效| NoCache[返回None
未命中缓存]
CheckCache --> |session不存在| NoCache
CheckCache --> |缓存为空| NoCache
CheckCache --> |有缓存数据| CalcEmbedding[计算查询向量
embedding_model.embed_documents]
CalcEmbedding --> LoopStart{遍历缓存项}
LoopStart --> |还有缓存项| GetCachedEmb[获取缓存向量]
GetCachedEmb --> CalcSim[_calculate_similarity
计算余弦相似度]
CalcSim --> CheckThresh{相似度 >= 0.75?}
CheckThresh --> |否| LoopStart
CheckThresh --> |是且更优| UpdateBest[更新最佳匹配
best_similarity
best_response]
UpdateBest --> LoopStart
LoopStart --> |遍历完成| HasMatch{找到匹配?}
HasMatch --> |是| CacheHit[🎯 缓存命中
返回缓存答案]
HasMatch --> |否| NoCache
NoCache --> ExecuteQuery[执行正常查询流程
检索/生成答案]
CacheHit --> End([返回答案给用户])
ExecuteQuery --> AddContext[add_to_context
添加对话到上下文]
AddContext --> InitContext{会话上下文存在?}
InitContext --> |否| CreateContext[创建上下文列表]
InitContext --> |是| AppendMsg[追加对话记录
query + response + timestamp]
CreateContext --> AppendMsg
AppendMsg --> CheckCtxLen{上下文长度 > 10?}
CheckCtxLen --> |是| RemoveOldest[删除最早对话
pop 0]
CheckCtxLen --> |否| AddCache
RemoveOldest --> AddCache
AddCache[add_to_semantic_cache
添加到语义缓存] --> InitCache{会话缓存存在?}
InitCache --> |否| CreateCache[创建缓存结构]
InitCache --> |是| CheckCacheSize{缓存数 >= 50?}
CreateCache --> CheckCacheSize
CheckCacheSize --> |是| DelOldestCache[删除最旧缓存项
FIFO策略]
CheckCacheSize --> |否| CalcNewEmb[计算查询向量]
DelOldestCache --> CalcNewEmb
CalcNewEmb --> StoreCache[存储问答对和向量
session_caches
session_embeddings]
StoreCache --> LogOp[记录操作日志]
LogOp --> End
End --> NextQuery([等待下一个查询])
subgraph 缓存检查流程
CheckCache
CalcEmbedding
LoopStart
GetCachedEmb
CalcSim
CheckThresh
UpdateBest
HasMatch
CacheHit
NoCache
end
subgraph 上下文管理流程
AddContext
InitContext
CreateContext
AppendMsg
CheckCtxLen
RemoveOldest
end
subgraph 缓存添加流程
AddCache
InitCache
CreateCache
CheckCacheSize
DelOldestCache
CalcNewEmb
StoreCache
LogOp
end
style CacheHit fill:#90EE90
style NoCache fill:#FFB6C1
style CalcEmbedding fill:#87CEEB
style CalcSim fill:#87CEEB
style StoreCache fill:#DDA0DD
style AppendMsg fill:#DDA0DD
```
## 核心机制说明表
| 组件 | 数据结构 | 作用 | 容量限制 | 淘汰策略 |
| ---------------------- | ---------------------------------------------- | ---------------------------- | ------------ | ---------------- |
| **session_caches** | `{session_id: {query: {response, timestamp}}}` | 存储问答对,快速检索 | 每会话50条 | FIFO(先进先出) |
| **session_embeddings** | `{session_id: {query: np.ndarray}}` | 存储查询向量,用于相似度计算 | 与caches同步 | 随caches同步删除 |
| **session_contexts** | `{session_id: [{query, response, timestamp}]}` | 维护对话历史,支持上下文理解 | 每会话10条 | 删除最早对话 |
## 关键参数说明
| 参数 | 默认值 | 作用 | 调整建议 |
| -------------------------- | ------ | ------------------ | -------------------------- |
| **cache_threshold** | 0.75 | 语义相似度阈值 | 提高→更严格,降低→更易命中 |
| **max_session_cache_size** | 50 | 每会话最大缓存数 | 根据内存大小调整 |
| **max_context_length** | 10 | 每会话最大上下文数 | 根据对话复杂度调整 |
现在代码已经添加了非常详细的注释,涵盖了:
1. **每个方法的功能说明和工作流程**
2. **参数的详细解释**
3. **代码逻辑的逐步说明**
4. **实际应用场景举例**
5. **异常处理和边界情况**
工作流程图展示了完整的生命周期:从初始化 → 缓存检查 → 未命中时的处理 → 添加到上下文 → 添加到缓存的全过程。
# 会话机制举例说明
我来用几个通俗易懂的生活化例子,帮你理解这个会话缓存管理模块是如何工作的。
## 📚 核心概念类比
把这个模块想象成一个**餐厅服务员的大脑记忆系统**:
| 技术概念 | 生活类比 | 说明 |
| -------------------------- | ------------------ | ------------------------------------ |
| **session_id** | 不同的餐桌号 | 1号桌、2号桌的客人互不干扰 |
| **session_caches** | 服务员的快速记忆本 | 记录每桌客人问过的问题和答案 |
| **session_embeddings** | 问题的"语义指纹" | 把问题转换成数字特征,方便比对相似度 |
| **session_contexts** | 对话历史记录 | 记住刚才聊了什么,理解上下文 |
| **cache_threshold (0.75)** | 相似度判断标准 | 两个问题有多像才算"同一个问题" |
---
## 🎯 场景一:语义缓存命中(避免重复劳动)
### 情景描述
用户在**同一个聊天窗口**问了两次相似的问题:
```
时间线:
14:00 - 用户问:"红烧肉怎么做?"
→ AI检索知识库,生成详细答案(耗时3秒)
→ 缓存管理器保存:问题+答案+向量指纹
14:05 - 用户问:"红烧肉的做法是什么?"
→ 缓存管理器发现:这和之前的问题很像!(相似度0.82)
→ 直接返回缓存的答案(耗时0.1秒)✅ 节省2.9秒!
```
### 代码执行流程
```python
# 第一次提问:红烧肉怎么做?
cache_manager.check_semantic_cache("红烧肉怎么做", session_id="user_123")
# 返回 None(缓存中没有,需要正常检索)
# AI生成答案后,添加到缓存
cache_manager.add_to_semantic_cache(
"红烧肉怎么做",
"首先准备五花肉,切成块状...",
"user_123"
)
# 此时缓存结构:
# session_caches["user_123"] = {
# "红烧肉怎么做": {
# "response": "首先准备五花肉...",
# "timestamp": "2026-04-09T14:00:00"
# }
# }
# session_embeddings["user_123"]["红烧肉怎么做"] = [0.1, 0.5, -0.3, ...] # 向量表示
# 第二次提问:红烧肉的做法是什么?
result = cache_manager.check_semantic_cache("红烧肉的做法是什么", session_id="user_123")
# 内部执行:
# 1. 计算"红烧肉的做法是什么"的向量
# 2. 与缓存中的"红烧肉怎么做"向量计算余弦相似度 = 0.82
# 3. 0.82 >= 0.75(阈值),判定为相似问题
# 4. 返回缓存的答案 ✅
```
### 为什么要这样做?
- **节省时间**:不用重新检索数据库、重新生成答案
- **降低成本**:减少LLM调用次数(每次调用都要花钱)
- **提升体验**:用户几乎立即得到回答
---
## 🔗 场景二:上下文管理(理解多轮对话)
### 情景描述
用户进行**连续追问**,需要AI理解上下文:
```
用户:"宫保鸡丁怎么做?"
AI: "需要准备鸡胸肉、花生、干辣椒..."
用户:"那素食版本呢?" ← 关键:这里的"那"指的是宫保鸡丁
AI: "素食版宫保鸡丁可以用豆腐代替鸡肉..."
用户:"它的热量高吗?" ← 关键:这里的"它"指的是素食版宫保鸡丁
AI: "素食版宫保鸡丁热量约300大卡/份..."
```
### 代码执行流程
```python
# 第一轮对话
cache_manager.add_to_context("user_123", "宫保鸡丁怎么做?", "需要准备鸡胸肉...")
# session_contexts["user_123"] = [
# {"query": "宫保鸡丁怎么做?", "response": "需要准备鸡胸肉...", "timestamp": "..."}
# ]
# 第二轮对话
cache_manager.add_to_context("user_123", "那素食版本呢?", "素食版可以用豆腐...")
# session_contexts["user_123"] = [
# {"query": "宫保鸡丁怎么做?", "response": "需要准备鸡胸肉...", ...},
# {"query": "那素食版本呢?", "response": "素食版可以用豆腐...", ...}
# ]
# 第三轮对话前,构建增强查询
enhanced_query = cache_manager.get_context_for_query(
"user_123",
"它的热量高吗?"
)
# 返回的增强查询内容:
"""
用户问: 宫保鸡丁怎么做?
AI答: 需要准备鸡胸肉、花生、干辣椒...
用户问: 那素食版本呢?
AI答: 素食版宫保鸡丁可以用豆腐代替鸡肉...
当前问题: 它的热量高吗?
"""
# AI收到这个增强查询后,就知道"它"指的是"素食版宫保鸡丁"
```
### 为什么要这样做?
- **解决指代消解**:理解"它"、"那"、"这个"指的是什么
- **保持对话连贯**:AI不会忘记前面聊过什么
- **限制上下文长度**:只保留最近3轮,避免提示词太长影响性能
---
## 🏪 场景三:多会话隔离(不同用户互不干扰)
### 情景描述
**两个用户同时使用**系统,他们的缓存完全独立:
```
用户A(session_id="alice"):
- 问:"蛋糕怎么做?"
- 缓存了烘焙类问题的答案
用户B(session_id="bob"):
- 问:"火锅底料怎么炒?"
- 缓存了川菜类问题的答案
用户A再问:"火锅底料怎么炒?"
→ 不会命中用户B的缓存!因为session_id不同
→ 会正常检索并生成新答案
```
### 代码执行流程
```python
# 用户A的操作
cache_manager.add_to_semantic_cache("蛋糕怎么做", "准备面粉、鸡蛋...", "alice")
# session_caches["alice"] = {"蛋糕怎么做": {...}}
# session_caches["bob"] = {} # 不受影响
# 用户B的操作
cache_manager.add_to_semantic_cache("火锅底料怎么炒", "准备牛油、花椒...", "bob")
# session_caches["alice"] = {"蛋糕怎么做": {...}}
# session_caches["bob"] = {"火锅底料怎么炒": {...}}
# 用户A问火锅底料
result = cache_manager.check_semantic_cache("火锅底料怎么炒", "alice")
# 只在 session_caches["alice"] 中查找
# 找不到"火锅底料"相关问题,返回 None
# 即使用户B已经问过这个问题,也不会共享缓存
```
### 为什么要这样做?
- **隐私保护**:用户A看不到用户B的对话历史
- **个性化**:每个用户的兴趣不同,缓存也应该不同
- **数据隔离**:避免混淆不同用户的偏好和上下文
---
## 🗑️ 场景四:缓存淘汰机制(防止内存爆炸)
### 情景描述
单个用户问了**超过50个问题**,缓存满了怎么办?
```
用户问了第1个问题 → 缓存大小: 1/50
用户问了第2个问题 → 缓存大小: 2/50
...
用户问了第50个问题 → 缓存大小: 50/50(已满)
用户问第51个问题 → 删除第1个问题的缓存 → 缓存大小: 50/50
用户问第52个问题 → 删除第2个问题的缓存 → 缓存大小: 50/50
```
### 代码执行流程
```python
# 假设缓存已经有50条记录
print(len(cache_manager.session_caches["user_123"])) # 输出: 50
# 添加第51个问题
cache_manager.add_to_semantic_cache("第51个问题", "答案...", "user_123")
# 内部执行:
# if len(session_cache) >= 50:
# oldest_key = next(iter(session_cache)) # 获取最早插入的问题
# del session_cache[oldest_key] # 删除最旧的问答对
# del session_embeddings[oldest_key] # 删除对应的向量
# 结果:缓存仍然是50条,但替换掉了最旧的那条
```
### 为什么要这样做?
- **控制内存占用**:防止程序运行久了吃掉所有内存
- **FIFO策略**:先进先出,最旧的数据最可能不再需要
- **自动维护**:不需要手动清理,系统自动管理
---
## 📊 场景五:相似度阈值的实际效果
### 情景描述
`cache_threshold = 0.75` 这个参数如何影响缓存命中率?
| 用户新问题 | 缓存中的旧问题 | 相似度 | 是否命中(阈值0.75) | 说明 |
| ---------------- | -------------------- | ------ | ------------------ | ------------------------ |
| "红烧肉怎么做" | "红烧肉的做法" | 0.85 | ✅ 命中 | 表达方式不同,意思相同 |
| "宫保鸡丁" | "宫保鸡丁的做法步骤" | 0.78 | ✅ 命中 | 简写vs完整表达 |
| "蛋糕" | "面包" | 0.65 | ❌ 未命中 | 虽然都是烘焙,但差异较大 |
| "清蒸鱼" | "红烧鱼" | 0.72 | ❌ 未命中 | 烹饪方法不同 |
| "怎么做番茄炒蛋" | "番茄炒蛋的制作方法" | 0.92 | ✅ 命中 | 几乎完全相同的意思 |
### 调整阈值的影响
```python
# 如果调高阈值到 0.9
cache_manager.cache_threshold = 0.9
# 结果:只有非常相似的问题才会命中缓存
# 优点:答案更准确
# 缺点:缓存命中率降低,节省的效果不明显
# 如果调低阈值到 0.6
cache_manager.cache_threshold = 0.6
# 结果:稍微有点像的问题就会命中缓存
# 优点:缓存命中率高,响应更快
# 缺点:可能返回不太相关的答案
```
---
## 🔄 完整工作流程示例
让我用一个真实的对话场景展示整个模块的工作过程:
```python
# ========== 初始化 ==========
cache_manager = SessionCacheManager(embedding_model=my_model)
# ========== 用户Alice的第一次提问 ==========
query1 = "糖醋排骨怎么做?"
session_id = "alice_001"
# 步骤1: 检查缓存
cached_answer = cache_manager.check_semantic_cache(query1, session_id)
# 结果: None(首次提问,缓存为空)
# 步骤2: AI正常检索并生成答案(这里省略检索过程)
answer1 = "准备排骨500克,白糖3勺,醋2勺..."
# 步骤3: 添加到上下文
cache_manager.add_to_context(session_id, query1, answer1)
# 步骤4: 添加到语义缓存
cache_manager.add_to_semantic_cache(query1, answer1, session_id)
# 日志: 📝 已添加到会话缓存 alice_001, 当前大小: 1/50
# ========== 用户Alice的第二次提问(相似问题)==========
query2 = "糖醋排骨的做法是什么?"
# 步骤1: 检查缓存
cached_answer = cache_manager.check_semantic_cache(query2, session_id)
# 内部: 计算相似度 = 0.88 >= 0.75,命中!
# 日志: 🎯 会话缓存命中! Session: alice_001, 相似度: 0.880
# 结果: "准备排骨500克,白糖3勺,醋2勺..."(直接返回,无需检索)
# ========== 用户Alice的第三次提问(上下文相关)==========
query3 = "那糖醋里脊呢?"
# 步骤1: 检查缓存(未命中,因为之前没问过里脊)
cached_answer = cache_manager.check_semantic_cache(query3, session_id)
# 结果: None
# 步骤2: 构建增强查询
enhanced_query = cache_manager.get_context_for_query(session_id, query3)
# 返回:
# """
# 用户问: 糖醋排骨怎么做?
# AI答: 准备排骨500克,白糖3勺,醋2勺...
# 当前问题: 那糖醋里脊呢?
# """
# 步骤3: AI使用增强查询生成答案(知道"那"指的是类似的糖醋做法)
answer3 = "糖醋里脊的做法和糖醋排骨类似,只是把排骨换成里脊肉..."
# 步骤4: 保存到缓存和上下文
cache_manager.add_to_context(session_id, query3, answer3)
cache_manager.add_to_semantic_cache(query3, answer3, session_id)
# ========== 用户Bob开始对话(完全独立的会话)==========
session_id_bob = "bob_001"
# Bob问同样的问题,不会命中Alice的缓存
cached = cache_manager.check_semantic_cache("糖醋排骨怎么做?", session_id_bob)
# 结果: None(Bob的会话是空的)
# ========== 查看统计信息 ==========
stats = cache_manager.get_session_stats()
# 返回:
# {
# 'total_sessions': 2, # Alice和Bob两个会话
# 'total_cached_queries': 3, # Alice缓存了3个问题
# 'total_contexts': 3, # Alice有3轮对话历史
# 'cache_threshold': 0.75,
# 'max_session_cache_size': 50,
# 'max_context_length': 10
# }
# ========== 清除Alice的缓存 ==========
cache_manager.clear_session_cache("alice_001")
# 日志: 🗑️ 已清除会话 alice_001 的缓存(共3个存储区)
```
---
## 💡 总结:这个模块解决了什么问题?
| 问题 | 解决方案 | 效果 |
| -------------------- | ------------ | ------------------------ |
| 用户反复问相似问题 | 语义缓存机制 | 响应速度提升30倍 |
| 多轮对话不理解上下文 | 上下文管理 | AI能理解"它"、"那"等指代 |
| 多个用户同时使用 | 会话隔离 | 数据互不干扰 |
| 长时间运行内存泄漏 | 自动淘汰机制 | 内存占用可控 |
| 缓存命中率难以监控 | 统计信息接口 | 便于调优参数 |
这个模块就像是给AI系统装了一个**智能短期记忆系统**,让它既能记住刚才聊过什么,又能聪明地复用之前的答案,还能同时服务多个用户而不混淆!
# 那如果我的这个项目要扩大到十几万用户的规模,那会话还能缓存到内存吗?到时候我要怎么设计一套上下文机制和用户会话管理和缓存管理呢
非常好的问题!从单用户/小并发扩展到十几万用户,**内存缓存方案会彻底崩溃**。让我给你详细分析问题和提供完整的架构升级方案。
---
## 🚨 当前方案在大规模下的问题
### 1. **内存爆炸**
```python
# 假设场景:10万日活用户,5%同时在线
并发用户数 = 100,000 × 5% = 5,000人
# 每个用户的内存占用
每用户缓存 = 50条 × (问题100字节 + 答案500字节 + 向量768维×4字节)
≈ 50 × (600 + 3072) bytes
≈ 180 KB
# 总会话缓存占用
总内存 = 5,000 × 180 KB ≈ 900 MB
# 加上上下文历史
每用户上下文 = 10轮 × (问题100 + 答案500) × 2(冗余) ≈ 12 KB
总上下文 = 5,000 × 12 KB ≈ 60 MB
# 合计:~1 GB(这只是缓存部分!)
```
**问题清单:**
| 问题 | 说明 | 严重程度 |
| ---------------- | ------------------------------------------------ | -------- |
| **内存泄漏风险** | Python字典不会自动回收,长期运行可能涨到10GB+ | 🔴 严重 |
| **进程重启丢失** | 服务器更新/崩溃后,所有会话数据清零 | 🟡 中等 |
| **无法水平扩展** | 多实例部署时,用户A的请求可能落到实例B,缓存失效 | 🔴 严重 |
| **GC压力大** | 大量小对象导致垃圾回收频繁,影响性能 | 🟡 中等 |
### 2. **实际案例推演**
```
时间线:
08:00 - 系统启动,内存占用 200MB
09:00 - 早高峰,1000个并发会话,内存 400MB
12:00 - 午高峰,5000个并发会话,内存 1.2GB
18:00 - 晚高峰,8000个并发会话,内存 2GB
23:00 - 深夜,500个会话,但内存仍为 1.8GB(旧数据未清理)
第7天:
- 内存持续增长到 8GB
- 触发OOM Killer,进程被系统杀死 ❌
```
---
## 🏗️ 大规模架构设计方案
### 核心设计原则
```mermaid
graph TB
A[10万用户规模] --> B{设计目标}
B --> C[分布式部署
支持水平扩展]
B --> D[数据持久化
重启不丢失]
B --> E[高性能访问
毫秒级响应]
B --> F[成本可控
冷热分离]
C --> G[Redis集群
会话缓存]
D --> H[MySQL/MongoDB
会话元数据]
E --> I[本地LRU缓存
热点数据]
F --> J[TTL自动过期
分级存储]
style G fill:#FFD700
style H fill:#90EE90
style I fill:#87CEEB
```
---
## 📐 完整架构方案(三层缓存体系)
### 架构总览
```
┌─────────────────────────────────────────────────────────┐
│ 用户请求层 (10万+) │
└──────────────┬──────────────────┬───────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────┐
│ L1: 本地缓存 (进程内, <1ms) │
│ • 容量: 每实例 1000 个热点会话 │
│ • 技术: functools.lru_cache / cachetools │
│ • 命中: ~30% (最热数据) │
└──────────────┬──────────────────┬───────────────────────┘
│ Miss │ Hit
▼ ▼
┌─────────────────────────────────────────────────────────┐
│ L2: Redis 集群 (分布式, 1-5ms) │
│ • 容量: 50,000 活跃会话 │
│ • 技术: Redis Hash + Vector Similarity Search │
│ • 命中: ~80% (剩余热数据) │
└──────────────┬──────────────────┬───────────────────────┘
│ Miss │ Hit
▼ ▼
┌─────────────────────────────────────────────────────────┐
│ L3: 持久化存储 (MySQL/MongoDB, 10-50ms) │
│ • 容量: 全量历史会话 │
│ • 技术: 关系型/文档数据库 │
│ • 用途: 冷数据归档、数据分析 │
└─────────────────────────────────────────────────────────┘
```
---
## 💻 完整代码实现
### 方案一:基于 Redis 的分布式会话缓存(推荐)
```python
"""
分布式会话缓存管理模块
支持10万+用户规模的会话管理、上下文管理和缓存管理
架构设计:
- L1: 本地LRU缓存(热点数据,<1ms)
- L2: Redis集群(活跃会话,1-5ms)
- L3: MySQL/MongoDB(全量历史,持久化)
"""
import json
import logging
import hashlib
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timedelta
from functools import lru_cache
import numpy as np
import redis
from redis.commands.search.field import VectorField, TextField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query
import mysql.connector # 或 pymongo
logger = logging.getLogger(__name__)
class DistributedSessionCacheManager:
"""
分布式会话缓存管理器
核心改进:
1. 使用Redis替代内存字典,支持分布式部署
2. 引入TTL机制,自动清理过期会话
3. 三级缓存架构,平衡性能和成本
4. 会话状态持久化,支持断线重连
"""
def __init__(
self,
embedding_model=None,
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0,
redis_password: str = None,
mysql_config: Dict[str, str] = None,
local_cache_size: int = 1000,
session_ttl: int = 3600, # 会话过期时间:1小时
active_session_limit: int = 50000 # Redis中最大活跃会话数
):
"""
初始化分布式缓存管理器
参数:
embedding_model: 文本向量化模型
redis_host/port/db/password: Redis连接配置
mysql_config: MySQL配置(用于持久化)
local_cache_size: 本地缓存大小(每进程)
session_ttl: 会话存活时间(秒),超时自动清理
active_session_limit: Redis中最大会话数,超限淘汰最久未访问
"""
self.embedding_model = embedding_model
self.session_ttl = session_ttl
self.active_session_limit = active_session_limit
# ========== L1: 本地缓存(最快,但容量小)==========
# 使用LRU策略,自动淘汰最少使用的会话
self._local_cache_get = lru_cache(maxsize=local_cache_size)(self._remote_get)
self._local_cache_context = lru_cache(maxsize=local_cache_size)(self._remote_get_context)
# ========== L2: Redis集群(分布式,容量大)==========
try:
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
password=redis_password,
decode_responses=True, # 自动解码字符串
socket_connect_timeout=5,
socket_timeout=5
)
# 测试连接
self.redis_client.ping()
logger.info(f"✅ Redis连接成功: {redis_host}:{redis_port}")
except Exception as e:
logger.error(f"❌ Redis连接失败: {e}")
raise
# 初始化Redis向量索引(用于语义搜索)
self._init_redis_vector_index()
# ========== L3: MySQL持久化(可选)==========
self.mysql_config = mysql_config
if mysql_config:
try:
self.mysql_conn = mysql.connector.connect(**mysql_config)
self._init_mysql_tables()
logger.info("✅ MySQL连接成功")
except Exception as e:
logger.warning(f"⚠️ MySQL连接失败,将禁用持久化: {e}")
self.mysql_conn = None
else:
self.mysql_conn = None
def _init_redis_vector_index(self):
"""
初始化Redis向量索引
使用RediSearch模块创建向量相似度索引
注意:需要Redis Stack版本(包含RediSearch模块)
"""
try:
# 检查是否已存在索引
existing_indexes = self.redis_client.ft("idx:session_vectors").info()
if existing_indexes:
logger.info("Redis向量索引已存在,跳过创建")
return
except:
pass # 索引不存在,继续创建
try:
# 定义索引字段
schema = (
TextField("$.session_id", as_name="session_id"),
TextField("$.query_hash", as_name="query_hash"),
VectorField(
"$.embedding", # 向量字段路径
"FLAT", # 索引类型:FLAT(精确)或 HNSW(近似)
{
"TYPE": "FLOAT32",
"DIM": 768, # 向量维度(根据embedding模型调整)
"DISTANCE_METRIC": "COSINE" # 距离度量:余弦相似度
},
as_name="embedding"
),
NumericField("$.timestamp", as_name="timestamp")
)
# 创建索引
definition = IndexDefinition(prefix=["session_vec:"], index_type=IndexType.JSON)
self.redis_client.ft("idx:session_vectors").create_index(
fields=schema,
definition=definition
)
logger.info("✅ Redis向量索引创建成功")
except Exception as e:
logger.warning(f"⚠️ Redis向量索引创建失败(可能需要Redis Stack): {e}")
def _init_mysql_tables(self):
"""初始化MySQL表结构(用于持久化会话历史)"""
cursor = self.mysql_conn.cursor()
# 会话元数据表
cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id VARCHAR(128) PRIMARY KEY,
user_id VARCHAR(128),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
message_count INT DEFAULT 0,
INDEX idx_last_active (last_active),
INDEX idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
""")
# 对话历史表
cursor.execute("""
CREATE TABLE IF NOT EXISTS conversation_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
session_id VARCHAR(128),
query TEXT,
response TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
INDEX idx_session_created (session_id, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
""")
self.mysql_conn.commit()
cursor.close()
logger.info("✅ MySQL表结构初始化完成")
# ==================== 核心方法 ====================
def check_semantic_cache(self, query: str, session_id: str) -> Optional[str]:
"""
检查语义缓存(三级缓存流程)
流程:
1. 查L1本地缓存(最快)
2. 查L2 Redis向量索引(次快)
3. 未命中则返回None,由上层调用检索模块
"""
if not session_id:
return None
try:
# Step 1: 计算查询向量和哈希
query_embedding = self.embedding_model.embed_documents([query])[0]
query_hash = self._compute_query_hash(query)
# Step 2: 尝试L1缓存(本地LRU)
cached_response = self._local_cache_get(session_id, query_hash)
if cached_response:
logger.debug(f"🎯 L1缓存命中: {session_id[:8]}...")
return cached_response
# Step 3: 查L2 Redis向量索引
redis_response = self._search_redis_vectors(query_embedding, session_id)
if redis_response:
# 写入L1缓存
self._local_cache_get(session_id, query_hash)
logger.info(f"🎯 L2缓存命中: {session_id[:8]}...")
return redis_response
# Step 4: 未命中
return None
except Exception as e:
logger.error(f"❌ 语义缓存检查失败: {e}", exc_info=True)
return None
def _search_redis_vectors(self, query_embedding: List[float], session_id: str) -> Optional[str]:
"""
在Redis向量索引中搜索相似查询
参数:
query_embedding: 查询向量(768维)
session_id: 会话ID
返回:
缓存的答案,或未命中返回None
"""
try:
# 构建向量搜索查询
vector_bytes = np.array(query_embedding, dtype=np.float32).tobytes()
# 查询条件:同一会话 + 相似度>0.75
base_query = f"@session_id:{session_id}"
k = 5 # 返回top-5最相似的
# 执行向量搜索
query_obj = (
Query(f"{base_query}=>[KNN {k} @embedding $vector AS score]")
.sort_by("score")
.return_fields("query_hash", "response", "score")
.dialect(2)
)
results = self.redis_client.ft("idx:session_vectors").search(
query_obj,
query_params={"vector": vector_bytes}
)
# 检查是否有满足阈值的结果
for doc in results.docs:
score = float(doc.score)
if score >= 0.75: # 相似度阈值
logger.debug(f"Redis向量搜索命中,相似度: {score:.3f}")
return doc.response
return None
except Exception as e:
logger.warning(f"Redis向量搜索失败: {e}")
return None
def add_to_semantic_cache(self, query: str, response: str, session_id: str):
"""
添加到语义缓存(写入L2 Redis + 异步写入L3 MySQL)
"""
if not session_id:
return
try:
# Step 1: 计算向量和哈希
query_embedding = self.embedding_model.embed_documents([query])[0]
query_hash = self._compute_query_hash(query)
# Step 2: 存入Redis JSON(带TTL)
cache_key = f"session_vec:{session_id}:{query_hash}"
cache_data = {
"session_id": session_id,
"query_hash": query_hash,
"query": query[:200], # 截断保存
"response": response,
"embedding": query_embedding,
"timestamp": datetime.now().timestamp()
}
# 使用JSON存储,支持向量索引
self.redis_client.json().set(cache_key, "$", cache_data)
self.redis_client.expire(cache_key, self.session_ttl) # 设置过期时间
# Step 3: 更新会话活跃度
self.redis_client.zadd("active_sessions", {session_id: datetime.now().timestamp()})
self.redis_client.expire("active_sessions", self.session_ttl * 2)
# Step 4: 限制活跃会话数量(淘汰最久的)
self._evict_old_sessions()
# Step 5: 异步写入MySQL(不阻塞主流程)
if self.mysql_conn:
self._async_save_to_mysql(session_id, query, response)
# Step 6: 清除L1缓存(保证一致性)
self._local_cache_get.cache_clear()
logger.debug(f"📝 已添加到Redis缓存: {session_id[:8]}...")
except Exception as e:
logger.error(f"❌ 添加语义缓存失败: {e}", exc_info=True)
def add_to_context(self, session_id: str, query: str, response: str):
"""
添加对话到上下文(Redis + MySQL双写)
"""
if not session_id:
return
try:
# Step 1: 存入Redis List(最新N条)
context_key = f"context:{session_id}"
context_entry = json.dumps({
"query": query,
"response": response[:500], # 截断避免过大
"timestamp": datetime.now().isoformat()
})
# 推入列表头部
self.redis_client.lpush(context_key, context_entry)
# 保留最近10条
self.redis_client.ltrim(context_key, 0, 9)
# 设置TTL
self.redis_client.expire(context_key, self.session_ttl)
# Step 2: 同步到MySQL(完整历史)
if self.mysql_conn:
self._save_context_to_mysql(session_id, query, response)
# Step 3: 清除L1上下文缓存
self._local_cache_context.cache_clear()
logger.debug(f"📝 已添加上下文: {session_id[:8]}...")
except Exception as e:
logger.error(f"❌ 添加上下文失败: {e}", exc_info=True)
def get_context_for_query(self, session_id: str, current_query: str) -> str:
"""
获取增强的查询上下文(从Redis读取)
"""
if not session_id:
return current_query
try:
# Step 1: 尝试L1缓存
context_list = self._local_cache_context(session_id)
if context_list is None:
# Step 2: 从Redis读取
context_key = f"context:{session_id}"
context_entries = self.redis_client.lrange(context_key, 0, 2) # 最近3条
if not context_entries:
return current_query
context_list = [json.loads(entry) for entry in context_entries]
# Step 3: 格式化上下文
context_parts = []
for item in reversed(context_list): # 按时间正序
context_parts.append(f"用户问: {item['query']}")
resp = item['response'][:100]
if len(item['response']) > 100:
resp += "..."
context_parts.append(f"AI答: {resp}")
context_parts.append(f"当前问题: {current_query}")
enhanced_query = "\n".join(context_parts)
logger.debug(f"🔗 已构建上下文增强查询: {len(context_list)}轮历史")
return enhanced_query
except Exception as e:
logger.warning(f"上下文获取失败: {e},使用原始查询")
return current_query
def _remote_get(self, session_id: str, query_hash: str) -> Optional[str]:
"""从Redis直接获取缓存(供L1缓存调用)"""
cache_key = f"session_vec:{session_id}:{query_hash}"
data = self.redis_client.json().get(cache_key)
return data.get("response") if data else None
def _remote_get_context(self, session_id: str) -> Optional[List[Dict]]:
"""从Redis获取上下文(供L1缓存调用)"""
context_key = f"context:{session_id}"
entries = self.redis_client.lrange(context_key, 0, 9)
if not entries:
return None
return [json.loads(entry) for entry in entries]
def _evict_old_sessions(self):
"""
淘汰过期会话(控制Redis内存占用)
策略:
- 当活跃会话数超过限制时,删除最久未访问的会话
"""
try:
current_count = self.redis_client.zcard("active_sessions")
if current_count > self.active_session_limit:
# 计算需要删除的数量
to_delete = current_count - self.active_session_limit
# 获取最旧的to_delete个会话
old_sessions = self.redis_client.zrange(
"active_sessions", 0, to_delete - 1
)
# 批量删除
pipe = self.redis_client.pipeline()
for session_id in old_sessions:
pipe.delete(f"context:{session_id}")
# 注意:向量缓存通过TTL自动过期,无需手动删除
pipe.zrem("active_sessions", session_id)
pipe.execute()
logger.info(f"♻️ 已淘汰 {len(old_sessions)} 个过期会话")
except Exception as e:
logger.warning(f"会话淘汰失败: {e}")
def _async_save_to_mysql(self, session_id: str, query: str, response: str):
"""异步保存到MySQL(简化版,实际应使用线程池)"""
try:
cursor = self.mysql_conn.cursor()
# 插入或更新会话
cursor.execute("""
INSERT INTO sessions (session_id, message_count)
VALUES (%s, 1)
ON DUPLICATE KEY UPDATE
message_count = message_count + 1,
last_active = NOW()
""", (session_id,))
# 插入对话记录
cursor.execute("""
INSERT INTO conversation_history (session_id, query, response)
VALUES (%s, %s, %s)
""", (session_id, query[:5000], response[:10000]))
self.mysql_conn.commit()
cursor.close()
except Exception as e:
logger.warning(f"MySQL保存失败: {e}")
def _save_context_to_mysql(self, session_id: str, query: str, response: str):
"""保存上下文到MySQL"""
self._async_save_to_mysql(session_id, query, response)
@staticmethod
def _compute_query_hash(query: str) -> str:
"""计算查询的哈希值(用于快速匹配完全相同的查询)"""
return hashlib.md5(query.encode('utf-8')).hexdigest()
def clear_session_cache(self, session_id: str):
"""清除指定会话的所有缓存"""
try:
pipe = self.redis_client.pipeline()
pipe.delete(f"context:{session_id}")
pipe.zrem("active_sessions", session_id)
pipe.execute()
# 清除L1缓存
self._local_cache_get.cache_clear()
self._local_cache_context.cache_clear()
logger.info(f"🗑️ 已清除会话缓存: {session_id[:8]}...")
except Exception as e:
logger.error(f"清除会话缓存失败: {e}")
def get_session_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
try:
active_sessions = self.redis_client.zcard("active_sessions")
return {
'active_sessions': active_sessions,
'session_ttl': self.session_ttl,
'active_session_limit': self.active_session_limit,
'redis_memory_used': self.redis_client.info('memory')['used_memory_human'],
'mysql_enabled': self.mysql_conn is not None
}
except Exception as e:
logger.error(f"获取统计信息失败: {e}")
return {}
```
---
## 📊 性能对比表
| 指标 | 原方案(内存) | 新方案(Redis) | 提升 |
| -------------- | --------------- | --------------------- | ------------ |
| **并发支持** | 单进程~5000会话 | 集群~50万会话 | **100倍** |
| **响应时间** | 0.1ms | 1-5ms | 略慢但可接受 |
| **内存占用** | 无限制(危险) | 可控(配置maxmemory) | **安全** |
| **水平扩展** | ❌ 不支持 | ✅ 支持 | **关键** |
| **数据持久化** | ❌ 重启丢失 | ✅ RDB/AOF | **可靠** |
| **运维复杂度** | 低 | 中 | 需部署Redis |
---
## 🚀 部署架构建议
### 生产环境配置
```yaml
# docker-compose.yml
version: '3.8'
services:
# Redis集群(主从+哨兵)
redis-master:
image: redis/redis-stack:latest
ports:
- "6379:6379"
volumes:
- redis-data:/data
command: redis-server --appendonly yes --maxmemory 4gb --maxmemory-policy allkeys-lru
redis-slave-1:
image: redis/redis-stack:latest
command: redis-server --slaveof redis-master 6379
redis-slave-2:
image: redis/redis-stack:latest
command: redis-server --slaveof redis-master 6379
# MySQL(持久化)
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: your_password
MYSQL_DATABASE: session_db
volumes:
- mysql-data:/var/lib/mysql
# 应用服务(多实例)
app:
build: .
replicas: 5 # 5个实例负载均衡
environment:
REDIS_HOST: redis-master
MYSQL_HOST: mysql
depends_on:
- redis-master
- mysql
volumes:
redis-data:
mysql-data:
```
---
## 💰 成本估算(10万日活)
| 组件 | 配置 | 月成本(阿里云) |
| --------------- | ------------------- | ---------------- |
| **Redis集群** | 主从3节点,8GB内存 | ¥1,500 |
| **MySQL** | 高可用版,100GB存储 | ¥800 |
| **应用服务器** | 5台4核8G | ¥2,500 |
| **负载均衡SLB** | - | ¥200 |
| **总计** | - | **¥5,000/月** |
---
## 🎯 总结与建议
### 渐进式迁移路线
```
阶段1(当前): 内存缓存 → 适合 <1000并发
阶段2(短期): 单机Redis → 适合 <1万并发
阶段3(中期): Redis主从 → 适合 <5万并发
阶段4(长期): Redis集群+MySQL分库 → 适合 >10万并发
```
### 关键决策点
| 用户规模 | 推荐方案 | 理由 |
| ------------- | ------------------- | ------------------ |
| **< 1,000** | 保持现状(内存) | 简单够用 |
| **1K - 10K** | 单机Redis | 成本低,易维护 |
| **10K - 50K** | Redis主从+哨兵 | 高可用,读扩展 |
| **> 50K** | Redis集群+MySQL分库 | 水平扩展,海量数据 |
需要我提供具体的Redis配置文件或者MySQL表优化建议吗?
# 那如果我希望能匹配到类似的回答呢,怎么设计使用向量数据库去命中类似问题的缓存
非常好的问题!你想要的是**语义级别的缓存命中**,而不仅仅是完全相同的查询。让我给你设计一套基于向量数据库的相似问答匹配方案。
---
## 🎯 核心思路对比
### 当前方案 vs 向量数据库方案
| 维度 | 当前方案(内存遍历) | 向量数据库方案 |
| -------------- | -------------------------------- | -------------------------- |
| **匹配方式** | 线性遍历所有缓存,逐一计算相似度 | ANN近似最近邻搜索 |
| **时间复杂度** | O(n),n=缓存数量(最多50) | O(log n),即使百万级也很快 |
| **可扩展性** | 差,超过1000条就慢 | 好,支持亿级向量 |
| **跨会话匹配** | ❌ 不支持(会话隔离) | ✅ 可选(全局/会话级) |
| **持久化** | ❌ 重启丢失 | ✅ 永久保存 |
---
## 🏗️ 完整架构设计
### 方案选择:Redis Vector Search(推荐)
```mermaid
graph TB
A[用户提问] --> B[计算查询向量]
B --> C{Redis向量搜索}
C -->|相似度>0.85| D[🎯 高置信度命中
直接返回缓存答案]
C -->|0.75-0.85| E[⚠️ 中置信度
返回缓存+标注来源]
C -->|<0.75| F[❌ 未命中
正常检索流程]
D --> G[记录命中日志]
E --> H[用户确认是否正确]
H -->|是| I[添加到缓存]
H -->|否| F
F --> J[Milvus检索知识库]
J --> K[LLM生成答案]
K --> L[存入Redis向量索引]
style D fill:#90EE90
style E fill:#FFD700
style F fill:#FFB6C1
```
---
## 💻 完整代码实现
### 基于 Redis Vector Search 的语义缓存
```python
"""
基于向量数据库的语义缓存管理模块
支持大规模相似问题匹配和智能缓存命中
技术栈:
- Redis Stack (RediSearch + JSON + Vector Search)
- 支持HNSW/FLAT索引算法
- 多级相似度阈值策略
"""
import json
import logging
import hashlib
import time
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
import numpy as np
import redis
from redis.commands.search.field import VectorField, TextField, NumericField, TagField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query
from redis.commands.search.aggregation import AggregateRequest, Reducer
logger = logging.getLogger(__name__)
class VectorSemanticCacheManager:
"""
向量语义缓存管理器
核心特性:
1. 基于Redis向量搜索,支持毫秒级相似度匹配
2. 多级阈值策略:高置信度直接返回,中置信度需确认
3. 支持跨会话匹配(可选)
4. 自动学习机制:记录用户反馈优化阈值
5. 缓存预热:高频问题预加载到内存
"""
def __init__(
self,
embedding_model=None,
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0,
redis_password: str = None,
vector_dimension: int = 768,
high_confidence_threshold: float = 0.85, # 高置信度阈值
medium_confidence_threshold: float = 0.75, # 中置信度阈值
top_k_results: int = 5, # 返回top-K个相似结果
cache_ttl: int = 86400 * 7, # 缓存过期时间:7天
enable_cross_session: bool = False, # 是否启用跨会话匹配
index_type: str = "HNSW" # 索引类型:HNSW(快) 或 FLAT(准)
):
"""
初始化向量语义缓存
参数说明:
embedding_model: 文本向量化模型
redis_host/port/db/password: Redis连接配置
vector_dimension: 向量维度(根据embedding模型调整)
high_confidence_threshold: 高置信度阈值,超过直接返回
medium_confidence_threshold: 中置信度阈值,需要用户确认
top_k_results: 每次搜索返回的最相似结果数
cache_ttl: 缓存存活时间(秒)
enable_cross_session: 是否允许跨会话匹配(隐私考虑)
index_type: 索引算法
- HNSW: 分层导航小世界图,速度快,适合大规模
- FLAT: 暴力搜索,精度高,适合小规模
"""
self.embedding_model = embedding_model
self.vector_dimension = vector_dimension
self.high_threshold = high_confidence_threshold
self.medium_threshold = medium_confidence_threshold
self.top_k = top_k_results
self.cache_ttl = cache_ttl
self.enable_cross_session = enable_cross_session
self.index_type = index_type
# 连接Redis
try:
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
password=redis_password,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
self.redis_client.ping()
logger.info(f"✅ Redis连接成功: {redis_host}:{redis_port}")
except Exception as e:
logger.error(f"❌ Redis连接失败: {e}")
raise
# 初始化向量索引
self._create_vector_index()
# 统计信息
self.stats = {
'total_queries': 0,
'high_confidence_hits': 0,
'medium_confidence_hits': 0,
'misses': 0,
'avg_search_time_ms': 0
}
def _create_vector_index(self):
"""
创建Redis向量索引
索引结构:
- session_id: 会话ID(Tag字段,用于过滤)
- query_hash: 查询哈希(用于去重)
- query_text: 原始查询文本(用于展示)
- response: 缓存的答案
- embedding: 向量(768维,用于相似度搜索)
- similarity_score: 历史最高相似度(用于优化)
- hit_count: 命中次数(用于热度排序)
- created_at: 创建时间戳
- last_hit_at: 最后命中时间戳
"""
index_name = "idx:semantic_cache"
try:
# 检查索引是否已存在
try:
existing_info = self.redis_client.ft(index_name).info()
if existing_info:
logger.info(f"✅ 向量索引已存在: {index_name}")
return
except:
pass # 索引不存在,继续创建
# 定义索引字段
schema = (
# Tag字段:用于精确过滤(会话ID)
TagField("$.session_id", as_name="session_id"),
# Text字段:用于全文搜索和展示
TextField("$.query_hash", as_name="query_hash"),
TextField("$.query_text", as_name="query_text"),
TextField("$.response", as_name="response"),
# Vector字段:核心,用于相似度搜索
VectorField(
"$.embedding", # JSON路径
self.index_type, # 索引类型:HNSW 或 FLAT
{
"TYPE": "FLOAT32",
"DIM": self.vector_dimension,
"DISTANCE_METRIC": "COSINE", # 余弦相似度
# HNSW参数(仅当index_type=HNSW时有效)
"M": 16, # 每个节点的连接数,越大越准但越慢
"EF_CONSTRUCTION": 200, # 构建时的搜索深度
"EF_RUNTIME": 50 # 查询时的搜索深度
},
as_name="embedding"
),
# Numeric字段:用于数值过滤和排序
NumericField("$.similarity_score", as_name="similarity_score"),
NumericField("$.hit_count", as_name="hit_count"),
NumericField("$.created_at", as_name="created_at"),
NumericField("$.last_hit_at", as_name="last_hit_at")
)
# 创建索引定义
definition = IndexDefinition(
prefix=["semcache:"], # Key前缀
index_type=IndexType.JSON
)
# 执行创建
self.redis_client.ft(index_name).create_index(
fields=schema,
definition=definition
)
logger.info(f"✅ 向量索引创建成功: {index_name}")
logger.info(f" - 索引类型: {self.index_type}")
logger.info(f" - 向量维度: {self.vector_dimension}")
logger.info(f" - 距离度量: COSINE")
except Exception as e:
logger.error(f"❌ 向量索引创建失败: {e}")
raise
def check_semantic_cache(
self,
query: str,
session_id: str = None,
require_high_confidence: bool = False
) -> Dict[str, Any]:
"""
检查语义缓存(核心方法)
工作流程:
1. 将查询转换为向量
2. 在Redis中执行向量相似度搜索
3. 根据相似度分级处理:
- 高置信度(>=0.85): 直接返回
- 中置信度(0.75-0.85): 返回但标记需确认
- 低置信度(<0.75): 返回None,走正常检索
参数:
query: 用户查询文本
session_id: 会话ID(如果enable_cross_session=False则必须提供)
require_high_confidence: 是否只接受高置信度结果
返回:
Dict包含:
- found: bool, 是否找到
- confidence: str, 'high'/'medium'/'none'
- response: str, 缓存的答案
- similarity: float, 相似度分数
- source_query: str, 匹配的原始问题
- needs_confirmation: bool, 是否需要用户确认
"""
start_time = time.time()
self.stats['total_queries'] += 1
try:
# Step 1: 计算查询向量
query_embedding = self.embedding_model.embed_documents([query])[0]
query_vector = np.array(query_embedding, dtype=np.float32).tobytes()
# Step 2: 构建搜索查询
# 如果禁用跨会话,则限定在当前会话内搜索
filter_expr = ""
if not self.enable_cross_session and session_id:
filter_expr = f"@session_id:{{{session_id}}}"
# Step 3: 执行向量搜索
search_query = (
Query(f"{filter_expr}=>[KNN {self.top_k} @embedding $vector AS score]")
.sort_by("score")
.return_fields(
"session_id",
"query_hash",
"query_text",
"response",
"similarity_score",
"hit_count",
"score"
)
.dialect(2)
)
# 执行搜索
results = self.redis_client.ft("idx:semantic_cache").search(
search_query,
query_params={"vector": query_vector}
)
# Step 4: 分析结果
search_time_ms = (time.time() - start_time) * 1000
self._update_avg_search_time(search_time_ms)
if not results.docs:
# 未找到任何相似结果
self.stats['misses'] += 1
logger.debug(f"❌ 缓存未命中: {query[:30]}...")
return {
'found': False,
'confidence': 'none',
'response': None,
'similarity': 0.0,
'source_query': None,
'needs_confirmation': False
}
# Step 5: 获取最佳匹配(第一个结果)
best_match = results.docs[0]
# 注意:Redis返回的是距离(distance),需要转换为相似度
# COSINE距离 = 1 - 相似度
distance = float(best_match.score)
similarity = 1 - distance
source_query = best_match.query_text
cached_response = best_match.response
# Step 6: 根据相似度分级处理
if similarity >= self.high_threshold:
# 高置信度:直接返回
self.stats['high_confidence_hits'] += 1
self._update_hit_stats(best_match.query_hash, similarity)
logger.info(
f"🎯 高置信度命中! 相似度: {similarity:.3f}, "
f"源问题: {source_query[:50]}..."
)
return {
'found': True,
'confidence': 'high',
'response': cached_response,
'similarity': similarity,
'source_query': source_query,
'needs_confirmation': False
}
elif similarity >= self.medium_threshold:
# 中置信度:返回但标记需确认
self.stats['medium_confidence_hits'] += 1
logger.info(
f"⚠️ 中置信度命中! 相似度: {similarity:.3f}, "
f"源问题: {source_query[:50]}..."
)
return {
'found': True,
'confidence': 'medium',
'response': cached_response,
'similarity': similarity,
'source_query': source_query,
'needs_confirmation': True
}
else:
# 低置信度:视为未命中
self.stats['misses'] += 1
logger.debug(
f"❌ 相似度不足: {similarity:.3f} < {self.medium_threshold}"
)
return {
'found': False,
'confidence': 'none',
'response': None,
'similarity': similarity,
'source_query': source_query,
'needs_confirmation': False
}
except Exception as e:
logger.error(f"❌ 语义缓存检查失败: {e}", exc_info=True)
return {
'found': False,
'confidence': 'none',
'response': None,
'similarity': 0.0,
'source_query': None,
'needs_confirmation': False
}
def add_to_cache(
self,
query: str,
response: str,
session_id: str = None,
metadata: Dict[str, Any] = None
) -> bool:
"""
添加问答对到向量缓存
参数:
query: 用户问题
response: AI回答
session_id: 会话ID
metadata: 额外元数据(如菜品分类、难度等)
返回:
bool: 是否添加成功
"""
try:
# Step 1: 计算向量和哈希
query_embedding = self.embedding_model.embed_documents([query])[0]
query_hash = self._compute_hash(query)
# Step 2: 检查是否已存在相同问题(去重)
existing_key = f"semcache:{query_hash}"
if self.redis_client.exists(existing_key):
logger.debug(f"⚠️ 问题已存在,更新缓存: {query[:30]}...")
# 可以选择更新或跳过
# self._update_existing_cache(existing_key, response)
return True
# Step 3: 构建缓存对象
now_timestamp = datetime.now().timestamp()
cache_entry = {
"session_id": session_id or "global",
"query_hash": query_hash,
"query_text": query[:500], # 截断避免过长
"response": response,
"embedding": query_embedding,
"similarity_score": 1.0, # 初始值
"hit_count": 0,
"created_at": now_timestamp,
"last_hit_at": 0,
"metadata": metadata or {}
}
# Step 4: 存入Redis JSON
cache_key = f"semcache:{query_hash}"
self.redis_client.json().set(cache_key, "$", cache_entry)
# Step 5: 设置TTL
self.redis_client.expire(cache_key, self.cache_ttl)
logger.info(f"📝 已添加到向量缓存: {query[:30]}... (hash: {query_hash[:8]})")
return True
except Exception as e:
logger.error(f"❌ 添加缓存失败: {e}", exc_info=True)
return False
def confirm_cache_quality(
self,
query: str,
is_helpful: bool,
actual_similarity: float = None
):
"""
用户反馈机制:确认缓存答案是否有用
用途:
- 记录用户对中置信度结果的反馈
- 动态调整阈值
- 标记低质量缓存
参数:
query: 原始查询
is_helpful: 是否有用
actual_similarity: 实际相似度(如果已知)
"""
try:
query_hash = self._compute_hash(query)
cache_key = f"semcache:{query_hash}"
# 获取缓存条目
entry = self.redis_client.json().get(cache_key)
if not entry:
logger.warning(f"缓存条目不存在: {query_hash}")
return
if is_helpful:
# 有用:增加命中计数,提升权重
current_hits = entry.get('hit_count', 0)
self.redis_client.json().set(
cache_key,
"$.hit_count",
current_hits + 1
)
logger.info(f"👍 用户确认有用: {query[:30]}...")
else:
# 无用:降低相似度评分,可能后续清理
if actual_similarity:
self.redis_client.json().set(
cache_key,
"$.similarity_score",
actual_similarity * 0.8 # 降权
)
logger.warning(f"👎 用户反馈无用: {query[:30]}...")
except Exception as e:
logger.error(f"反馈记录失败: {e}")
def get_similar_questions(
self,
query: str,
top_n: int = 5,
session_id: str = None
) -> List[Dict[str, Any]]:
"""
获取相似问题列表(用于推荐或调试)
参数:
query: 查询文本
top_n: 返回数量
session_id: 会话ID(可选)
返回:
相似问题列表,按相似度排序
"""
try:
# 计算向量
query_embedding = self.embedding_model.embed_documents([query])[0]
query_vector = np.array(query_embedding, dtype=np.float32).tobytes()
# 构建过滤条件
filter_expr = ""
if session_id and not self.enable_cross_session:
filter_expr = f"@session_id:{{{session_id}}}"
# 执行搜索
search_query = (
Query(f"{filter_expr}=>[KNN {top_n} @embedding $vector AS score]")
.sort_by("score")
.return_fields(
"query_text",
"response",
"hit_count",
"score"
)
.dialect(2)
)
results = self.redis_client.ft("idx:semantic_cache").search(
search_query,
query_params={"vector": query_vector}
)
# 格式化结果
similar_questions = []
for doc in results.docs:
distance = float(doc.score)
similarity = 1 - distance
similar_questions.append({
'question': doc.query_text,
'answer': doc.response,
'similarity': similarity,
'hit_count': int(doc.hit_count)
})
return similar_questions
except Exception as e:
logger.error(f"获取相似问题失败: {e}")
return []
def cleanup_low_quality_cache(
self,
min_hit_count: int = 0,
max_age_days: int = 30
) -> int:
"""
清理低质量缓存
策略:
- 命中次数为0且超过N天的缓存
- 用户反馈无用的缓存
参数:
min_hit_count: 最小命中次数
max_age_days: 最大存活天数
返回:
清理的数量
"""
try:
cutoff_timestamp = datetime.now().timestamp() - (max_age_days * 86400)
# 使用聚合查询找出符合条件的缓存
agg_request = AggregateRequest("*") \
.group_by("@query_hash", Reducer.max("@created_at").alias("max_created")) \
.filter(f"@max_created < {cutoff_timestamp}")
results = self.redis_client.ft("idx:semantic_cache").aggregate(agg_request)
cleaned_count = 0
for row in results.rows:
query_hash = row[1] # 第二个元素是query_hash
cache_key = f"semcache:{query_hash}"
# 检查命中次数
entry = self.redis_client.json().get(cache_key)
if entry and entry.get('hit_count', 0) <= min_hit_count:
self.redis_client.delete(cache_key)
cleaned_count += 1
logger.info(f"🗑️ 已清理 {cleaned_count} 个低质量缓存")
return cleaned_count
except Exception as e:
logger.error(f"清理缓存失败: {e}")
return 0
def get_cache_statistics(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
try:
# 总缓存数
total_keys = self.redis_client.dbsize()
# 计算命中率
total_queries = self.stats['total_queries']
hit_rate = (
(self.stats['high_confidence_hits'] + self.stats['medium_confidence_hits'])
/ total_queries * 100
if total_queries > 0 else 0
)
return {
'total_cached_items': total_keys,
'total_queries': total_queries,
'high_confidence_hits': self.stats['high_confidence_hits'],
'medium_confidence_hits': self.stats['medium_confidence_hits'],
'misses': self.stats['misses'],
'hit_rate_percent': round(hit_rate, 2),
'avg_search_time_ms': round(self.stats['avg_search_time_ms'], 2),
'high_threshold': self.high_threshold,
'medium_threshold': self.medium_threshold,
'index_type': self.index_type
}
except Exception as e:
logger.error(f"获取统计信息失败: {e}")
return {}
# ==================== 辅助方法 ====================
def _update_hit_stats(self, query_hash: str, similarity: float):
"""更新命中统计"""
try:
cache_key = f"semcache:{query_hash}"
# 增加命中计数
current_hits = self.redis_client.json().get(cache_key, "$.hit_count")[0]
self.redis_client.json().set(
cache_key,
"$.hit_count",
current_hits + 1
)
# 更新最后命中时间
self.redis_client.json().set(
cache_key,
"$.last_hit_at",
datetime.now().timestamp()
)
# 更新最高相似度
self.redis_client.json().set(
cache_key,
"$.similarity_score",
max(similarity, self.redis_client.json().get(cache_key, "$.similarity_score")[0])
)
except Exception as e:
logger.warning(f"更新命中统计失败: {e}")
def _update_avg_search_time(self, search_time_ms: float):
"""更新平均搜索时间(移动平均)"""
alpha = 0.1 # 平滑系数
current_avg = self.stats['avg_search_time_ms']
self.stats['avg_search_time_ms'] = alpha * search_time_ms + (1 - alpha) * current_avg
@staticmethod
def _compute_hash(text: str) -> str:
"""计算文本哈希"""
return hashlib.md5(text.encode('utf-8')).hexdigest()
def clear_all_cache(self):
"""清空所有缓存(谨慎使用)"""
try:
keys = self.redis_client.keys("semcache:*")
if keys:
self.redis_client.delete(*keys)
logger.info(f"🗑️ 已清空所有语义缓存(共{len(keys)}条)")
except Exception as e:
logger.error(f"清空缓存失败: {e}")
```
---
## 📊 使用示例
### 完整工作流程
```python
# ========== 初始化 ==========
from langchain_huggingface import HuggingFaceEmbeddings
# 加载embedding模型
embedding_model = HuggingFaceEmbeddings(
model_name="BAAI/bge-small-zh-v1.5",
model_kwargs={'device': 'cpu'}
)
# 创建向量缓存管理器
cache_manager = VectorSemanticCacheManager(
embedding_model=embedding_model,
redis_host="localhost",
redis_port=6379,
vector_dimension=512, # bge-small的维度
high_confidence_threshold=0.85,
medium_confidence_threshold=0.75,
enable_cross_session=False # 保护隐私,不跨会话匹配
)
# ========== 场景1: 首次提问(缓存未命中)==========
query1 = "红烧肉怎么做?"
result = cache_manager.check_semantic_cache(query1, session_id="user_123")
if not result['found']:
print("❌ 缓存未命中,执行正常检索...")
# 调用Milvus检索 + LLM生成
answer = retrieve_and_generate(query1)
# 存入缓存
cache_manager.add_to_cache(query1, answer, session_id="user_123")
print("✅ 已添加到缓存")
# ========== 场景2: 相似问题(高置信度命中)==========
query2 = "红烧肉的做法是什么?"
result = cache_manager.check_semantic_cache(query2, session_id="user_123")
if result['confidence'] == 'high':
print(f"🎯 高置信度命中! 相似度: {result['similarity']:.3f}")
print(f" 源问题: {result['source_query']}")
print(f" 答案: {result['response'][:100]}...")
# 直接返回,无需重新检索
final_answer = result['response']
# ========== 场景3: 中置信度(需要用户确认)==========
query3 = "糖醋排骨怎么烧?"
result = cache_manager.check_semantic_cache(query3, session_id="user_123")
if result['confidence'] == 'medium':
print(f"⚠️ 找到相似问题(相似度: {result['similarity']:.3f})")
print(f" 源问题: {result['source_query']}")
print(f" 建议答案: {result['response'][:100]}...")
# 询问用户是否有帮助
user_feedback = ask_user("这个答案对你有帮助吗?(yes/no)")
if user_feedback == 'yes':
cache_manager.confirm_cache_quality(query3, is_helpful=True)
final_answer = result['response']
else:
cache_manager.confirm_cache_quality(query3, is_helpful=False)
# 重新检索
final_answer = retrieve_and_generate(query3)
cache_manager.add_to_cache(query3, final_answer, session_id="user_123")
# ========== 场景4: 获取相似问题推荐 ==========
similar = cache_manager.get_similar_questions("鸡肉怎么做", top_n=3)
print("\n📚 相关问题推荐:")
for i, item in enumerate(similar, 1):
print(f"{i}. {item['question']} (相似度: {item['similarity']:.2f})")
# ========== 查看统计信息 ==========
stats = cache_manager.get_cache_statistics()
print(f"\n📊 缓存统计:")
print(f" 总缓存数: {stats['total_cached_items']}")
print(f" 查询次数: {stats['total_queries']}")
print(f" 命中率: {stats['hit_rate_percent']}%")
print(f" 平均搜索时间: {stats['avg_search_time_ms']}ms")
# ========== 定期清理低质量缓存 ==========
cleaned = cache_manager.cleanup_low_quality_cache(
min_hit_count=0,
max_age_days=30
)
print(f"🗑️ 已清理 {cleaned} 个低质量缓存")
```
---
## 🎯 多级阈值策略详解
### 为什么需要分级?
| 置信度 | 相似度范围 | 处理方式 | 用户体验 | 适用场景 |
| ------------ | ---------- | ---------- | ---------- | ---------------------- |
| **高置信度** | ≥0.85 | 直接返回 | ⚡ 极速响应 | 表达不同但意思完全相同 |
| **中置信度** | 0.75-0.85 | 返回+确认 | ⚠️ 需验证 | 部分相关,可能需要调整 |
| **低置信度** | <0.75 | 视为未命中 | 🔄 重新检索 | 完全不同的问题 |
### 实际案例
```python
用户问:"宫保鸡丁怎么做?"
缓存中有:"宫保鸡丁的做法步骤"
→ 相似度: 0.92 → 高置信度 ✅ 直接返回
用户问:"辣子鸡如何制作?"
缓存中有:"宫保鸡丁的做法步骤"
→ 相似度: 0.78 → 中置信度 ⚠️ 返回但提示"这是宫保鸡丁的做法,您需要吗?"
用户问:"蛋糕怎么烤?"
缓存中有:"宫保鸡丁的做法步骤"
→ 相似度: 0.45 → 低置信度 ❌ 重新检索
```
---
## 🚀 性能优化建议
### 1. 索引选择
| 索引类型 | 适用规模 | 速度 | 精度 | 内存占用 |
| ------------- | ---------- | ---- | -------- | -------- |
| **FLAT** | <1万条 | 慢 | 100%精确 | 低 |
| **HNSW** | 1万-1000万 | 快 | ~95% | 中 |
| **HNSW+量化** | >1000万 | 极快 | ~90% | 低 |
### 2. 参数调优
```python
# 小规模(<1万条)
cache_manager = VectorSemanticCacheManager(
index_type="FLAT", # 精确搜索
top_k_results=3 # 少返回几个
)
# 中规模(1万-100万)
cache_manager = VectorSemanticCacheManager(
index_type="HNSW",
vector_dimension=768,
top_k_results=5
)
# 大规模(>100万)
cache_manager = VectorSemanticCacheManager(
index_type="HNSW",
top_k_results=10,
# HNSW内部参数已在_create_vector_index中配置
)
```
---
## 💡 总结
这套方案的核心优势:
1. **语义理解**:不是关键词匹配,而是真正的语义相似度
2. **分级处理**:高置信度快速返回,中置信度人工确认
3. **自学习能力**:通过用户反馈不断优化
4. **高性能**:Redis向量搜索毫秒级响应
5. **可扩展**:支持从几百到几千万的平滑扩展
需要我补充MySQL持久化层或者缓存预热策略的代码吗?
# web_service_handler工作原理
我来为这个Web服务处理模块添加详细注释,并绘制工作流程图。
首先让我查看一下相关的依赖模块,以便更好地理解整体架构:
**已查看文件** *session_cache_manager.py*
**已查看文件** *intelligent_query_router.py*
现在我来为这个Web服务处理模块添加详细注释,并绘制工作流程图:
## 📝 详细注释版本
```python
"""
Web服务处理模块 - RAG系统的HTTP接口层
负责将内部RAG功能暴露为REST API,供前端调用
核心职责:
1. API路由管理:定义所有HTTP端点(聊天、推荐、统计等)
2. 请求处理:解析用户输入、验证参数、调用后端逻辑
3. 响应格式化:统一返回JSON格式,支持流式输出
4. 静态文件服务:托管前端构建产物(HTML/CSS/JS)
5. 错误处理:捕获异常并返回友好的错误信息
技术栈:
- Flask: Web框架,提供HTTP服务器功能
- Flask-CORS: 跨域资源共享,允许前端跨域访问
- SSE (Server-Sent Events): 实现流式响应,提升用户体验
"""
import logging
import json
import time
import concurrent.futures
from datetime import datetime
from typing import Dict, Any, Optional
from flask import jsonify
logger = logging.getLogger(__name__)
class WebServiceHandler:
"""
Web服务处理器 - RAG系统的网关层
类比理解:
就像餐厅的前台服务员,负责:
1. 接待顾客(接收HTTP请求)
2. 理解需求(解析请求参数)
3. 通知厨房(调用RAG系统处理)
4. 上菜服务(返回响应数据)
主要功能:
1. API路由处理:定义和管理所有HTTP端点
2. 静态文件服务:提供前端页面和资源文件
3. 错误处理:统一捕获和格式化异常
4. 响应格式化:确保返回数据符合前端期望的格式
"""
def __init__(self, rag_system):
"""
初始化Web服务处理器
参数:
rag_system: RAG系统实例,包含所有检索和生成能力
这是整个AI系统的"大脑",负责实际的问答逻辑
工作原理:
- 保存rag_system引用,后续API调用时会使用它来处理查询
- app初始化为None,需要调用setup_flask_app()后才创建Flask应用
"""
self.rag_system = rag_system
self.app = None
def setup_flask_app(self):
"""
设置Flask应用和路由 - 启动Web服务器的准备工作
工作流程:
1. 导入Flask相关库(如果未安装会抛出ImportError)
2. 创建Flask应用实例
3. 启用CORS(跨域资源共享),允许前端从不同域名访问
4. 调用_setup_routes()注册所有API路由
5. 返回配置好的Flask应用对象
返回:
Flask应用对象,可传递给app.run()启动服务器
如果导入失败返回None
异常处理:
捕获ImportError,记录日志并返回None,避免程序崩溃
"""
try:
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
# 创建Flask应用实例,作为HTTP服务器
self.app = Flask(__name__)
# 启用CORS,允许浏览器跨域访问API
# 例如:前端运行在localhost:3000,后端在localhost:5000
CORS(self.app)
# 注册所有API路由(聊天、推荐、统计等端点)
self._setup_routes()
return self.app
except ImportError as e:
logger.error(f"Flask导入失败: {e}")
return None
def _setup_routes(self):
"""
设置所有API路由 - 定义系统的HTTP接口清单
注册的端点:
1. GET / → 主页(前端SPA入口)
2. GET / → 静态文件服务(CSS/JS/图片等)
3. GET /health → 健康检查(监控系统状态)
4. POST /api/chat → 普通聊天API(一次性返回完整答案)
5. POST /api/chat/stream → 流式聊天API(逐字返回答案)
6. POST /api/recipes/recommendations → 菜谱推荐API
7. GET /api/recipes/ → 菜谱详情API
8. GET /api/stats → 系统统计API
设计原则:
- RESTful风格:资源命名清晰,HTTP方法语义化
- 统一前缀:所有API以/api开头,便于区分和权限控制
- 版本友好:未来可扩展为/api/v1/chat等
"""
from flask import request, jsonify, Response, send_from_directory
@self.app.route('/')
def serve_index():
"""
提供主页 - SPA应用的入口页面
当用户访问 http://localhost:5000/ 时:
- 返回前端构建的index.html文件
- 前端JavaScript接管路由,实现单页应用效果
"""
return self._serve_static_file('index.html')
@self.app.route('/')
def serve_static(filename):
"""
提供静态文件服务 - 托管前端资源
匹配规则:
- 捕获所有未匹配的路径(如 /css/style.css、/js/app.js)
- 从frontend/dist目录提供对应文件
安全考虑:
- 通过_serve_static_file方法进行路径遍历攻击防护
- 禁止访问..或/开头的危险路径
"""
return self._serve_static_file(filename)
@self.app.route('/health', methods=['GET'])
def health_check():
"""
健康检查端点 - 用于监控和运维
用途:
1. 负载均衡器检测后端服务是否存活
2. Docker/K8s健康探针
3. 运维人员快速确认系统状态
返回示例:
{
"status": "healthy",
"timestamp": "2026-04-10 14:30:00",
"service": "RAG System"
}
"""
return jsonify({
"status": "healthy",
"timestamp": str(datetime.now()),
"service": "RAG System"
})
@self.app.route('/api/chat', methods=['POST'])
def chat():
"""
聊天API - 普通响应模式
请求方式:POST
Content-Type: application/json
请求体示例:
{
"message": "红烧肉怎么做?",
"session_id": "user_123"
}
响应示例:
{
"response": "首先准备五花肉...",
"query": "红烧肉怎么做?",
"session_id": "user_123",
"timestamp": "2026-04-10 14:30:00",
"from_cache": false
}
特点:
- 等待完整答案生成后一次性返回
- 适合简单查询或对实时性要求不高的场景
"""
return self._handle_chat_request()
@self.app.route('/api/chat/stream', methods=['POST'])
def chat_stream():
"""
聊天API - 流式响应模式(SSE)
与普通chat的区别:
- 使用Server-Sent Events技术逐字返回答案
- 用户体验更好,类似ChatGPT的打字机效果
- 减少用户等待焦虑
响应格式(SSE协议):
data: {"chunk": "首"}
data: {"chunk": "先"}
data: {"chunk": "准"}
data: [DONE]
前端处理方式:
- 使用EventSource或fetch + ReadableStream接收
- 每收到一个chunk就追加到显示区域
"""
return self._handle_stream_request()
@self.app.route('/api/recipes/recommendations', methods=['POST'])
def get_recommendations():
"""
获取菜谱推荐 - 首页推荐功能
请求体(可选):
{
"preferences": {
"cuisine": "川菜",
"difficulty": "easy"
}
}
返回示例:
{
"success": true,
"data": [
{
"id": "recipe_001",
"name": "麻婆豆腐",
"image": "/images/mapo_tofu.jpg",
"difficulty": "medium"
}
],
"message": "推荐获取成功"
}
应用场景:
- 首页随机推荐3个菜谱
- 引导用户探索新菜品
"""
return self._handle_recommendations_request()
@self.app.route('/api/recipes/', methods=['GET'])
def get_recipe_detail(recipe_id):
"""
获取菜谱详情 - 查看具体做法
URL参数:
- recipe_id: 菜谱唯一标识符(如 "hongshao_rou_001")
返回示例:
{
"success": true,
"data": {
"id": "hongshao_rou_001",
"name": "红烧肉",
"ingredients": ["五花肉", "冰糖", "生抽"],
"steps": ["切块", "焯水", "炒糖色"],
"image": "/images/hongshao_rou.jpg"
}
}
应用场景:
- 用户点击推荐卡片查看详情
- 搜索结果显示详情页
"""
return self._handle_recipe_detail_request(recipe_id)
@self.app.route('/api/stats', methods=['GET'])
def get_stats():
"""
获取系统统计信息 - 监控和调试
返回示例:
{
"cache_stats": {
"total_sessions": 5,
"total_cached_queries": 127,
"cache_threshold": 0.75
},
"route_stats": {
"traditional_count": 45,
"graph_rag_count": 32,
"combined_count": 10,
"traditional_ratio": 0.52,
"graph_rag_ratio": 0.37
},
"system_info": {
"timestamp": "2026-04-10 14:30:00",
"status": "running"
}
}
用途:
- 管理员监控系统运行状态
- 分析缓存命中率
- 优化检索策略比例
"""
return self._handle_stats_request()
def _serve_static_file(self, filename):
"""
提供静态文件服务 - 安全地托管前端资源
工作流程:
1. 安全检查:防止路径遍历攻击(如../../etc/passwd)
2. 定位文件:从frontend/dist目录查找
3. 特殊处理:index.html作为SPA fallback
4. 异常处理:文件不存在时返回index.html(支持前端路由)
参数:
filename: 请求的文件名(如 "css/style.css"、"js/app.js")
返回:
Flask响应对象(文件内容或index.html)
安全机制:
- 拒绝包含..的路径(防止向上跳转目录)
- 拒绝以/开头的绝对路径
- 只允许访问frontend/dist目录下的文件
示例:
>>> _serve_static_file("index.html")
# 返回主页HTML
>>> _serve_static_file("../secret.txt")
"Forbidden" 403 # 阻止恶意访问
"""
import os
from flask import send_from_directory
# 第一步:安全检查,防止路径遍历攻击
# 攻击者可能尝试访问 ../../etc/passwd 等敏感文件
if '..' in filename or filename.startswith('/'):
return "Forbidden", 403 # 返回403禁止访问
# 第二步:确定前端文件路径
# frontend/dist是Next.js/Vite等工具的默认构建输出目录
frontend_path = os.path.join(os.getcwd(), 'frontend', 'dist')
try:
# 第三步:根据文件名返回对应文件
if filename == 'index.html' or filename == '':
# 主页请求,返回index.html
return send_from_directory(frontend_path, 'index.html')
else:
# 其他静态资源(CSS/JS/图片等)
return send_from_directory(frontend_path, filename)
except FileNotFoundError:
# 第四步:文件不存在时的fallback策略
# 对于SPA应用,所有未知路径都应返回index.html
# 让前端JavaScript处理路由(如 /about、/recipe/123)
return send_from_directory(frontend_path, 'index.html')
def _handle_chat_request(self):
"""
处理普通聊天请求 - 非流式问答接口
工作流程详解:
1. 参数提取:从请求体获取message和session_id
2. 并行优化:同时执行缓存检查和上下文预处理
3. 缓存命中判断:如果找到相似历史问答,直接返回
4. RAG检索:未命中缓存时,调用智能路由器检索相关知识
5. 答案生成:基于检索结果生成自然语言回答
6. 缓存更新:将新问答对存入缓存,加速下次查询
7. 响应返回:JSON格式返回完整答案
性能优化亮点:
- 使用ThreadPoolExecutor并行执行缓存检查和查询预处理
- 缓存命中时立即取消不必要的预处理任务
- 减少串行等待时间,提升响应速度
返回:
Flask JSON响应,包含答案、元数据和缓存标记
错误处理:
- 参数验证:消息为空时返回400错误
- 异常捕获:任何错误都返回500错误和详细信息
"""
from flask import request, jsonify
try:
# 第一步:解析请求参数
data = request.get_json() # 从HTTP请求体解析JSON
query = data.get('message', '') # 用户输入的查询文本
session_id = data.get('session_id', '') # 会话ID,用于区分不同用户/窗口
# 参数验证:确保查询不为空
if not query:
return jsonify({"error": "消息不能为空"}), 400
# 第二步:🚀 并行执行缓存检查和预处理(性能优化关键)
cached_response = None # 缓存的答案(如果命中)
enhanced_query = query # 增强后的查询(包含上下文)
def check_cache():
"""
子任务1:检查语义缓存
作用:
- 在当前会话中查找与当前查询相似的历史问答
- 相似度阈值:0.75(可在SessionCacheManager中配置)
示例:
用户之前问过"红烧肉怎么做",现在问"红烧肉的做法是什么"
→ 系统识别为相似问题,直接返回之前的答案
"""
nonlocal cached_response
cached_response = self.rag_system.cache_manager.check_semantic_cache(query, session_id)
def prepare_query():
"""
子任务2:准备增强查询
作用:
- 获取会话历史上下文(最近3轮对话)
- 将历史对话与当前查询组合成增强提示词
示例:
历史:"红烧肉怎么做" → "首先准备五花肉..."
当前:"那用电饭煲怎么做"
→ 增强为:"用户问: 红烧肉怎么做 AI答: 首先... 当前问题: 那用电饭煲怎么做"
"""
nonlocal enhanced_query
enhanced_query = self.rag_system.cache_manager.get_context_for_query(session_id, query)
# 第三步:并行执行两个独立任务(最大化效率)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 提交两个任务到线程池
future_cache = executor.submit(check_cache) # 任务1:检查缓存
future_query = executor.submit(prepare_query) # 任务2:准备查询
# 等待缓存检查完成(最多1秒)
concurrent.futures.wait([future_cache], timeout=1)
if cached_response:
# 第四步A:缓存命中,快速返回
# 取消正在进行的查询预处理(节省资源)
future_query.cancel()
# 将缓存答案添加到上下文历史(保持对话连贯性)
self.rag_system.cache_manager.add_to_context(session_id, query, cached_response)
# 返回缓存的答案,标记from_cache=True
return jsonify({
"response": cached_response,
"query": query,
"session_id": session_id,
"timestamp": str(datetime.now()),
"from_cache": True # 前端可根据此字段显示"来自缓存"提示
})
# 第四步B:缓存未命中,等待查询预处理完成(最多2秒)
concurrent.futures.wait([future_query], timeout=2)
# 第五步:执行完整的RAG流程(缓存未命中时)
# 智能路由器会根据查询复杂度选择最佳检索策略
documents, analysis = self.rag_system.query_router.route_query(
query=enhanced_query, # 使用增强后的查询(包含上下文)
top_k=self.rag_system.config.top_k # 检索top_k个相关文档
)
# 第六步:基于检索结果生成最终答案
# generation_module会使用LLM综合多个文档生成连贯回答
response = self.rag_system.generation_module.generate_adaptive_answer(enhanced_query, documents)
# 第七步:将新生成的问答对存入缓存(加速下次查询)
self.rag_system.cache_manager.add_to_semantic_cache(query, response, session_id)
self.rag_system.cache_manager.add_to_context(session_id, query, response)
# 第八步:返回完整响应
return jsonify({
"response": response,
"query": query,
"timestamp": str(datetime.now())
})
except Exception as e:
# 错误处理:记录日志并返回友好错误信息
logger.error(f"Chat API错误: {e}")
return jsonify({"error": str(e)}), 500
def _handle_stream_request(self):
"""
处理流式聊天请求 - 打字机效果的实时响应
与普通chat的核心区别:
1. 响应方式:使用SSE(Server-Sent Events)逐字返回
2. 用户体验:用户几乎立即看到第一个字,无需等待完整答案
3. 技术实现:Python生成器(yield)+ Flask Response
SSE协议格式:
data: {"chunk": "首"}\n\n
data: {"chunk": "先"}\n\n
data: [DONE]\n\n
工作流程:
1. 参数验证:检查message是否为空
2. 定义生成器函数generate():逐步产生响应片段
3. 并行优化:缓存检查和上下文预处理
4. 流式返回:根据是否命中缓存选择不同的流式策略
5. 错误处理:捕获异常并通过SSE返回错误信息
返回:
Flask Response对象,mimetype为text/event-stream
浏览器会通过EventSource API接收流式数据
前端使用示例:
const eventSource = new EventSource('/api/chat/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data === '[DONE]') {
eventSource.close();
} else {
displayArea.textContent += data.chunk;
}
};
"""
from flask import request, Response
try:
# 第一步:解析请求参数
data = request.get_json()
query = data.get('message', '')
session_id = data.get('session_id', '')
# 参数验证
if not query:
return jsonify({"error": "消息不能为空"}), 400
# 第二步:定义生成器函数(SSE的核心)
def generate():
"""
SSE生成器 - 逐步产生响应片段
yield语法说明:
- 每次yield暂停函数执行,向客户端发送一个数据块
- 下次调用时从上次暂停的位置继续执行
- 实现真正的流式传输,而非缓冲后一次性发送
"""
try:
# 第三步:🚀 并行执行缓存检查和预处理
cached_response = None
enhanced_query = query
def check_cache():
nonlocal cached_response
cached_response = self.rag_system.cache_manager.check_semantic_cache(query, session_id)
def prepare_query():
nonlocal enhanced_query
enhanced_query = self.rag_system.cache_manager.get_context_for_query(session_id, query)
# 并行执行两个任务
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future_cache = executor.submit(check_cache)
future_query = executor.submit(prepare_query)
# 等待缓存检查完成(最多1秒)
concurrent.futures.wait([future_cache], timeout=1)
if cached_response:
# 第四步A:缓存命中,快速流式返回
future_query.cancel() # 取消不必要的预处理
# 将缓存答案添加到上下文
self.rag_system.cache_manager.add_to_context(session_id, query, cached_response)
# 将缓存答案分块发送(模拟打字机效果)
chunk_size = 3 # 每次发送3个字符
for i in range(0, len(cached_response), chunk_size):
chunk = cached_response[i:i+chunk_size]
data_obj = {"chunk": chunk, "from_cache": True}
# SSE协议要求格式:data: JSON\n\n
yield f"data: {json.dumps(data_obj)}\n\n"
time.sleep(0.02) # 20ms延迟,模拟打字速度
# 发送结束标记
yield f"data: [DONE]\n\n"
return
# 第四步B:缓存未命中,等待查询预处理完成
concurrent.futures.wait([future_query], timeout=2)
# 第五步:执行完整的RAG流程(流式生成)
documents, analysis = self.rag_system.query_router.route_query(
query=enhanced_query,
top_k=self.rag_system.config.top_k
)
# 第六步:流式生成答案(逐字返回)
full_response = "" # 累积完整答案,用于缓存
for chunk in self.rag_system.generation_module.generate_adaptive_answer_stream(enhanced_query, documents):
full_response += chunk # 累积完整答案
data_obj = {"chunk": chunk}
yield f"data: {json.dumps(data_obj)}\n\n" # 立即发送当前片段
# 第七步:将完整答案存入缓存
self.rag_system.cache_manager.add_to_semantic_cache(query, full_response, session_id)
self.rag_system.cache_manager.add_to_context(session_id, query, full_response)
# 第八步:发送结束标记
yield f"data: [DONE]\n\n"
except Exception as e:
# 错误处理:通过SSE返回错误信息
logger.error(f"Stream API错误: {e}")
error_msg = f"抱歉,处理您的问题时出现错误:{str(e)}"
data_obj = {"chunk": error_msg}
yield f"data: {json.dumps(data_obj)}\n\n"
yield f"data: [DONE]\n\n"
# 第三步:创建SSE响应对象
response = Response(generate(), mimetype='text/event-stream')
# 设置必要的HTTP头
response.headers['Cache-Control'] = 'no-cache' # 禁用缓存,确保实时性
response.headers['Connection'] = 'keep-alive' # 保持连接打开
response.headers['Access-Control-Allow-Origin'] = '*' # 允许跨域
return response
except Exception as e:
# 外层错误处理(如参数解析失败)
logger.error(f"Stream API错误: {e}")
return jsonify({"error": str(e)}), 500
def _handle_recommendations_request(self):
"""
处理菜谱推荐请求 - 首页推荐功能
工作流程:
1. 解析请求参数(可选的用户偏好)
2. 调用菜谱管理器获取随机推荐
3. 返回带图片的推荐列表
请求体(可选):
{
"preferences": {
"cuisine": "川菜",
"difficulty": "easy",
"cooking_time": "<30min"
}
}
返回示例:
{
"success": true,
"data": [
{
"id": "recipe_001",
"name": "麻婆豆腐",
"image": "/images/mapo_tofu.jpg",
"difficulty": "medium",
"cooking_time": "20分钟"
},
...
],
"message": "推荐获取成功"
}
应用场景:
- 首页加载时展示3个推荐菜谱
- 引导用户探索新菜品
- 提高用户粘性
"""
from flask import request, jsonify
try:
# 第一步:解析请求参数(允许为空)
data = request.get_json() or {}
preferences = data.get('preferences', {}) # 用户偏好(暂未使用)
# 第二步:获取随机推荐菜谱(带图片)
# limit=3表示返回3个推荐
recipes = self.rag_system.recipe_manager.get_random_recipes_with_images(limit=3)
# 第三步:返回标准化响应
return jsonify({
"success": True,
"data": recipes,
"message": "推荐获取成功"
})
except Exception as e:
# 错误处理
logger.error(f"推荐API错误: {e}")
return jsonify({"error": str(e)}), 500
def _handle_recipe_detail_request(self, recipe_id):
"""
处理菜谱详情请求 - 查看具体做法
工作流程:
1. 根据recipe_id查询菜谱详情
2. 如果找到,返回完整信息
3. 如果未找到,返回404错误
URL参数:
- recipe_id: 菜谱唯一标识符(如 "hongshao_rou_001")
返回示例(成功):
{
"success": true,
"data": {
"id": "hongshao_rou_001",
"name": "红烧肉",
"category": "meat_dish",
"ingredients": [
{"name": "五花肉", "amount": "500g"},
{"name": "冰糖", "amount": "30g"}
],
"steps": [
"五花肉切成3cm见方的块",
"冷水下锅焯水去腥",
"炒糖色至枣红色"
],
"image": "/images/hongshao_rou.jpg",
"cooking_time": "90分钟",
"difficulty": "medium"
}
}
返回示例(未找到):
{
"error": "菜谱不存在"
}
"""
from flask import jsonify
try:
# 第一步:根据ID查询菜谱
recipe = self.rag_system.recipe_manager.get_recipe_by_id(recipe_id)
# 第二步:判断是否找到
if recipe:
# 找到菜谱,返回详细信息
return jsonify({
"success": True,
"data": recipe
})
else:
# 未找到菜谱,返回404
return jsonify({"error": "菜谱不存在"}), 404
except Exception as e:
# 错误处理
logger.error(f"菜谱详情API错误: {e}")
return jsonify({"error": str(e)}), 500
def _handle_stats_request(self):
"""
处理统计信息请求 - 系统监控和调试
工作流程:
1. 从缓存管理器获取会话统计
2. 从查询路由器获取路由统计
3. 添加系统基本信息
4. 返回综合统计数据
返回示例:
{
"cache_stats": {
"total_sessions": 5, # 当前活跃会话数
"total_cached_queries": 127, # 缓存的查询总数
"total_contexts": 43, # 上下文消息总数
"cache_threshold": 0.75, # 相似度阈值
"max_session_cache_size": 50, # 每个会话最大缓存数
"max_context_length": 10 # 每个会话最大上下文长度
},
"route_stats": {
"traditional_count": 45, # 传统检索次数
"graph_rag_count": 32, # 图RAG检索次数
"combined_count": 10, # 组合策略次数
"total_queries": 87, # 总查询数
"traditional_ratio": 0.52, # 传统检索比例
"graph_rag_ratio": 0.37, # 图RAG比例
"combined_ratio": 0.11 # 组合策略比例
},
"system_info": {
"timestamp": "2026-04-10 14:30:00",
"status": "running"
}
}
应用场景:
- 管理员Dashboard展示系统状态
- 分析缓存命中率(优化性能)
- 监控检索策略分布(调整路由阈值)
- 故障排查和性能调优
"""
from flask import jsonify
try:
# 第一步:收集各模块统计信息
stats = {
# 缓存统计:会话数、缓存命中率等
"cache_stats": self.rag_system.cache_manager.get_session_stats(),
# 路由统计:各种检索策略的使用比例
"route_stats": self.rag_system.query_router.get_route_statistics(),
# 系统基本信息
"system_info": {
"timestamp": str(datetime.now()),
"status": "running"
}
}
# 第二步:返回综合统计
return jsonify(stats)
except Exception as e:
# 错误处理
logger.error(f"统计API错误: {e}")
return jsonify({"error": str(e)}), 500
```
## 🔄 工作流程图
```mermaid
graph TB
Start[用户发起HTTP请求] --> RouteCheck{路由匹配}
RouteCheck -->|GET /| ServeIndex[提供index.html]
RouteCheck -->|GET /static| ServeStatic[提供静态文件]
RouteCheck -->|GET /health| HealthCheck[健康检查]
RouteCheck -->|POST /api/chat| ChatHandler[普通聊天API]
RouteCheck -->|POST /api/chat/stream| StreamHandler[流式聊天API]
RouteCheck -->|POST /api/recipes/rec| RecommendHandler[菜谱推荐API]
RouteCheck -->|GET /api/recipes/id| DetailHandler[菜谱详情API]
RouteCheck -->|GET /api/stats| StatsHandler[系统统计API]
ServeIndex --> End1[返回HTML页面]
ServeStatic --> End2[返回CSS/JS/图片]
HealthCheck --> End3[返回JSON状态]
ChatHandler --> ParseParams[解析请求参数]
ParseParams --> ValidateParams{参数验证}
ValidateParams -->|消息为空| Error400[返回400错误]
ValidateParams -->|验证通过| ParallelStart[🚀并行执行]
ParallelStart --> Task1[任务1: 检查语义缓存]
ParallelStart --> Task2[任务2: 准备增强查询]
Task1 --> CacheCheck{缓存命中?}
Task2 --> WaitQuery[等待预处理完成]
CacheCheck -->|是| CancelTask[取消预处理任务]
CancelTask --> AddContext1[添加到上下文历史]
AddContext1 --> ReturnCache[返回缓存答案
from_cache=true]
CacheCheck -->|否| WaitQuery
WaitQuery --> RouteQuery[智能路由查询]
RouteQuery --> AnalyzeQuery[LLM分析查询特征]
AnalyzeQuery --> SelectStrategy{选择检索策略}
SelectStrategy -->|简单查询| TraditionalSearch[传统混合检索]
SelectStrategy -->|复杂推理| GraphRAGSearch[图RAG检索]
SelectStrategy -->|不确定| CombinedSearch[组合检索策略]
TraditionalSearch --> GenerateAnswer[LLM生成答案]
GraphRAGSearch --> GenerateAnswer
CombinedSearch --> GenerateAnswer
GenerateAnswer --> CacheUpdate[更新语义缓存]
CacheUpdate --> AddContext2[添加到上下文历史]
AddContext2 --> ReturnResponse[返回完整答案]
StreamHandler --> ParseStreamParams[解析请求参数]
ParseStreamParams --> ValidateStream{参数验证}
ValidateStream -->|消息为空| StreamError400[返回400错误]
ValidateStream -->|验证通过| StreamParallel[🚀并行执行]
StreamParallel --> StreamTask1[任务1: 检查缓存]
StreamParallel --> StreamTask2[任务2: 准备查询]
StreamTask1 --> StreamCacheCheck{缓存命中?}
StreamTask2 --> StreamWaitQuery[等待预处理]
StreamCacheCheck -->|是| StreamCancel[取消预处理]
StreamCancel --> StreamChunkCache[分块发送缓存答案
每次3字符,间隔20ms]
StreamChunkCache --> StreamDone1[发送DONE标记]
StreamCacheCheck -->|否| StreamWaitQuery
StreamWaitQuery --> StreamRouteQuery[智能路由查询]
StreamRouteQuery --> StreamSelectStrategy{选择策略}
StreamSelectStrategy -->|简单| StreamTraditional[传统检索]
StreamSelectStrategy -->|复杂| StreamGraphRAG[图RAG检索]
StreamSelectStrategy -->|不确定| StreamCombined[组合检索]
StreamTraditional --> StreamGenerate[流式生成答案]
StreamGraphRAG --> StreamGenerate
StreamCombined --> StreamGenerate
StreamGenerate --> StreamYield[逐字yield返回]
StreamYield --> StreamCacheUpdate[更新缓存]
StreamCacheUpdate --> StreamAddContext[添加上下文]
StreamAddContext --> StreamDone2[发送DONE标记]
RecommendHandler --> GetRecipes[获取随机推荐]
GetRecipes --> ReturnRecipes[返回3个推荐菜谱]
DetailHandler --> QueryRecipe[根据ID查询菜谱]
QueryRecipe --> RecipeFound{找到菜谱?}
RecipeFound -->|是| ReturnDetail[返回菜谱详情]
RecipeFound -->|否| Return404[返回404错误]
StatsHandler --> CollectStats[收集各模块统计]
CollectStats --> ReturnStats[返回综合统计数据]
ReturnResponse --> End4[返回JSON响应]
StreamDone1 --> End5[SSE流结束]
StreamDone2 --> End5
ReturnRecipes --> End6[返回JSON响应]
ReturnDetail --> End7[返回JSON响应]
Return404 --> End8[返回404响应]
ReturnStats --> End9[返回JSON响应]
Error400 --> End10[返回错误响应]
StreamError400 --> End11[返回错误响应]
style ParallelStart fill:#ff9999
style StreamParallel fill:#ff9999
style CacheCheck fill:#99ff99
style StreamCacheCheck fill:#99ff99
style GenerateAnswer fill:#99ccff
style StreamGenerate fill:#99ccff
style RouteQuery fill:#ffcc99
style StreamRouteQuery fill:#ffcc99
```
## 📊 核心流程说明表
| 流程阶段 | 普通聊天API | 流式聊天API | 说明 |
| --------------- | ----------------------------- | ------------------ | --------------------- |
| **1. 参数解析** | 提取message和session_id | 同左 | 验证必填参数 |
| **2. 并行优化** | ThreadPoolExecutor执行2个任务 | 同左 | 缓存检查 + 查询预处理 |
| **3. 缓存判断** | 命中则直接返回 | 命中则分块流式返回 | 相似度阈值0.75 |
| **4. 智能路由** | LLM分析查询复杂度 | 同左 | 决定使用哪种检索策略 |
| **5. 知识检索** | 传统/图RAG/组合检索 | 同左 | 根据策略执行检索 |
| **6. 答案生成** | 一次性生成完整答案 | 逐字流式生成 | 使用LLM综合多文档 |
| **7. 缓存更新** | 存入语义缓存和上下文 | 同左 | 加速下次查询 |
| **8. 响应返回** | JSON格式一次性返回 | SSE协议逐块返回 | 流式体验更好 |
## 💡 关键技术点总结
1. **并行优化**: 使用`ThreadPoolExecutor`同时执行缓存检查和查询预处理,减少串行等待
2. **智能缓存**: 基于向量相似度的语义缓存,能识别"红烧肉怎么做"和"红烧肉的做法"是相似问题
3. **上下文管理**: 维护最近3轮对话历史,支持多轮对话的连贯性
4. **智能路由**: LLM分析查询复杂度,自动选择传统检索或图RAG检索
5. **流式传输**: SSE协议实现打字机效果,提升用户体验
6. **安全防护**: 防止路径遍历攻击,只允许访问frontend/dist目录
7. **错误降级**: 任何环节失败都有fallback机制,保证系统可用性