1704 字
9 分钟
完整注释代码

完整注释代码

本篇保留了项目的完整注释代码,方便与前面几篇模块分析对照阅读。建议的阅读顺序是:先看 02_系统架构与回调流程、04_LangChain链路与会话历史、05_图像处理与多模态消息组装 三篇,再回到这里整体串读。


# =============================================================================
# 多模态聊天机器人(LangChain + ASR 版)
# 架构:Gradio 前端 → 消息预处理 → LangChain → 阿里云多模态大模型
# =============================================================================
import base64 # 二进制 → base64 字符串,用于 API 传输
import io # 内存字节流,图片转换时不写临时文件
import time # sensevoice 轮询时休眠
import httpx # HTTP 客户端,调用阿里云原生 API
import gradio as gr
from PIL import Image
from pathlib import Path
from langchain_community.chat_message_histories import SQLChatMessageHistory
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableWithMessageHistory
from my_llm import multiModal_llm, asr_client
# ── Constants ─────────────────────────────────────────────────────────────────
# All endpoints live under the DashScope native API root. The native multimodal
# endpoint accepts base64 (data-URI) audio, whereas the OpenAI-compatible mode
# only accepts public URLs.
_DASHSCOPE_API_ROOT = "https://dashscope.aliyuncs.com/api/v1"

DASHSCOPE_MULTIMODAL_URL = (
    _DASHSCOPE_API_ROOT + "/services/aigc/multimodal-generation/generation"
)
DASHSCOPE_ASR_SUBMIT_URL = _DASHSCOPE_API_ROOT + "/services/audio/asr/transcription"
# `{task_id}` is a str.format placeholder filled in when polling a task.
DASHSCOPE_TASK_QUERY_URL = _DASHSCOPE_API_ROOT + "/tasks/{task_id}"

# Async-task polling budget: worst case POLL_MAX_RETRIES * POLL_INTERVAL_SECONDS
# = 20 * 2 = 40 seconds of waiting.
POLL_MAX_RETRIES = 20
POLL_INTERVAL_SECONDS = 2
# =============================================================================
# Speech recognition: local audio file -> text
# Primary:  qwen3-asr-flash (synchronous, DashScope native multimodal API)
# Fallback: sensevoice-v1 (asynchronous submit-then-poll)
# =============================================================================
# File extension -> audio subtype used in the data-URI MIME type.
# m4a is an MP4 container; any unknown extension is labelled as WAV.
_AUDIO_FORMAT_MAP = {'m4a': 'mp4', 'ogg': 'ogg', 'flac': 'flac', 'mp3': 'mp3'}
# Returned to the caller when no engine produced a transcript.
_ASR_FAILURE_TEXT = "[抱歉,系统未能听清您的语音内容]"


def _audio_to_data_uri(audio_path: str) -> str:
    """Read *audio_path* and return it as ``data:audio/<fmt>;base64,<data>``.

    A data URI lets the audio be embedded directly in a JSON request body.
    Raises OSError if the file cannot be read.
    """
    ext = Path(audio_path).suffix.lower().lstrip('.')
    audio_fmt = _AUDIO_FORMAT_MAP.get(ext, 'wav')
    with open(audio_path, "rb") as f:
        audio_b64 = base64.b64encode(f.read()).decode('utf-8')
    return f"data:audio/{audio_fmt};base64,{audio_b64}"


def _transcribe_qwen(api_key: str, data_uri: str) -> str:
    """Primary engine: one synchronous qwen3-asr-flash call.

    Returns the stripped transcript; raises on HTTP errors or an unexpected
    response shape.
    """
    resp = httpx.post(
        DASHSCOPE_MULTIMODAL_URL,
        headers={"Authorization": f"Bearer {api_key}",
                 "Content-Type": "application/json"},
        json={
            "model": "qwen3-asr-flash",
            "input": {
                "messages": [
                    # A system message must be present; its text stays empty.
                    {"role": "system", "content": [{"text": ""}]},
                    # `audio` must be the data-URI string itself, not a dict.
                    {"role": "user", "content": [{"audio": data_uri}]},
                ]
            },
            "parameters": {"asr_options": {"enable_itn": False}},
        },
        timeout=30,
    )
    resp.raise_for_status()
    # Response shape: output.choices[0].message.content[0].text
    return resp.json()["output"]["choices"][0]["message"]["content"][0]["text"].strip()


