跳转到主要内容
更多

连接复用与连接池

面向高并发场景的 HTTP 连接复用和 WebSocket 连接池配置指南。

复用连接可以减少资源消耗、提升吞吐量。具体策略取决于协议类型:
  • HTTP API(文本生成、多模态、Embeddings):通过连接池配置(Java)或 Session 对象(Python)复用 TCP 连接。
  • WebSocket API(TTS、实时语音):池化持有长连接的合成器对象。

前提条件

  • 已获取 API Key 并配置为 DASHSCOPE_API_KEY 环境变量。
  • 已安装最新版 DashScope SDK:
    • Python SDK:>= 1.25.2
    • Java SDK:>= 2.16.6

HTTP 连接复用

DashScope 端点因模型类型而异:
  • 文本模型qwen-plusqwen3-max 等):使用 Generation 类,路由到 /services/aigc/text-generation/generation
  • 多模态模型qwen3.7-plusqwen3-vl-plus 等):使用 MultiModalConversation 类,路由到 /services/aigc/multimodal-generation/generation

Java SDK

连接池默认启用,可根据需要调整以下参数。
参数说明默认值单位备注
connectTimeout建立连接的超时时间120低延迟场景可缩短此值以减少等待时间。
readTimeout读取数据的超时时间300
writeTimeout写入数据的超时时间60
connectionIdleTimeout空闲连接的超时时间300适当延长可避免高并发时频繁重连。
connectionPoolSize连接池最大连接数32过少会导致阻塞,过多会增加服务器压力。
maximumAsyncRequests所有主机的最大并发请求数,需 ≤ connectionPoolSize32
maximumAsyncRequestsPerHost单个主机的最大并发请求数,需 ≤ maximumAsyncRequests32
配置连接池参数并调用模型服务:
// 建议 DashScope SDK 版本 >= 2.12.0
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.alibaba.dashscope.aigc.multimodalconversation.MultiModalConversation;
import com.alibaba.dashscope.aigc.multimodalconversation.MultiModalConversationParam;
import com.alibaba.dashscope.aigc.multimodalconversation.MultiModalConversationResult;
import com.alibaba.dashscope.common.MultiModalMessage;
import com.alibaba.dashscope.common.Role;
import com.alibaba.dashscope.exception.ApiException;
import com.alibaba.dashscope.exception.InputRequiredException;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.protocol.ConnectionConfigurations;
import com.alibaba.dashscope.protocol.Protocol;
import com.alibaba.dashscope.utils.Constants;

public class Main {
  public static MultiModalConversationResult callWithMessage() throws ApiException, NoApiKeyException, InputRequiredException {
    MultiModalConversation conv = new MultiModalConversation(Protocol.HTTP.getValue(), "https://dashscope.aliyuncs.com/api/v1");
    Map<String, Object> textContent = new HashMap<>();
    textContent.put("text", "Who are you?");
    MultiModalMessage userMsg = MultiModalMessage.builder()
        .role(Role.USER.getValue())
        .content(Collections.singletonList(textContent))
        .build();
    MultiModalConversationParam param = MultiModalConversationParam.builder()
        // 如果未配置环境变量,请替换为您的 API Key:.apiKey("sk-xxx")
        .apiKey(System.getenv("DASHSCOPE_API_KEY"))
        .model("qwen3.7-plus")
        .messages(Collections.singletonList(userMsg))
        .build();

    return conv.call(param);
  }
  public static void main(String[] args) {
    // 连接池配置
    Constants.connectionConfigurations = ConnectionConfigurations.builder()
        .connectTimeout(Duration.ofSeconds(10))  // 建立连接超时时间,默认 120s
        .readTimeout(Duration.ofSeconds(300)) // 读取数据超时时间,默认 300s
        .writeTimeout(Duration.ofSeconds(60)) // 写入数据超时时间,默认 60s
        .connectionIdleTimeout(Duration.ofSeconds(300)) // 空闲连接超时时间,默认 300s
        .connectionPoolSize(256) // 连接池最大连接数,默认 32
        .maximumAsyncRequests(256)  // 最大并发请求数,默认 32
        .maximumAsyncRequestsPerHost(256) // 单主机最大并发请求数,默认 32
        .build();

    try {
      MultiModalConversationResult result = callWithMessage();
      System.out.println(result.getOutput().getChoices().get(0).getMessage().getContent().get(0).get("text"));
    } catch (ApiException | NoApiKeyException | InputRequiredException e) {
      System.err.println("调用服务时发生错误:" + e.getMessage());
    }
    System.exit(0);
  }
}

Python SDK

Python SDK 通过自定义 Session 实现连接复用,支持异步 (aiohttp)同步 (requests.Session)两种方式。

异步 (aiohttp)

使用 aiohttp.ClientSessionaiohttp.TCPConnector 实现异步连接复用。
参数说明默认值备注
limit总连接数上限100增大此值可提升并发能力。
limit_per_host单主机连接数上限0(不限)防止对单个主机施加过大压力。
sslSSL 上下文配置None用于 HTTPS 连接的 SSL 证书验证。
import asyncio
import aiohttp
import ssl
import certifi
from dashscope import AioMultiModalConversation
import dashscope
import os

