-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat: add Qwen3-ASR streaming WebSocket server with notes #3035
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
qiulang
wants to merge
1
commit into
modelscope:main
Choose a base branch
from
qiulang:qwen3-asr-ws-example
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+462
−0
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
143 changes: 143 additions & 0 deletions
143
examples/industrial_data_pretraining/qwen3_asr/serve_qwen3_asr_ws.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| #!/usr/bin/env python3 | ||
| # coding=utf-8 | ||
| """ | ||
| Qwen3-ASR streaming WebSocket service. | ||
|
|
||
| 把官方 example_qwen3_asr_vllm_streaming.py 的原生流式 API | ||
| (init_streaming_state / streaming_transcribe / finish_streaming_transcribe) | ||
| 包成一个 WebSocket 服务,协议与 Fun-ASR-Nano 的 serve_realtime_ws.py 一致, | ||
| 因此可以直接用同一个 bench_streaming_ws.py 压测,对比。 | ||
|
|
||
| 协议: | ||
| 1. 客户端连接 ws://host:port | ||
| 2. 客户端发文本 "START" → 服务端回 {"event": "started"} | ||
| 3. 客户端发二进制 int16 PCM 块(16kHz 单声道) | ||
| 4. 服务端随转写增长发 {"partial": "<当前文本>"} | ||
| 5. 客户端发文本 "STOP" → 服务端回 | ||
| {"is_final": true, "sentences": [{"text": "<最终文本>"}]} | ||
| 然后 {"event": "stopped"} | ||
|
|
||
| 架构对齐 serve_realtime_ws.py:单 asyncio 事件循环,streaming_transcribe / | ||
| finish_streaming_transcribe 同步调用、阻塞整个循环——这样压测出来的并发特性 | ||
| 才和 Fun-ASR-Nano 那条同口径可比。生产扩展同样靠 多进程 + CUDA MPS + nginx | ||
| (见 vllm_guide §6.7)。 | ||
|
|
||
| 关于 VAD 参见配套说明文档 | ||
|
|
||
|
|
||
| 依赖: | ||
| pip install qwen-asr[vllm] websockets numpy | ||
| 启动: | ||
| python serve_qwen3_asr_ws.py --port 10095 --gpu-memory-utilization 0.8 | ||
| # 可选:--chunk-size-sec 控制流式块大小(默认 2.0)。值越小出字越快/越勤, | ||
| # 但并发开销越大(实测 1.0 比 2.0 明显更吃并发)。 | ||
| """ | ||
| import asyncio | ||
| import argparse | ||
| import json | ||
| import logging | ||
|
|
||
| import numpy as np | ||
| import websockets | ||
|
|
||
| from qwen_asr import Qwen3ASRModel | ||
|
|
||
| # websockets 默认会对每个连接打 INFO 级 "connection open/closed",压测时刷屏; | ||
| # 提到 WARNING 关掉这条噪音(不影响连接行为,纯日志)。 | ||
| logging.getLogger("websockets").setLevel(logging.WARNING) | ||
|
|
||
| SAMPLE_RATE = 16000 | ||
|
|
||
| # 全局只加载一次;所有连接共用模型,各自持有独立的 streaming state。 | ||
| asr = None | ||
|
|
||
| # 流式块大小(秒),由 --chunk-size-sec 设置,handle_client 里 init_streaming_state 用。 | ||
| # 默认 2.0(官方 example 值);值越小出字越勤、并发开销越大。 | ||
| CHUNK_SIZE_SEC = 2.0 | ||
|
|
||
|
|
||
| def int16_pcm_to_float32(pcm_bytes: bytes) -> np.ndarray: | ||
| """bench 发来的是 int16 小端 PCM;Qwen3-ASR 的 streaming_transcribe 吃 float32 [-1,1)。""" | ||
| return np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32) / 32768.0 | ||
|
|
||
|
|
||
| async def handle_client(ws, path=None): # path 兼容老版本 websockets 的两参回调 | ||
| state = None | ||
| last_partial = None | ||
| try: | ||
| async for msg in ws: | ||
| # ---- 文本控制消息 ---- | ||
| if isinstance(msg, str): | ||
| if msg == "START": | ||
| # 每个连接一份独立 state;参数同官方 example | ||
| state = asr.init_streaming_state( | ||
| unfixed_chunk_num=2, | ||
| unfixed_token_num=5, | ||
| chunk_size_sec=CHUNK_SIZE_SEC, | ||
| ) | ||
| last_partial = None | ||
| await ws.send(json.dumps({"event": "started"})) | ||
|
|
||
| elif msg == "STOP": | ||
| if state is not None: | ||
| # 同步收尾,阻塞循环(与 serve_realtime_ws.py 句尾 finalize 同口径) | ||
| asr.finish_streaming_transcribe(state) | ||
| final_text = (state.text or "").strip() | ||
| await ws.send(json.dumps({ | ||
| "is_final": True, | ||
| "sentences": [{"text": final_text}] if final_text else [], | ||
| })) | ||
| await ws.send(json.dumps({"event": "stopped"})) | ||
| break | ||
| # 其它文本忽略 | ||
|
|
||
| # ---- 二进制音频块 ---- | ||
| else: | ||
| if state is None: | ||
| continue # 还没 START,丢弃 | ||
| seg = int16_pcm_to_float32(msg) | ||
| # 同步调用,阻塞整个事件循环 —— 这正是要复刻的单循环架构 | ||
| asr.streaming_transcribe(seg, state) | ||
| text = state.text or "" | ||
| # 只在文本变化时发 partial,避免刷屏(不影响 bench 的首词延迟统计) | ||
| if text != last_partial: | ||
| last_partial = text | ||
| await ws.send(json.dumps({"partial": text})) | ||
|
|
||
| except websockets.exceptions.ConnectionClosed: | ||
| pass | ||
|
|
||
|
|
||
| async def amain(args): | ||
| global asr, CHUNK_SIZE_SEC | ||
| CHUNK_SIZE_SEC = args.chunk_size_sec | ||
| print(f"Loading {args.model} (gpu_memory_utilization={args.gpu_memory_utilization}, chunk_size_sec={CHUNK_SIZE_SEC}) ...") | ||
| # Streaming is vLLM-only and no forced aligner supported.(官方 example 注释) | ||
| asr = Qwen3ASRModel.LLM( | ||
| model=args.model, | ||
| gpu_memory_utilization=args.gpu_memory_utilization, | ||
| max_new_tokens=32, # 流式用小值,同官方 example | ||
| ) | ||
| print(f"Serving on ws://{args.host}:{args.port} (Ctrl-C to stop)") | ||
| async with websockets.serve( | ||
| handle_client, args.host, args.port, max_size=10 * 1024 * 1024 | ||
| ): | ||
| await asyncio.Future() # run forever | ||
|
|
||
|
|
||
| def main(): | ||
| p = argparse.ArgumentParser() | ||
| p.add_argument("--host", default="0.0.0.0") | ||
| p.add_argument("--port", type=int, default=10095) | ||
| p.add_argument("--model", default="Qwen/Qwen3-ASR-1.7B") | ||
| p.add_argument("--gpu-memory-utilization", type=float, default=0.8, | ||
| dest="gpu_memory_utilization") | ||
| p.add_argument("--chunk-size-sec", type=float, default=2.0, | ||
| dest="chunk_size_sec", | ||
| help="流式块大小(秒),传给 init_streaming_state。默认 2.0;越小出字越勤但并发开销越大(实测 1.0 比 2.0 明显更吃并发)") | ||
| args = p.parse_args() | ||
| asyncio.run(amain(args)) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
143 changes: 143 additions & 0 deletions
143
examples/industrial_data_pretraining/qwen3_asr/serve_qwen3_asr_ws_notes.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| # serve_qwen3_asr_ws.py 说明与已知问题 | ||
|
|
||
| 本文档记录 `serve_qwen3_asr_ws.py` 的几个容易困惑/踩坑的点。代码里对应位置只留一行 | ||
| 指向本文的精简注释,细节看这里。 | ||
|
|
||
| --- | ||
|
|
||
| ## 1. 关于 VAD | ||
|
|
||
| > 在开源仓库 github.com/QwenLM/Qwen3-ASR 没有关于 `VAD` 内容。但商用 [Qwen3-ASR的文档/示例](https://help.aliyun.com/zh/model-studio/qwen-asr-realtime-interaction-process)里**是有 VAD 设置**,例如: | ||
| > ```json | ||
| > "turn_detection": { "type": "server_vad", "threshold": 0.2, "silence_duration_ms": 800 } | ||
| > ``` | ||
| > 以下是我的个人理解 | ||
|
|
||
| VAD 在 ASR 里其实有两种完全不同的用途,容易混为一谈: | ||
|
|
||
| - **A. 切段用的 VAD(给非流式 encoder 喂分段)**:像 Fun-ASR-Nano 这类模型 encoder 是 | ||
| **非流式**的(一次要看完整一段),必须靠 VAD 把连续音频切成一句句、每句整体编码解码。 | ||
| 这种 VAD 对 Fun-ASR-Nano 这类模型 **技术必需**——不切就没法编码。 | ||
|
|
||
| - **B. 端点/轮次检测用的 VAD(判断"这一轮说完了没")**:检测说话人停顿(如静音 800ms) | ||
| 来判定"一句话/一轮结束",从而触发"锁定文本 / 发 is_final / 该回应了"。这是**产品行为** | ||
| 层面的需求,和 encoder 能不能流式无关。 | ||
|
|
||
| 对 **Qwen3-ASR 的开源流式 API**(本服务用的 `qwen-asr[vllm]` 的 | ||
| `init_streaming_state` / `streaming_transcribe`): | ||
|
|
||
| - **不需要 A 类(切段)VAD**:它是**增量式**流式——每次只吃新增的一小段音频、状态向前 | ||
| 滚动,连续转写,不存在"先切句再解码"。"哪些字定了、哪些会变"由 `unfixed_chunk_num` / | ||
| `unfixed_token_num` 表示(尾部 N 个 chunk/token 算"未定"、会被后续音频修正,其余视为 | ||
| 已确认)——这相当于内置的 partial/锁定机制,取代了 A 类 VAD 的切段职责。所以在开源 | ||
| 流式 API 的代码里搜不到 vad,是因为它**这一层不做切段**。 | ||
|
|
||
| - **仍然需要 B 类(端点)VAD —— 只是开源 API 自己不带**:自动判断"用户停顿=这一轮结束" | ||
| 这件事,`streaming_transcribe` 本身不管。商用服务在 ASR **之外**包了一层 `server_vad` | ||
| 来做(就是上面那段 `turn_detection`)。本服务目前是用客户端显式发 **`STOP`** 来代替 | ||
| 这个端点判断(bench 里音频放完即发 STOP)。**若要在真实场景自动断句/断轮,需要自己在 | ||
| 本服务之外接一个 VAD / 端点检测**(角色等同商用的 server_vad),而不是去 Qwen3-ASR | ||
| 内部找——它的开源流式 API 不含这一层。 | ||
|
|
||
| **一句话**:Qwen3-ASR 增量流式**省掉了"切段 VAD"(A)**,但**"端点/轮次 VAD"(B)这个 | ||
| 职责依然存在**,商用版用 `server_vad` 实现、本服务用手动 `STOP` 代替。两者不矛盾。 | ||
|
|
||
| ### 1.1 官方佐证:商用 Qwen-ASR-Realtime 的"VAD 模式 / Manual 模式" | ||
|
|
||
| 阿里云百炼的实时语音识别(Qwen-ASR-Realtime)文档明确把"断句由谁做"分成两种模式, | ||
| 本质就是 `session.turn_detection` 开还是关: | ||
|
|
||
| - **VAD 模式(默认,`turn_detection` 配置为 server_vad)**:服务端自动检测语音起点/终点 | ||
| 来断句,客户端只管持续发音频流,服务端在"检测到一句话结束"时自动返回最终结果。流程中 | ||
| 服务端会发 `input_audio_buffer.speech_started` / `speech_stopped` 等事件——这就是上面说的 | ||
| **B 类端点 VAD**,由服务端那一层(server_vad)实现,**不是** ASR 内核在切段。 | ||
|
|
||
| - **Manual 模式(`turn_detection` 设为 null)**:由**客户端**控制断句——发完一整句音频后, | ||
| 客户端发 `input_audio_buffer.commit` 通知服务端边界。适用于客户端能明确判断语句边界的 | ||
| 场景(如"按住说话"、聊天发语音)。 | ||
|
|
||
| > 对应关系:本服务 `serve_qwen3_asr_ws.py` 用客户端显式发 **`STOP`** 来标记一轮结束, | ||
| > 等价于商用的 **Manual 模式**(`turn_detection=null`,由客户端控制边界)。若要做成"服务端 | ||
| > 自动断句",就是去实现商用 **VAD 模式** 的那一层端点检测(server_vad),加在本服务的 | ||
| > 增量转写之外,而不是在 Qwen3-ASR 转写内核里找。 | ||
| > | ||
| > 文档:实时语音识别(Qwen-ASR-Realtime)交互流程 | ||
| > (help.aliyun.com/zh/model-studio/qwen-asr-realtime-interaction-process): ”服务端自动检测语音的起点和终点(断句)。开发者只需持续发送音频流,服务端会在检测到一句话结束时自动返回最终识别结果。此模式适用于实时对话、会议记录等场景。“ | ||
|
|
||
| ### 1.2 `chunk-size-sec` 控制流式块大小 | ||
|
|
||
| `chunk-size-sec` 控制流式块大小(默认 2.0)。值越小出字越快/越勤,但并发开销越大(实测 1.0 比 2.0 在L20 上,29秒音频 48路并发,1.0 全部失败,2.0 全部通过)。 | ||
|
|
||
| --- | ||
|
|
||
| ## 2 必须用 vllm 0.14,不要用 0.19(rope_scaling / thinker_config 警告) | ||
|
|
||
| 本服务需要vllm加速, `qwen-asr[vllm]`,[它锁定 `vllm==0.14.0`](https://github.com/QwenLM/Qwen3-ASR/blob/main/pyproject.toml)。若换成更新的vllm版本, 比如 0.19.x,启动会**出现**: | ||
|
|
||
| ``` | ||
| Unrecognized keys in `rope_scaling` for 'rope_type'='default': | ||
| {'mrope_section', 'mrope_interleaved', 'interleaved'} | ||
| thinker_config is None. Initializing thinker model with default values | ||
| ``` | ||
|
|
||
| **根因**:vllm 在 0.14 → 0.19 之间 (transformers 两版都是 4.57.6),config 解析里的 | ||
| `patch_rope_scaling_dict` 会把 `rope_type` 从 `'mrope'` 改写成 `'default'`(它把 mrope | ||
| 当 legacy、假设由 vllm 内部消化 `mrope_section` 等字段): | ||
|
|
||
| ```python | ||
| elif rope_scaling["rope_type"] == "mrope": | ||
| assert "mrope_section" in rope_scaling | ||
| rope_scaling["rope_type"] = "default" # ← 改写 | ||
| ``` | ||
|
|
||
| 但 Qwen3-ASR 自带的 `Qwen3ASRThinkerTextRotaryEmbedding` 期望从 `rope_scaling` 里读到 | ||
| `"mrope"` 才走多模态 RoPE 分支: | ||
|
|
||
| ```python | ||
| self.rope_type = config.rope_scaling.get("rope_type", "default") | ||
| ``` | ||
|
|
||
| 被 vllm 改写成 `"default"` 后,它走了普通 RoPE,`mrope_section` / `mrope_interleaved` / | ||
| `interleaved` 这几个键没人认领 → 打印 "Unrecognized keys" 警告,且音频/文本的多模态 | ||
| 位置编码退化。`thinker_config is None` 那条同源:0.19 的加载路径没正确解析 Qwen3-ASR 的 | ||
| thinker 子配置,回退到默认参数。 | ||
|
|
||
| **影响与抉择**:在 0.19 上服务"能起、也能出字"(两条是 WARNING/INFO 不是 ERROR),抽查 | ||
| 几条转写也"看着正常";但位置编码退化对长音频/复杂内容可能有害,且**未做 CER 量化对比**, | ||
| 无法判定等价。保守起见固定用 `qwen-asr[vllm]` 自带的 `vllm==0.14.0`。 | ||
|
|
||
| ### 2.1 vllm 加速需要用 Qwen3ASRModel | ||
|
|
||
| funasr 的`AutoModelVLLM` 不能加速 Qwen3-ASR , 必须要用 `from qwen_asr import Qwen3ASRModel` | ||
|
|
||
| > 这解答 `#3026` 的问题 | ||
| > | ||
| ## 3. tokenizer 的 `fix_mistral_regex` 警告(无害,可忽略) | ||
|
|
||
| 启动时可能出现: | ||
|
|
||
| ``` | ||
| The tokenizer you are loading from '.../Qwen3-ASR-1.7B' with an incorrect regex | ||
| pattern ... This will lead to incorrect tokenization. You should set the | ||
| `fix_mistral_regex=True` flag when loading this tokenizer to fix this issue. | ||
| ``` | ||
|
|
||
| **原因**:Qwen3-ASR 的 tokenizer 沿用了一类带已知 regex 问题的分词器实现,底层库检测到 | ||
| 该 regex 模式后给出提醒,建议加 `fix_mistral_regex=True` 修正切分。本服务通过 qwen-asr | ||
| 的高层 API 加载模型、并不直接构造 tokenizer,没有暴露这个开关,所以这条提醒按原样打印。 | ||
|
|
||
| **影响**:实测对中文 ASR 转写结果无可见影响(**抽查**多条转写正常,未做 CER 量化)。该 | ||
| regex 修正主要影响某些特殊 token 的边界切分,对语音转写路径未观察到差异。属于"提醒级" | ||
| 噪音,可忽略。若要彻底消除,需在更底层自行加载 tokenizer 时传 `fix_mistral_regex=True`, | ||
| 但 qwen-asr 高层 API 当前不直接支持,且无实测必要。 | ||
|
|
||
| --- | ||
|
|
||
| ### 顺带:另外两条启动日志(均无害) | ||
|
|
||
| - `Error retrieving safetensors: Repo id must be in the form ...`:把本地模型路径当成 HF | ||
| 仓库 id 去查线上元数据,失败后重试 2 次、回退本地加载,不影响功能。可设环境变量 | ||
| `HF_HUB_OFFLINE=1` 消除。 | ||
|
|
||
| - `Downcasting torch.float32 to torch.bfloat16`:权重以 fp32 存、按 bf16 加载,正常省显存 / | ||
| 提速,bf16 与 fp32 指数位同宽,精度几乎无损。这是 INFO 不是错误。 |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
建议在
handle_client中增加对通用异常(Exception)的捕获并记录日志。目前仅捕获了websockets.exceptions.ConnectionClosed,如果发生其他未预期的异常(例如音频数据转换错误或模型推理异常),连接会直接中断且没有任何错误日志,这会增加排查问题的难度。可以参考serve_realtime_ws.py中的异常处理设计。