def _transcribe_sensevoice(api_key: str, data_uri: str) -> str | None:
    """Fallback engine: submit a sensevoice-v1 task, then poll until it finishes.

    Returns the stripped transcript. Returns None if the task reports FAILED or
    CANCELED, or if it has not finished after POLL_MAX_RETRIES polls. Raises on
    HTTP errors or an unexpected response shape.
    """
    # NOTE(review): DashScope's async file-transcription API appears to expect
    # public URLs under `input.file_urls` and to report results through a
    # `transcription_url`. This request (a data URI under `file_url`) and the
    # `results[0]["transcription"]` lookup below may therefore never succeed.
    # Confirm against the current API reference.
    submit_resp = httpx.post(
        DASHSCOPE_ASR_SUBMIT_URL,
        headers={"Authorization": f"Bearer {api_key}",
                 "Content-Type": "application/json",
                 # Required header: run the request as an asynchronous task.
                 "X-DashScope-Async": "enable"},
        json={"model": "sensevoice-v1",
              "input": {"file_url": data_uri},
              "parameters": {}},
        timeout=15,
    )
    submit_resp.raise_for_status()
    task_id = submit_resp.json()["output"]["task_id"]
    print(f"[sensevoice 任务已提交] task_id={task_id}")

    for attempt in range(POLL_MAX_RETRIES):
        time.sleep(POLL_INTERVAL_SECONDS)
        poll_resp = httpx.get(
            DASHSCOPE_TASK_QUERY_URL.format(task_id=task_id),
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10,
        )
        poll_resp.raise_for_status()
        poll_data = poll_resp.json()
        status = poll_data["output"]["task_status"]
        print(f"[轮询 {attempt + 1}/{POLL_MAX_RETRIES}] 状态: {status}")
        if status == "SUCCEEDED":
            # Strip for consistency with the primary engine's result.
            text = poll_data["output"]["results"][0]["transcription"].strip()
            print(f"[降级识别结果]: {text}")
            return text
        if status in ("FAILED", "CANCELED"):
            print(f"[sensevoice 任务失败] {poll_data}")
            return None

    # Polling budget exhausted without a terminal status.
    print(f"[sensevoice 轮询超时] task_id={task_id}")
    return None


def transcribe_audio_to_text(audio_path: str) -> str:
    """Transcribe a local audio file to text.

    Tries qwen3-asr-flash first. On any failure it falls back to sensevoice-v1.

    Args:
        audio_path: Path to a local audio file (wav/mp3/m4a/ogg/flac).

    Returns:
        The recognised text. This may be empty if the primary engine returns an
        empty transcript. If the file cannot be read or both engines fail, a
        fixed apology string is returned instead, so callers never receive an
        exception from this function.
    """
    print(f"[正在识别语音] {audio_path}")
    try:
        data_uri = _audio_to_data_uri(audio_path)
    except OSError as exc:
        # Previously this propagated and crashed the UI callback.
        print(f"[读取音频失败] {exc}")
        return _ASR_FAILURE_TEXT

    # Reuse the key of the already-configured client instead of re-reading config.
    api_key = asr_client.api_key

    try:
        text = _transcribe_qwen(api_key, data_uri)
        print(f"[识别结果]: {text}")
        return text
    except Exception as exc:
        print(f"\n[qwen3-asr-flash 失败] {exc}")
        print("[切换到 sensevoice-v1 异步接口...]")

    try:
        text = _transcribe_sensevoice(api_key, data_uri)
    except Exception as fallback_exc:
        print(f"[降级识别也失败] {fallback_exc}")
        text = None
    return text if text is not None else _ASR_FAILURE_TEXT