async def main():
  dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'

  # 如果未配置环境变量,请替换为您的 API Key:dashscope.api_key = "sk-xxx"
  dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")

  # 配置连接参数
  connector = aiohttp.TCPConnector(
    limit=100,           # 总连接数上限
    limit_per_host=30,   # 单主机连接数上限
    ssl=ssl.create_default_context(cafile=certifi.where()),
  )

  # 创建自定义 Session 并传入调用方法
  async with aiohttp.ClientSession(connector=connector) as session:
    response = await AioMultiModalConversation.call(
      model='qwen3.7-plus',
      messages=[{'role': 'user', 'content': [{'text': 'Hello, please introduce yourself'}]}],
      session=session,  # 传入自定义 Session
    )
    print(response)

asyncio.run(main())

同步 (requests.Session)

使用 requests.Session 实现同步连接复用。同一 Session 内的请求会复用 TCP 连接。
import requests
from dashscope import MultiModalConversation
import dashscope
import os

dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'

# 如果未配置环境变量,请替换为您的 API Key:dashscope.api_key = "sk-xxx"
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")

# 使用 with 语句确保 Session 正确关闭
with requests.Session() as session:
  response = MultiModalConversation.call(
    model='qwen3.7-plus',
    messages=[{'role': 'user', 'content': [{'text': 'Hello'}]}],
    session=session  # 传入自定义 Session
  )
  print(response)
在多次调用间复用同一个 Session:
import requests
from dashscope import MultiModalConversation
import dashscope
import os

dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'

# 如果未配置环境变量,请替换为您的 API Key:dashscope.api_key = "sk-xxx"
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")

# 创建 Session 对象
session = requests.Session()

try:
  # 在多次调用间复用同一个 Session
  response1 = MultiModalConversation.call(
    model='qwen3.7-plus',
    messages=[{'role': 'user', 'content': [{'text': 'Hello'}]}],
    session=session
  )
  print(response1)

  response2 = MultiModalConversation.call(
    model='qwen3.7-plus',
    messages=[{'role': 'user', 'content': [{'text': 'Introduce yourself'}]}],
    session=session
  )
  print(response2)
