1704 字
9 分钟
完整注释代码
完整注释代码
本篇保留项目的完整注释代码,方便和前面几篇模块分析对照阅读。阅读顺序建议是:先看 02_系统架构与回调流程、04_LangChain链路与会话历史、05_图像处理与多模态消息组装,再回到这里整体串读。
# =============================================================================# 多模态聊天机器人(LangChain + ASR 版)# 架构:Gradio 前端 → 消息预处理 → LangChain → 阿里云多模态大模型# =============================================================================
import base64 # 二进制 → base64 字符串,用于 API 传输import io # 内存字节流,图片转换时不写临时文件import time # sensevoice 轮询时休眠import httpx # HTTP 客户端,调用阿里云原生 APIimport gradio as grfrom PIL import Imagefrom pathlib import Path
from langchain_community.chat_message_histories import SQLChatMessageHistoryfrom langchain_core.messages import HumanMessagefrom langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholderfrom langchain_core.runnables import RunnableWithMessageHistory
from my_llm import multiModal_llm, asr_client
# ── 常量 ──────────────────────────────────────────────────────────────────────# 原生多模态接口(支持 base64 音频);compatible-mode 只支持公网 URLDASHSCOPE_MULTIMODAL_URL = ( "https://dashscope.aliyuncs.com/api/v1/services/aigc/" "multimodal-generation/generation")DASHSCOPE_ASR_SUBMIT_URL = ( "https://dashscope.aliyuncs.com/api/v1/services/audio/asr/transcription")DASHSCOPE_TASK_QUERY_URL = "https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"POLL_MAX_RETRIES = 20 # 最多轮询 20 次POLL_INTERVAL_SECONDS = 2 # 每次间隔 2 秒,共最长等待 40 秒
# =============================================================================# 语音识别:本地音频 → 文字# 主方案:qwen3-asr-flash(同步)# 降级方案:sensevoice-v1(异步轮询)# =============================================================================
def transcribe_audio_to_text(audio_path: str) -> str: print(f"[正在识别语音] {audio_path}")
# 步骤 1:音频文件 → Base64 Data URI # Data URI 格式:data:<MIME>;base64,<数据>,可内嵌在 JSON 里传输 ext = Path(audio_path).suffix.lower().lstrip('.') or 'wav' fmt_map = {'m4a': 'mp4', 'ogg': 'ogg', 'flac': 'flac', 'mp3': 'mp3'} audio_fmt = fmt_map.get(ext, 'wav') mime_type = f"audio/{audio_fmt}"
with open(audio_path, "rb") as f: audio_b64 = base64.b64encode(f.read()).decode('utf-8') data_uri = f"data:{mime_type};base64,{audio_b64}"
api_key = asr_client.api_key # 从已初始化的客户端取 key,不用重复配置
# 步骤 2:主方案 —— qwen3-asr-flash(DashScope 原生多模态接口) # 注意:audio 字段必须是字符串(Data URI),不能是字典 try: resp = httpx.post( DASHSCOPE_MULTIMODAL_URL, headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, json={ "model": "qwen3-asr-flash", "input": { "messages": [ {"role": "system", "content": [{"text": ""}]}, # 必须有,留空 {"role": "user", "content": [{"audio": data_uri}]}, ] }, "parameters": {"asr_options": {"enable_itn": False}}, }, timeout=30, ) resp.raise_for_status() # 返回结构:output.choices[0].message.content[0].text text = resp.json()["output"]["choices"][0]["message"]["content"][0]["text"].strip() print(f"[识别结果]: {text}") return text
except Exception as exc: print(f"\n[qwen3-asr-flash 失败] {exc}") print("[切换到 sensevoice-v1 异步接口...]")
# 步骤 3:降级方案 —— sensevoice-v1(异步「提交-轮询」) try: # 第一步:提交任务,X-DashScope-Async: enable 是必须的请求头 submit_resp = httpx.post( DASHSCOPE_ASR_SUBMIT_URL, headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "X-DashScope-Async": "enable"}, json={"model": "sensevoice-v1", "input": {"file_url": data_uri}, "parameters": {}}, timeout=15, ) submit_resp.raise_for_status() task_id = submit_resp.json()["output"]["task_id"] print(f"[sensevoice 任务已提交] task_id={task_id}")
# 第二步:轮询任务状态,直到完成或超时 for attempt in range(POLL_MAX_RETRIES): time.sleep(POLL_INTERVAL_SECONDS) poll_resp = httpx.get( DASHSCOPE_TASK_QUERY_URL.format(task_id=task_id), headers={"Authorization": f"Bearer {api_key}"}, timeout=10, ) poll_resp.raise_for_status() poll_data = poll_resp.json() status = poll_data["output"]["task_status"] print(f"[轮询 {attempt + 1}/{POLL_MAX_RETRIES}] 状态: {status}")
if status == "SUCCEEDED": text = poll_data["output"]["results"][0]["transcription"] print(f"[降级识别结果]: {text}") return text elif status in ("FAILED", "CANCELED"): print(f"[sensevoice 任务失败] {poll_data}") break
except Exception as fallback_exc: print(f"[降级识别也失败] {fallback_exc}")
return "[抱歉,系统未能听清您的语音内容]"
# =============================================================================# LangChain 初始化# =============================================================================
# Prompt 模板:system 设定人格,MessagesPlaceholder 是历史消息的占位符# 运行时框架会把 SQLite 历史插入到 placeholder 位置prompt = ChatPromptTemplate.from_messages([ ('system', "你是一个强大的多模态AI助手,可以精准地处理文本和图像输入。"), MessagesPlaceholder(variable_name="messages"),])
# 管道符 | 把 prompt 和 llm 串联:输入 → prompt渲染 → llm调用 → 输出chain = prompt | multiModal_llm
def get_session_history(session_id: str) -> SQLChatMessageHistory: """框架在每次 invoke 前后自动调用此函数读写历史,无需手动调用""" return SQLChatMessageHistory( session_id=session_id, connection='sqlite:///chat_history.db', )
# 包装 chain,赋予「自动读写历史」能力chain_history = RunnableWithMessageHistory(chain, get_session_history)
# 注意:硬编码 session_id 意味着所有用户共享历史# 多用户场景应改为 str(uuid.uuid4()) 或根据用户 ID 生成config = {"configurable": {"session_id": "usr000"}}
# =============================================================================# 图像处理# =============================================================================
def transcribe_image(image_path: str) -> dict | None: """图片文件 → OpenAI 兼容的 image_url 格式字典""" try: with Image.open(image_path) as img: if img.mode in ('RGBA', 'P'): img = img.convert('RGB') # RGBA/P 模式不支持 JPEG,统一转 RGB img_format = img.format or 'JPEG' buffered = io.BytesIO() # 内存流,避免写临时文件 img.save(buffered, format=img_format) image_data = base64.b64encode(buffered.getvalue()).decode('utf-8') return { "type": "image_url", "image_url": { "url": f"data:image/{img_format.lower()};base64,{image_data}", "detail": "high", # high: 高精度,按尺寸计费;low: 固定85token }, } except Exception as e: print(f"[图像处理失败] {e}") return None
def _append_file(content: list, file_path: str) -> None: """根据后缀追加文件到 content 列表(音频已在前端拦截,这里只剩图片)""" ext = Path(file_path).suffix.lower() if ext in ('.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif'): msg = transcribe_image(file_path) if msg: content.append(msg) else: print(f"[警告] 不支持的文件类型: {file_path}")
# =============================================================================# 消息组装# =============================================================================
def get_last_user_messages(history: list) -> list: """ 提取「最后一个 assistant 回复之后」的所有 user 消息(即本轮新增消息)。 避免把完整历史重复传给 LangChain(历史已在 SQLite 里,框架自动加载)。 """ if not history or history[-1]["role"] == "assistant": return [] last_assistant_idx = next( (i for i in range(len(history) - 1, -1, -1) if history[i]["role"] == "assistant"), -1, ) return history[last_assistant_idx + 1:]
# =============================================================================# Gradio 回调# =============================================================================
def add_message(history: list, messages: dict) -> tuple: """ 第一步回调:处理用户输入,立即更新界面。 - 音频:转文字后存入 history(不让二进制音频流入 LangChain) - 图片:存文件路径,供 submit_messages 读取编码 - 文字:直接存入 history """ for file_path in messages.get('files', []): print(f"[UI] 收到文件: {file_path}") ext = Path(file_path).suffix.lower() if ext in ('.wav', '.mp3', '.m4a', '.ogg', '.flac'): text = transcribe_audio_to_text(file_path) if text: history.append({'role': 'user', 'content': f"🎤 [语音输入]: {text}"}) else: # type='messages' 模式下图片必须用字典格式,不能用 tuple history.append({'role': 'user', 'content': {'path': file_path}})
text_input = messages.get("text", "").strip() if text_input: history.append({"role": "user", "content": text_input})
# 返回更新后的 history 和清空+禁用的输入框 return history, gr.MultimodalTextbox(value=None, interactive=False)
def submit_messages(history: list) -> list: """ 第二步回调:组装本轮消息,调用 LangChain 获取 AI 回复。 处理三种 content 格式:str / list(新版Gradio文本)/ dict(图片路径) """ user_messages = get_last_user_messages(history) content = [] has_text = False
for x in user_messages: msg_content = x['content']
if isinstance(msg_content, str): content.append({'type': 'text', 'text': msg_content}) has_text = True
elif isinstance(msg_content, list): for item in msg_content: if not isinstance(item, dict): continue if item.get('type') == 'text': content.append({'type': 'text', 'text': item['text']}) has_text = True elif item.get('type') == 'file': fp = item.get('file', {}).get('path') if fp: _append_file(content, fp)
elif isinstance(msg_content, dict): fp = msg_content.get('path') or msg_content.get('url') or msg_content.get('name') if fp: _append_file(content, fp)
# 只有图片没有文字时,自动补一条描述指令(避免部分模型报错) if content and not has_text: content.append({'type': 'text', 'text': '请详细描述这张图片的内容。'})
if not content: print("[警告] content 为空,跳过本次调用") return history
input_message = HumanMessage(content=content)
try: # invoke 三步:① 读SQLite历史 ② 调用模型 ③ 写回历史 resp = chain_history.invoke({'messages': [input_message]}, config) history.append({'role': 'assistant', 'content': resp.content}) except Exception as e: print(f"\n[调用大模型失败] {e}\n") history.append({'role': 'assistant', 'content': f"⚠️ 请求出错:{e}"})
return history
# =============================================================================# Gradio 界面# =============================================================================
with gr.Blocks(title='多模态聊天机器人') as block: chatbot = gr.Chatbot(height=600, label='AI 助手')
chat_input = gr.MultimodalTextbox( interactive=True, file_types=['image', 'audio'], file_count="multiple", placeholder="请输入文字,或点击 📎 上传图片/语音...", show_label=False, sources=["microphone", "upload"], )
# 三步事件链: # 1. add_message → 立即显示用户消息,锁定输入框 # 2. submit_messages → 调用 AI,追加回复 # 3. lambda → 解锁输入框 chat_input.submit( fn=add_message, inputs=[chatbot, chat_input], outputs=[chatbot, chat_input], ).then( fn=submit_messages, inputs=[chatbot], outputs=[chatbot], ).then( fn=lambda: gr.MultimodalTextbox(interactive=True), inputs=None, outputs=[chat_input], )
if __name__ == '__main__': block.launch(theme=gr.themes.Soft())相关笔记