# =============================================================================
# LangChain setup
# =============================================================================
# The system message fixes the assistant persona. MessagesPlaceholder marks
# where the "messages" input variable is spliced in when the template renders.
_SYSTEM_PROMPT = "你是一个强大的多模态AI助手,可以精准地处理文本和图像输入。"
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', _SYSTEM_PROMPT),
        MessagesPlaceholder(variable_name="messages"),
    ]
)
# LCEL composition: the rendered prompt is fed straight into the model.
chain = prompt | multiModal_llm
def get_session_history(session_id: str) -> SQLChatMessageHistory:
    """Return the SQLite-backed message history for *session_id*.

    This is passed to RunnableWithMessageHistory, which calls it itself around
    each invoke; application code does not need to call it directly.
    """
    # SQLAlchemy-style URL: "sqlite:///" + relative path, so the database file
    # is resolved against the process's working directory.
    db_url = 'sqlite:///chat_history.db'
    return SQLChatMessageHistory(connection=db_url, session_id=session_id)
# Wrap the chain so each invoke loads the session history before calling the
# model and persists the new exchange afterwards.
chain_history = RunnableWithMessageHistory(chain, get_session_history)
# NOTE: a single hard-coded session_id means every user of this app shares one
# conversation history. For multi-user deployments derive it per user/session,
# e.g. str(uuid.uuid4()) or a user ID.
_SESSION_ID = "usr000"
config = {"configurable": {"session_id": _SESSION_ID}}
# =============================================================================
# 图像处理
# =============================================================================
def transcribe_image(image_path: str) -> dict | None:
"""图片文件 → OpenAI 兼容的 image_url 格式字典"""
try:
with Image.open(image_path) as img:
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB') # RGBA/P 模式不支持 JPEG,统一转 RGB
img_format = img.format or 'JPEG'
buffered = io.BytesIO() # 内存流,避免写临时文件
img.save(buffered, format=img_format)
image_data = base64.b64encode(buffered.getvalue()).decode('utf-8')
return {
"type": "image_url",
"image_url": {
"url": f"data:image/{img_format.lower()};base64,{image_data}",
"detail": "high", # high: 高精度,按尺寸计费;low: 固定85token
},
}
except Exception as e:
print(f"[图像处理失败] {e}")
return None
def _append_file(content: list, file_path: str) -> None:
"""根据后缀追加文件到 content 列表(音频已在前端拦截,这里只剩图片)"""
ext = Path(file_path).suffix.lower()
if ext in ('.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif'):
msg = transcribe_image(file_path)
if msg:
content.append(msg)
else:
print(f"[警告] 不支持的文件类型: {file_path}")
# =============================================================================
# Message assembly
# =============================================================================
def get_last_user_messages(history: list) -> list:
    """Return this turn's new messages: everything after the last assistant reply.

    Only the new messages are sent to LangChain, because earlier turns are
    already stored in SQLite and reloaded by the history wrapper. If the history
    is empty or already ends with an assistant message, returns [].
    """
    if not history:
        return []
    if history[-1]["role"] == "assistant":
        return []
    start = 0
    # Walk backwards to the most recent assistant message, if there is one.
    for idx in range(len(history) - 1, -1, -1):
        if history[idx]["role"] == "assistant":
            start = idx + 1
            break
    return history[start:]