finally:
  # 确保 Session 正确关闭
  session.close()

  • 非实时 (MiniMax)
  • 语音识别
    语音对话
    音乐生成
    图片翻译
    多模态向量
    平台 API
    工具包与框架

    更多

    异步任务管理 API 参考

    通过 HTTP API 查询单个异步任务结果、批量查询异步任务状态、以及取消异步任务的完整参考文档。

    本文档介绍异步任务管理 HTTP API 的完整参数与使用方法。在使用前,请确保已获取 API Key。所有接口的基础路径为:
    https://dashscope.aliyuncs.com/api/v1/tasks
    
    Windows CMD 请将 $DASHSCOPE_API_KEY 替换为 %DASHSCOPE_API_KEY%,PowerShell 请替换为 $env:DASHSCOPE_API_KEY

    查询异步任务结果

    查询指定异步任务的执行结果。接口 QPS 限制为 20 次/账号,任务完成后结果保留 24 小时。
    curl -X GET 'https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}' \
    --header "Authorization: Bearer $DASHSCOPE_API_KEY"
    

    WebSocket 连接池

    TTS 服务使用 WebSocket 连接进行实时流式传输。在生产环境中,每次请求都新建连接会浪费资源并增加延迟。本节介绍面向高吞吐 TTS 场景的连接池、对象池和并发请求管理方案。

    Python:对象池

    Python SDK 提供 SpeechSynthesizerObjectPool 来管理和复用 SpeechSynthesizer 实例。对象池在初始化时预创建对象并建立 WebSocket 连接,消除了逐请求建连的开销。 池大小建议:将 max_size 设置为峰值并发的 1.5~2 倍,但不要超过账户的 QPS 上限。
    import os
    import threading
    import dashscope
    from dashscope.audio.tts_v2 import *
    
    dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
    dashscope.base_websocket_api_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference'
    
    # 创建全局对象池(仅在启动时初始化一次)
    pool = SpeechSynthesizerObjectPool(max_size=20)
    
    def synthesize(text, task_id):
      complete_event = threading.Event()
    
      class Callback(ResultCallback):
        def on_open(self):
          self.file = open(f'result_{task_id}.mp3', 'wb')
    
        def on_complete(self):
          complete_event.set()
    
        def on_error(self, message):
          print(f'[task_{task_id}] 错误:{message}')
    
        def on_data(self, data):
          self.file.write(data)
    
        def on_close(self):
          if hasattr(self, 'file'):
            self.file.close()
    
      callback = Callback()
    
      # 从池中借用一个已建连的合成器
      synth = pool.borrow_synthesizer(
        model='cosyvoice-v3-flash',
        voice='longanyang',
        callback=callback
      )
    
      try:
        synth.call(text)
        complete_event.wait()
        print(f'[task_{task_id}] 首包延迟:'
           f'{synth.get_first_package_delay()} ms')
        # 将合成器归还到池中以供复用
        pool.return_synthesizer(synth)
      except Exception as e:
        print(f'[task_{task_id}] 失败:{e}')
        synth.close()  # 失败的对象不要归还到池中
    
    # 运行并发任务
    texts = ["第一句话。", "第二句话。", "第三句话。"]
    threads = [threading.Thread(target=synthesize, args=(t, i))
         for i, t in enumerate(texts)]
    for t in threads:
      t.start()
    for t in threads:
      t.join()
    
    pool.shutdown()
    
    如果任务失败或仍在运行,不要将合成器归还到池中,应手动调用 close 关闭。

    Java:连接池 + 对象池

    Java SDK 使用 OkHttp3 连接池(默认启用),并可选配 Apache Commons Pool2 对象池来管理 SpeechSynthesizer 实例。 第 1 步:通过环境变量配置连接池
    变量默认值建议
    DASHSCOPE_CONNECTION_POOL_SIZE32峰值并发的 2 倍
    DASHSCOPE_MAXIMUM_ASYNC_REQUESTS32与连接池大小一致
    DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST32与连接池大小一致
    export DASHSCOPE_CONNECTION_POOL_SIZE=2000
    export DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
    export DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
    
    第 2 步:添加 commons-pool2 依赖
    • Maven
    • Gradle
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>the-latest-version</version>
    </dependency>
    
    第 3 步:创建并使用对象池
    变量默认值建议
    SAMBERT_OBJECTPOOL_SIZE(Sambert)500峰值并发的 1.5~2 倍,不超过连接池大小
    import com.alibaba.dashscope.audio.tts.SpeechSynthesisParam;
    import com.alibaba.dashscope.audio.tts.SpeechSynthesizer;
    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    // 工厂
    class SynthesizerFactory extends BasePooledObjectFactory<SpeechSynthesizer> {
      public SpeechSynthesizer create() { return new SpeechSynthesizer(); }
      public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
        return new DefaultPooledObject<>(obj);
      }
    }
    
    // 对象池(全局单例)
    GenericObjectPoolConfig<SpeechSynthesizer> config = new GenericObjectPoolConfig<>();
    config.setMaxTotal(1200);
    config.setMaxIdle(1200);
    config.setMinIdle(1200);
    GenericObjectPool<SpeechSynthesizer> pool =
      new GenericObjectPool<>(new SynthesizerFactory(), config);
    
    // 在每个任务中使用
    SpeechSynthesizer synth = pool.borrowObject();
    try {
      // ... 配置参数并调用 synth
      pool.returnObject(synth);
    } catch (Exception e) {
      synth = null;  // 失败时不要归还
    }
    
    服务器配置参考:4 核 8 GiB 机器可支撑约 600 个并发 Sambert TTS 任务,对象池大小 1200,连接池大小 2000。

    最佳实践

    • Java SDK:根据实际并发量设置 connectionPoolSizemaximumAsyncRequests。连接数过少会导致阻塞,过多会增加服务器压力。
    • Python SDK:使用 with 语句管理 Session 生命周期,确保资源正确释放。
    • 选择合适的调用方式:异步应用(如 asyncio 或 FastAPI)使用异步调用,传统应用使用同步调用。
    • WebSocket 对象池:如果任务失败或仍在运行,不要将合成器归还到池中,应手动调用 close 关闭。

    性能监控

    跟踪以下指标以维护生产环境 TTS 服务的健康状态:
    指标说明目标值
    首包延迟从发送请求到收到第一个音频分片的时间< 500 ms
    端到端延迟完成整个合成的总时间取决于文本长度
    错误率失败请求的百分比< 0.1%
    池使用率已借出对象数 / 池大小峰值时 60%~80%
    连接复用率复用连接数 / 总请求数> 95%
    通过 SDK 获取这些指标:
    # TTS
    print(f"Request ID: {synthesizer.get_last_request_id()}")
    print(f"首包延迟:{synthesizer.get_first_package_delay()} ms")
    
    // TTS
    System.out.println("Request ID: " + synthesizer.getLastRequestId());
    System.out.println("首包延迟:" + synthesizer.getFirstPackageDelay() + " ms");
    

    上线检查清单

    上线前请确认以下事项:
    • API Key 存储在环境变量中,未硬编码到代码里。
    • 连接池和对象池大小已按预期峰值负载配置。
    • 池大小未超过账户的 QPS 上限。
    • 错误处理逻辑将失败对象销毁(而非归还到池中)。
    • 优雅停机时调用了 pool.shutdown()(Python)或关闭池(Java)。
    • WebSocket 连接使用了正确的区域端点。
    • 监控面板已配置首包延迟、错误率和池使用率等指标。
    • 已完成 2 倍预期峰值并发的压力测试。
    • 已实现指数退避的重试逻辑以应对瞬时故障。

    相关文档