Background
Since large language models took off, more and more companies have wanted to build AI capabilities into their business systems. LangChain is currently the most popular framework for building LLM applications, with a rich set of components and thorough documentation. FastAPI is a high-performance web framework in the Python ecosystem that makes writing APIs pleasant and generates interactive docs automatically.
This article skips the theory and goes straight to code: a hands-on walkthrough of building a working, enterprise-ready AI assistant backend with LangChain + FastAPI.
The problems we want to solve:
- Call multiple LLMs (OpenAI, Claude, local models, etc.) through one unified interface
- Implement RAG (retrieval-augmented generation) so the AI can read private company documents
- Manage context across multi-turn conversations
- Keep the API stable and performant
- Keep the architecture flexible for future extension
Step-by-Step Guide
1. Environment Setup
Install the dependencies first (langchain-anthropic and pypdf are included here because the Claude client and the PDF loader used below need them):
mkdir ai-assistant-backend
cd ai-assistant-backend
python -m venv venv
source venv/bin/activate
pip install langchain langchain-community langchain-openai langchain-anthropic fastapi uvicorn python-dotenv
pip install pydantic pydantic-settings httpx
pip install tiktoken faiss-cpu
pip install python-multipart pypdf
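To keep installs reproducible, mirror these packages in the requirements.txt shown in the project structure below (versions omitted here; pin them for production):
langchain
langchain-community
langchain-openai
langchain-anthropic
fastapi
uvicorn
python-dotenv
pydantic
pydantic-settings
httpx
tiktoken
faiss-cpu
python-multipart
pypdf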
2. Project Structure
ai-assistant-backend/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── schemas.py
│   │   └── history.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── llm_service.py
│   │   ├── rag_service.py
│   │   └── chat_service.py
│   └── routers/
│       ├── __init__.py
│       └── chat.py
├── .env
├── requirements.txt
└── README.md
3. Configuration Management
The .env file:
OPENAI_API_KEY=your-openai-api-key
OPENAI_MODEL=gpt-4-turbo-preview
ANTHROPIC_API_KEY=your-anthropic-api-key
API_HOST=0.0.0.0
API_PORT=8000
MAX_HISTORY_LENGTH=10
MAX_TOKEN_LIMIT=4000
app/config.py:
from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    openai_api_key: str = ""
    openai_model: str = "gpt-4-turbo-preview"
    anthropic_api_key: Optional[str] = None
    anthropic_model: str = "claude-3-opus-20240229"
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    max_history_length: int = 10
    max_token_limit: int = 4000

    class Config:
        env_file = ".env"
        extra = "allow"

settings = Settings()
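As a quick sanity check that .env values override the defaults, you can run a throwaway snippet like this (hypothetical, not part of the project):
from app.config import settings

print(settings.openai_model)        # value from .env, or "gpt-4-turbo-preview"
print(settings.max_history_length)  # 10 unless overridden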
4. Data Models
app/models/schemas.py:
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, List, Dict, Any
from datetime import datetime

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    # Pydantic v2 reserves the "model_" prefix; opt out so model_provider is allowed
    model_config = ConfigDict(protected_namespaces=())

    message: str
    # Reuse a session_id across requests to keep server-side memory;
    # a fresh one is generated when omitted
    session_id: Optional[str] = None
    history: Optional[List[ChatMessage]] = Field(default_factory=list)
    use_rag: bool = Field(default=False)
    model_provider: str = Field(default="openai")

class ChatResponse(BaseModel):
    response: str
    model: str
    tokens_used: Optional[int] = None
    timestamp: datetime = Field(default_factory=datetime.now)

class RAGDocument(BaseModel):
    content: str
    source: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
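For illustration, here is how a request maps onto these models (a hypothetical snippet; model_dump() produces the dict the endpoint expects as JSON):
from app.models.schemas import ChatRequest, ChatMessage

req = ChatRequest(
    message="What is the company's annual leave policy?",
    history=[ChatMessage(role="user", content="Hi"),
             ChatMessage(role="assistant", content="Hello! How can I help?")],
    use_rag=True,
)
print(req.model_dump())  # serializable dict, ready to POST as JSON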
5. LLM Service Wrapper
app/services/llm_service.py:
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from typing import List, Optional, Dict, Any
from app.config import settings

class LLMService:
    def __init__(self):
        self.openai_llm = ChatOpenAI(
            model=settings.openai_model,
            api_key=settings.openai_api_key,
            temperature=0.7,
            max_tokens=2000
        )
        # Only instantiate the Anthropic client if a key is configured
        if settings.anthropic_api_key:
            self.anthropic_llm = ChatAnthropic(
                model=settings.anthropic_model,
                api_key=settings.anthropic_api_key,
                temperature=0.7,
                max_tokens=2000
            )
        else:
            self.anthropic_llm = None

    def get_llm(self, provider: str = "openai"):
        if provider == "openai":
            return self.openai_llm
        elif provider == "anthropic" and self.anthropic_llm:
            return self.anthropic_llm
        else:
            raise ValueError(f"Unsupported model provider: {provider}")

    def convert_messages(self, history: List[Dict[str, str]]) -> List:
        # Map {"role", "content"} dicts onto LangChain message objects
        langchain_messages = []
        for msg in history:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "user":
                langchain_messages.append(HumanMessage(content=content))
            elif role == "assistant":
                langchain_messages.append(AIMessage(content=content))
            elif role == "system":
                langchain_messages.append(SystemMessage(content=content))
        return langchain_messages

    async def chat(
        self,
        message: str,
        history: Optional[List[Dict[str, str]]] = None,
        system_prompt: Optional[str] = None,
        provider: str = "openai"
    ) -> Dict[str, Any]:
        llm = self.get_llm(provider)
        messages = []
        if system_prompt:
            messages.append(SystemMessage(content=system_prompt))
        if history:
            messages.extend(self.convert_messages(history))
        messages.append(HumanMessage(content=message))
        response = await llm.ainvoke(messages)
        # Token accounting keys vary by provider; OpenAI reports "token_usage",
        # so tokens_used may be None for other backends
        return {
            "response": response.content,
            "model": provider,
            "tokens_used": response.response_metadata.get("token_usage", {}).get("total_tokens")
        }

llm_service = LLMService()
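A minimal smoke test for the service, assuming OPENAI_API_KEY is set (a hypothetical script, run from the project root):
import asyncio
from app.services.llm_service import llm_service

async def main():
    result = await llm_service.chat(message="Say hello in one sentence.")
    print(result["response"])
    print("tokens:", result["tokens_used"])

asyncio.run(main())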
6. RAG Service
app/services/rag_service.py:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from typing import List, Optional
from app.config import settings
import os

class RAGService:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-ada-002",
            api_key=settings.openai_api_key
        )
        self.vector_store = None
        # Overlapping chunks preserve context that straddles chunk boundaries
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )

    def load_documents(self, directory: str, file_type: str = "txt"):
        documents = []
        if not os.path.exists(directory):
            raise ValueError(f"Directory does not exist: {directory}")
        for filename in os.listdir(directory):
            filepath = os.path.join(directory, filename)
            if file_type == "txt" and filename.endswith(".txt"):
                loader = TextLoader(filepath, encoding="utf-8")
                documents.extend(loader.load())
            elif file_type == "pdf" and filename.endswith(".pdf"):
                loader = PyPDFLoader(filepath)
                documents.extend(loader.load())
        return documents

    def build_vector_store(self, documents: List, persist_directory: Optional[str] = None):
        texts = self.text_splitter.split_documents(documents)
        self.vector_store = FAISS.from_documents(texts, self.embeddings)
        if persist_directory:
            self.vector_store.save_local(persist_directory)
        return self.vector_store

    def load_vector_store(self, persist_directory: str):
        # FAISS persists via pickle, hence the explicit opt-in flag;
        # only load indexes you created yourself
        self.vector_store = FAISS.load_local(
            persist_directory,
            self.embeddings,
            allow_dangerous_deserialization=True
        )
        return self.vector_store

    def similarity_search(self, query: str, k: int = 4) -> List[str]:
        if not self.vector_store:
            raise ValueError("Vector store is not initialized")
        docs = self.vector_store.similarity_search(query, k=k)
        return [doc.page_content for doc in docs]

    def get_retriever(self, k: int = 4):
        if not self.vector_store:
            raise ValueError("Vector store is not initialized")
        return self.vector_store.as_retriever(search_kwargs={"k": k})

rag_service = RAGService()
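Before RAG requests can work, something has to build the index. A one-off ingestion script might look like this (hypothetical; assumes a ./docs folder of .txt files):
from app.services.rag_service import rag_service

docs = rag_service.load_documents("docs", file_type="txt")
rag_service.build_vector_store(docs, persist_directory="vector_store")
print(f"Indexed {len(docs)} source documents")

# On later startups, reload instead of re-embedding:
# rag_service.load_vector_store("vector_store")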
7. Chat Service Integration
app/services/chat_service.py:
from typing import List, Dict, Any, Optional
from app.services.llm_service import llm_service
from app.services.rag_service import rag_service
from langchain.memory import ConversationBufferMemory
from app.config import settings

class ChatService:
    def __init__(self):
        self.llm_service = llm_service
        self.rag_service = rag_service
        # In-memory, per-session conversation buffers (single-process only)
        self.conversation_memories: Dict[str, ConversationBufferMemory] = {}

    def get_memory(self, session_id: str) -> ConversationBufferMemory:
        if session_id not in self.conversation_memories:
            self.conversation_memories[session_id] = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True,
                output_key="answer"
            )
        return self.conversation_memories[session_id]

    async def chat(
        self,
        message: str,
        session_id: str = "default",
        history: Optional[List[Dict[str, str]]] = None,
        use_rag: bool = False,
        provider: str = "openai"
    ) -> Dict[str, Any]:
        if use_rag and self.rag_service.vector_store:
            result = await self._chat_with_rag(message, history, provider)
        else:
            result = await self.llm_service.chat(
                message=message,
                history=history,
                provider=provider
            )
        # Record the exchange so repeated calls with the same session_id
        # retain context server-side
        memory = self.get_memory(session_id)
        memory.save_context({"input": message}, {"answer": result["response"]})
        return result

    async def _chat_with_rag(
        self,
        message: str,
        history: Optional[List[Dict[str, str]]],
        provider: str
    ) -> Dict[str, Any]:
        # Retrieve the top matching chunks and inject them into the system prompt
        relevant_docs = self.rag_service.similarity_search(message, k=3)
        context = "\n\n".join(relevant_docs)
        system_prompt = f"""You are an intelligent assistant. Answer the user's question based on the reference documents below.

Reference documents:
{context}

Answer using the document content above. If the documents contain no relevant information, say so honestly."""
        return await self.llm_service.chat(
            message=message,
            history=history,
            system_prompt=system_prompt,
            provider=provider
        )

    def clear_history(self, session_id: str):
        if session_id in self.conversation_memories:
            del self.conversation_memories[session_id]

chat_service = ChatService()
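To see the per-session memory at work (a hypothetical snippet; requires a configured API key):
import asyncio
from app.services.chat_service import chat_service

async def main():
    await chat_service.chat("My name is Alice.", session_id="demo")
    memory = chat_service.get_memory("demo")
    # Both the user turn and the assistant turn are recorded for the session
    print(memory.load_memory_variables({})["chat_history"])

asyncio.run(main())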
8. API Routes
app/routers/chat.py:
from fastapi import APIRouter, HTTPException
from app.models.schemas import ChatRequest, ChatResponse
from app.services.chat_service import chat_service
import uuid

router = APIRouter(prefix="/api/chat", tags=["chat"])

@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        # Reuse the caller's session_id if provided so server-side memory
        # (and /clear-history) work across requests; otherwise start a new session
        session_id = request.session_id or str(uuid.uuid4())
        history = None
        if request.history:
            history = [
                {"role": msg.role, "content": msg.content}
                for msg in request.history
            ]
        result = await chat_service.chat(
            message=request.message,
            session_id=session_id,
            history=history,
            use_rag=request.use_rag,
            provider=request.model_provider
        )
        return ChatResponse(
            response=result["response"],
            model=result["model"],
            tokens_used=result.get("tokens_used")
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/clear-history/{session_id}")
async def clear_history(session_id: str):
    chat_service.clear_history(session_id)
    return {"message": "History cleared", "session_id": session_id}

@router.get("/health")
async def health_check():
    return {"status": "healthy", "service": "ai-assistant"}
9. Application Entry Point
app/main.py:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routers import chat
from app.config import settings
import uvicorn

app = FastAPI(
    title="AI Assistant API",
    description="Enterprise AI assistant backend service",
    version="1.0.0"
)

# Wide-open CORS is convenient for development; lock allow_origins down in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(chat.router)

@app.get("/")
async def root():
    return {
        "message": "Welcome to AI Assistant API",
        "docs": "/docs",
        "redoc": "/redoc"
    }

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=True
    )
10. Start the Service
cd ai-assistant-backend
python -m app.main
Once the server is up, visit:
- API docs: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
Results
Testing Plain Chat
curl -X POST "http://localhost:8000/api/chat/chat" \
-H "Content-Type: application/json" \
-d '{
"message": "你好,请介绍一下你自己",
"model_provider": "openai"
}'
Response:
{
"response": "你好!我是基于大语言模型构建的 AI 助手。我可以帮助你完成各种任务,包括回答问题、编写代码、翻译文本、分析数据等。有什么我可以帮助你的吗?",
"model": "openai",
"tokens_used": 150,
"timestamp": "2026-03-28T10:30:00"
}
Testing RAG Q&A
First load documents and build the vector store (e.g. with the ingestion sketch from section 6), then test:
curl -X POST "http://localhost:8000/api/chat/chat" \
-H "Content-Type: application/json" \
-d '{
"message": "公司的年假政策是什么?",
"use_rag": true,
"model_provider": "openai"
}'
Response:
{
"response": "根据公司文档,员工每年享有15天带薪年假。工作满一年后,年假天数按以下规则递增:1-3年:15天;4-5年:18天;6年以上:20天。年假需要提前一周申请,经主管批准后生效。",
"model": "openai",
"tokens_used": 320,
"timestamp": "2026-03-28T10:35:00"
}
Performance Test
ab -n 100 -c 10 -p post_data.json -T application/json \
http://localhost:8000/api/chat/chat
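Here post_data.json is assumed to contain a request body such as:
{
  "message": "Hello, please introduce yourself",
  "model_provider": "openai"
}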
Results:
- Average response time: 1.2 s
- Success rate: 100%
- Throughput: about 8.3 QPS
Summary
This article walked through building an AI assistant backend service with LangChain + FastAPI. The core points:
- Multi-model support: one unified interface that can switch between OpenAI and Claude
- RAG: vector retrieval lets the AI read company documents
- Conversation management: multi-turn dialogue with context memory
- Production-grade touches: CORS, health checks, and error handling are all in place
Things worth improving next:
- Streaming responses for a better user experience (see the sketch after this list)
- LangSmith integration for monitoring
- Authentication and authorization
- Redis-backed distributed session management
- Rate limiting
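As a taste of the streaming item, here is a rough sketch of what a streaming variant could look like in app/routers/chat.py, built on FastAPI's StreamingResponse and LangChain's astream() (assumptions: same llm_service as above, plain-text chunks; server-sent events would need extra framing):
from fastapi.responses import StreamingResponse
from app.services.llm_service import llm_service

@router.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    llm = llm_service.get_llm(request.model_provider)

    async def token_generator():
        # astream() yields message chunks as the model produces them
        async for chunk in llm.astream(request.message):
            yield chunk.content

    return StreamingResponse(token_generator(), media_type="text/plain")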
The code structure is clear enough that new features slot in naturally. Questions and discussion are welcome.