# =============================================================================
# Gradio callbacks
# =============================================================================
def add_message(history: list, messages: dict) -> tuple:
    """First callback: record the user's input in *history* immediately.

    - Audio files are transcribed here, so raw audio never reaches LangChain.
    - Other files (images) are stored as {'path': ...}; submit_messages encodes
      them later.
    - Non-empty text is appended as a plain string after any files.

    Returns the mutated history and a cleared, non-interactive
    MultimodalTextbox. The input box stays locked until the reply arrives.
    """
    audio_suffixes = ('.wav', '.mp3', '.m4a', '.ogg', '.flac')
    for file_path in messages.get('files', []):
        print(f"[UI] 收到文件: {file_path}")
        if Path(file_path).suffix.lower() not in audio_suffixes:
            # In the messages-format history used here, a file entry must be a
            # dict, not a tuple.
            history.append({'role': 'user', 'content': {'path': file_path}})
            continue
        spoken = transcribe_audio_to_text(file_path)
        if spoken:
            history.append({'role': 'user', 'content': f"🎤 [语音输入]: {spoken}"})

    typed = messages.get("text", "").strip()
    if typed:
        history.append({"role": "user", "content": typed})
    return history, gr.MultimodalTextbox(value=None, interactive=False)
def submit_messages(history: list) -> list:
    """Second callback: send this turn's user messages to the model.

    The turn's messages are those after the last assistant reply. Each one is
    converted to OpenAI-style content parts. Three content shapes are handled:
    str (plain text), list (content blocks from newer Gradio versions) and dict
    (an image path stored by add_message). The assistant reply, or an error
    notice, is appended to *history*, which is then returned.
    """
    content = []
    has_text = False
    for message in get_last_user_messages(history):
        body = message['content']
        if isinstance(body, str):
            content.append({'type': 'text', 'text': body})
            has_text = True
            continue
        if isinstance(body, dict):
            path = body.get('path') or body.get('url') or body.get('name')
            if path:
                _append_file(content, path)
            continue
        if not isinstance(body, list):
            continue
        for part in body:
            if not isinstance(part, dict):
                continue
            kind = part.get('type')
            if kind == 'text':
                content.append({'type': 'text', 'text': part['text']})
                has_text = True
            elif kind == 'file':
                path = part.get('file', {}).get('path')
                if path:
                    _append_file(content, path)

    if not content:
        print("[警告] content 为空,跳过本次调用")
        return history
    if not has_text:
        # Image-only turn: add an explicit instruction (some models error
        # without any text part).
        content.append({'type': 'text', 'text': '请详细描述这张图片的内容。'})

    input_message = HumanMessage(content=content)
    try:
        # The history wrapper loads the SQLite history, calls the model, then
        # saves the new exchange.
        reply = chain_history.invoke({'messages': [input_message]}, config)
        history.append({'role': 'assistant', 'content': reply.content})
    except Exception as e:
        print(f"\n[调用大模型失败] {e}\n")
        history.append({'role': 'assistant', 'content': f"⚠️ 请求出错:{e}"})
    return history
# =============================================================================
# Gradio UI
# =============================================================================
with gr.Blocks(title='多模态聊天机器人') as block:
    chatbot = gr.Chatbot(height=600, label='AI 助手')
    chat_input = gr.MultimodalTextbox(
        interactive=True,
        file_types=['image', 'audio'],
        file_count="multiple",
        placeholder="请输入文字,或点击 📎 上传图片/语音...",
        show_label=False,
        sources=["microphone", "upload"],
    )

    # Each submit runs three chained steps:
    #   1. add_message     -> show the user's input at once, lock the input box
    #   2. submit_messages -> call the model, append its reply
    #   3. lambda          -> unlock the input box again
    user_turn = chat_input.submit(
        fn=add_message,
        inputs=[chatbot, chat_input],
        outputs=[chatbot, chat_input],
    )
    bot_turn = user_turn.then(
        fn=submit_messages,
        inputs=[chatbot],
        outputs=[chatbot],
    )
    bot_turn.then(
        fn=lambda: gr.MultimodalTextbox(interactive=True),
        inputs=None,
        outputs=[chat_input],
    )

if __name__ == '__main__':
    block.launch(theme=gr.themes.Soft())

相关笔记