跳转到主要内容
实时

高并发场景下实时语音识别的性能优化

介绍如何利用DashScope Java SDK中的连接池与对象池机制,在高并发场景下高效调用Paraformer实时语音识别服务。

Paraformer 实时语音识别服务基于 WebSocket 协议,以支持流式实时通信。然而,在高并发场景下,为每个请求独立创建和销毁 WebSocket 连接会产生巨大的网络与系统资源开销,并引入显著的连接延迟。为优化性能并确保稳定性,DashScope SDK 内置了高效的资源复用机制(如连接池与对象池)。本文档将详细介绍如何利用 DashScope Java SDK 中的这些特性,在高并发场景下高效调用 Paraformer 实时语音识别服务。 关于模型介绍和选型建议请参见实时语音识别概述

前提条件

Java SDK 通过内置的连接池和自定义的对象池协同工作,实现最佳性能。
  • 连接池:SDK 内部集成的 OkHttp3 连接池,负责管理和复用底层的 WebSocket 连接,减少网络握手开销。此功能默认开启。
  • 对象池:基于 commons-pool2 实现,用于维护一组已预先建立好连接的 Recognition 对象。从池中获取对象可消除连接建立的延迟,显著降低首包延迟。

实现步骤

1
添加依赖
2
根据项目构建工具,在依赖配置文件中添加 dashscope-sdk-java 和 commons-pool2。
3
以Maven和Gradle为例,配置如下:
4
Maven
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>dashscope-sdk-java</artifactId>
  <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
  <version>the-latest-version</version>
</dependency>

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
  <version>the-latest-version</version>
</dependency>
Gradle
dependencies {
  // 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java
  implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version'

  // 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
  implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version'
}
5
Gradle 项目保存文件后,在命令行切换到项目根目录,执行以下命令更新依赖:
6
./gradlew build --refresh-dependencies
7
Windows 系统使用:
8
gradlew build --refresh-dependencies
9
配置连接池
10
通过环境变量配置连接池关键参数:
11
环境变量描述DASHSCOPE_CONNECTION_POOL_SIZE连接池大小。推荐值:峰值并发数的 2 倍以上。默认值:32。DASHSCOPE_MAXIMUM_ASYNC_REQUESTS最大异步请求数。推荐值:与 DASHSCOPE_CONNECTION_POOL_SIZE 保持一致。默认值:32。DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST单主机最大异步请求数。推荐值:与 DASHSCOPE_CONNECTION_POOL_SIZE 保持一致。默认值:32。
12
配置对象池
13
通过环境变量配置对象池大小:
14
环境变量描述RECOGNITION_OBJECTPOOL_SIZE对象池大小。推荐值:峰值并发数的 1.5 至 2 倍。默认值:500。
15
  • 对象池的大小(RECOGNITION_OBJECTPOOL_SIZE)必须小于或等于连接池的大小(DASHSCOPE_CONNECTION_POOL_SIZE)。否则,当对象池请求对象时,若连接池已满,会导致调用线程阻塞,等待可用连接。
  • 对象池大小不应超过您账户的 QPS(每秒查询率)限制。
16
通过如下代码创建对象池:
17
class RecognitionObjectPool {
  // 这里省略其它代码,完整示例请参见完整代码
  public static GenericObjectPool<Recognition> getInstance() {
    lock.lock();
    if (recognitionGenericObjectPool == null) {
      // 您可以在这里设置对象池的大小。或在环境变量RECOGNITION_OBJECTPOOL_SIZE中设置。
      // 建议设置为服务器最大并发连接数的1.5到2倍。
      int objectPoolSize = getObjectivePoolSize();
      System.out.println("RECOGNITION_OBJECTPOOL_SIZE: "
          + objectPoolSize);
      RecognitionObjectFactory recognitionObjectFactory =
          new RecognitionObjectFactory();
      GenericObjectPoolConfig<Recognition> config =
          new GenericObjectPoolConfig<>();
      config.setMaxTotal(objectPoolSize);
      config.setMaxIdle(objectPoolSize);
      config.setMinIdle(objectPoolSize);
      recognitionGenericObjectPool =
          new GenericObjectPool<>(recognitionObjectFactory, config);
    }
    lock.unlock();
    return recognitionGenericObjectPool;
  }
}
18
从对象池中获取 Recognition 对象
19
如果当前未归还的对象数量已超过对象池的最大容量,系统会额外创建一个新的 Recognition 对象。此类新创建的对象需要重新进行初始化并建立 WebSocket 连接,无法利用对象池的既有连接资源,因此不具备复用效果。
20
recognizer = RecognitionObjectPool.getInstance().borrowObject();
21
进行语音识别
22
调用 Recognition 对象的 callstreamCall 方法进行语音识别。
23
归还 Recognition 对象
24
语音识别任务结束后,归还 Recognition 对象,以便后续任务可以复用该对象。不要归还未完成任务或任务失败的对象。
25
RecognitionObjectPool.getInstance().returnObject(recognizer);

完整代码

package org.alibaba.bailian.example.examples;

import com.alibaba.dashscope.audio.asr.recognition.Recognition;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionParam;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionResult;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.ApiKey;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * Before making high-concurrency calls to the ASR service,
 * please configure the connection pool size through following environment
 * variables.
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
 * DASHSCOPE_CONNECTION_POOL_SIZE=2000
 *
 * The default is 32, and it is recommended to set it to 2 times the maximum
 * concurrent connections of a single server.
 */
public class Main {
  public static void checkoutEnv(String envName, int defaultSize) {
    if (System.getenv(envName) != null) {
      System.out.println("[ENV CHECK]: " + envName + " "
          + System.getenv(envName));
    } else {
      System.out.println("[ENV CHECK]: " + envName
          + " Using Default which is " + defaultSize);
    }
  }

  public static void main(String[] args)
      throws NoApiKeyException, InterruptedException {
    // Check for connection pool env
    checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
    checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
    checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
    checkoutEnv(RecognitionObjectPool.RECOGNITION_OBJECTPOOL_SIZE_ENV, RecognitionObjectPool.DEFAULT_OBJECT_POOL_SIZE);

    int threadNums = 3;
    String currentDir = System.getProperty("user.dir");
    // Please replace the path with your audio source
    Path[] filePaths = {
        Paths.get(currentDir, "asr_example.wav"),
        Paths.get(currentDir, "asr_example.wav"),
        Paths.get(currentDir, "asr_example.wav"),
    };
    // Use ThreadPool to run recognition tasks
    ExecutorService executorService = Executors.newFixedThreadPool(threadNums);
    for (int i = 0; i < threadNums; i++) {
      executorService.submit(new RealtimeRecognizeTask(filePaths));
    }
    executorService.shutdown();
    // wait for all tasks to complete
    executorService.awaitTermination(10, TimeUnit.MINUTES);
    System.exit(0);
  }
}

class RecognitionObjectFactory extends BasePooledObjectFactory<Recognition> {
  public RecognitionObjectFactory() {
    super();
  }

  @Override
  public Recognition create() throws Exception {
    return new Recognition();
  }

  @Override
  public PooledObject<Recognition> wrap(Recognition obj) {
    return new DefaultPooledObject<>(obj);
  }
}

class RecognitionObjectPool {
  public static GenericObjectPool<Recognition> recognitionGenericObjectPool;
  public static String RECOGNITION_OBJECTPOOL_SIZE_ENV =
      "RECOGNITION_OBJECTPOOL_SIZE";
  public static int DEFAULT_OBJECT_POOL_SIZE = 500;
  private static Lock lock = new java.util.concurrent.locks.ReentrantLock();

  public static int getObjectivePoolSize() {
    try {
      Integer n = Integer.parseInt(System.getenv(RECOGNITION_OBJECTPOOL_SIZE_ENV));
      return n;
    } catch (NumberFormatException e) {
      return DEFAULT_OBJECT_POOL_SIZE;
    }
  }

  public static GenericObjectPool<Recognition> getInstance() {
    lock.lock();
    if (recognitionGenericObjectPool == null) {
      // You can set the object pool size here. or in environment variable
      // RECOGNITION_OBJECTPOOL_SIZE It is recommended to set it to 1.5 to 2
      // times your server's maximum concurrent connections.
      int objectPoolSize = getObjectivePoolSize();
      System.out.println("RECOGNITION_OBJECTPOOL_SIZE: "
          + objectPoolSize);
      RecognitionObjectFactory recognitionObjectFactory =
          new RecognitionObjectFactory();
      GenericObjectPoolConfig<Recognition> config =
          new GenericObjectPoolConfig<>();
      config.setMaxTotal(objectPoolSize);
      config.setMaxIdle(objectPoolSize);
      config.setMinIdle(objectPoolSize);
      recognitionGenericObjectPool =
          new GenericObjectPool<>(recognitionObjectFactory, config);
    }
    lock.unlock();
    return recognitionGenericObjectPool;
  }
}

class RealtimeRecognizeTask implements Runnable {
  private static final Object lock = new Object();
  private Path[] filePaths;

  public RealtimeRecognizeTask(Path[] filePaths) {
    this.filePaths = filePaths;
  }

  /**
   * Set your DashScope API key. In
   * fact, if you have set DASHSCOPE_API_KEY in your environment variable, you
   * can ignore this, and the SDK will automatically get the api_key from the
   * environment variable
   */
  private static String getDashScopeApiKey() throws NoApiKeyException {
    String dashScopeApiKey = null;
    try {
      ApiKey apiKey = new ApiKey();
      dashScopeApiKey =
          ApiKey.getApiKey(null); // Retrieve from environment variable.
    } catch (NoApiKeyException e) {
      System.out.println("No API key found in environment.");
    }
    if (dashScopeApiKey == null) {
      // If you cannot set api_key in your environment variable,
      // you can set it here by code
      dashScopeApiKey = "your-dashscope-apikey";
    }
    return dashScopeApiKey;
  }

  public void runCallback() {
    for (Path filePath : filePaths) {
      // Create recognition params
      // you can customize the recognition parameters, like model, format,
      // sample_rate
      RecognitionParam param = null;
      try {
        param =
            RecognitionParam.builder()
                .model("paraformer-realtime-v2")
                .format(
                    "pcm") // 'pcm'、'wav'、'opus'、'speex'、'aac'、'amr', you
                // can check the supported formats in the document
                .sampleRate(16000) // supported 8000、16000
                .apiKey(getDashScopeApiKey()) // use getDashScopeApiKey to get
                // api key.
                .build();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }

      Recognition recognizer = null;
      // if recv onError
      final boolean[] hasError = {false};
      try {
        recognizer = RecognitionObjectPool.getInstance().borrowObject();

        String threadName = Thread.currentThread().getName();

        ResultCallback<RecognitionResult> callback =
            new ResultCallback<RecognitionResult>() {
              @Override
              public void onEvent(RecognitionResult message) {
                synchronized (lock) {
                  if (message.isSentenceEnd()) {
                    System.out.println("[process " + threadName
                        + "] Fix:" + message.getSentence().getText());
                  } else {
                    System.out.println("[process " + threadName
                        + "] Result: " + message.getSentence().getText());
                  }
                }
              }

              @Override
              public void onComplete() {
                System.out.println("[" + threadName + "] Recognition complete");
              }

              @Override
              public void onError(Exception e) {
                System.out.println("[" + threadName
                    + "] RecognitionCallback error: " + e.getMessage());
                hasError[0] = true;
              }
            };
        // Please replace the path with your audio file path
        System.out.println(
            "[" + threadName + "] Input file_path is: " + filePath);
        FileInputStream fis = null;
        // Read file and send audio by chunks
        try {
          fis = new FileInputStream(filePath.toFile());
        } catch (Exception e) {
          System.out.println("Error when loading file: " + filePath);
          e.printStackTrace();
        }
        // set param & callback
        recognizer.call(param, callback);

        // chunk size set to 100 ms for 16KHz sample rate
        byte[] buffer = new byte[3200];
        int bytesRead;
        // Loop to read chunks of the file
        while ((bytesRead = fis.read(buffer)) != -1) {
          ByteBuffer byteBuffer;
          if (bytesRead < buffer.length) {
            byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
          } else {
            byteBuffer = ByteBuffer.wrap(buffer);
          }
          // Send the ByteBuffer to the recognition instance
          recognizer.sendAudioFrame(byteBuffer);
          Thread.sleep(100);
          buffer = new byte[3200];
        }
        System.out.println(
            "[" + threadName + "] send audio done");
        recognizer.stop();
        System.out.println(
            "[" + threadName + "] asr task finished");
      } catch (Exception e) {
        e.printStackTrace();
        hasError[0] = true;
      }
      if (recognizer != null) {
        try {
          if (hasError[0] == true) {
            // Invalid the recognition object error.
            recognizer.getDuplexApi().close(1000, "bye");
            RecognitionObjectPool.getInstance().invalidateObject(recognizer);
          } else {
            // Return the recognition object to the pool if no error or exception.
            RecognitionObjectPool.getInstance().returnObject(recognizer);
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
  }

  @Override
  public void run() {
    runCallback();
  }
}

推荐配置

以下配置基于在指定规格的阿里云服务器上仅运行 Paraformer 实时语音识别服务的测试结果。过高的并发数可能导致任务处理延迟。 其中单机并发数指的是同一时刻正在运行的 Paraformer 实时语音识别任务数,也可以理解为工作线程数。
机器配置(阿里云)单机最大并发数对象池大小连接池大小
4核8GiB1005002000
8核16GiB2005002000
16核32GiB4005002000

资源管理与异常处理

  • 任务成功:当语音识别任务正常完成时,必须调用 GenericObjectPoolreturnObject 方法将 Recognition 对象归还到池中,以便复用。在当前代码中,对应 RecognitionObjectPool.getInstance().returnObject(recognizer)
    不要归还未完成任务或任务失败的 Recognition 对象。
  • 任务失败:当 SDK 内部或业务逻辑抛出异常导致任务中断时,必须执行以下两个操作:
    1. 主动关闭底层的 WebSocket 连接
    2. 从对象池中废弃该对象,防止被再次使用
// 关闭连接
recognizer.getDuplexApi().close(1000, "bye");
// 在对象池中废弃出现异常的recognizer
RecognitionObjectPool.getInstance().invalidateObject(recognizer);
  • TaskFailed 报错:在服务出现 TaskFailed 报错时,不需要额外处理。

调用预热与耗时统计说明

在对 DashScope Java SDK 进行并发调用延迟等性能评估时,建议在正式测试前执行充分的预热操作。预热能够确保测量结果准确反映服务在稳定状态下的真实性能,避免因初始连接耗时导致的数据偏差。

连接复用机制

DashScope Java SDK 通过全局单例的连接池高效管理和复用 WebSocket 连接,旨在减少频繁建连和断连的开销,提升高并发场景下的处理能力。 该机制的工作特点如下:
  • 按需创建:SDK 不会在服务启动时预创建 WebSocket 连接,而是在首次调用时按需建立。
  • 限时复用:请求完成后,连接将在池中保留最多 60 秒以备复用。
    • 若 60 秒内有新请求,将复用现有连接,避免重复握手开销。
    • 若连接空闲超过 60 秒,将被自动关闭以释放资源。

预热的重要性

在以下场景中,连接池中可能没有可复用的活跃连接,导致请求需要新建连接:
  • 应用刚启动,尚未发起任何调用。
  • 服务空闲时间超过 60 秒,池中连接已因超时而关闭。
在这些场景下,首次或初期请求会触发完整的 WebSocket 建连过程(包括 TCP 握手、TLS 加密协商和协议升级),其端到端延迟会显著高于后续复用连接的请求。这部分额外耗时源于网络连接初始化,并非服务本身的处理延迟。因此,若未进行预热,性能测试结果会因包含初始建连时间而产生偏差。

推荐做法

为获取可靠的性能数据,在正式进行性能压测或延迟统计前,请遵循以下预热步骤:
  1. 模拟正式测试的并发级别,提前发起一定数量的调用(例如,持续 1-2 分钟),以充分填充连接池。
  2. 确认连接池已建立并维持足够的活跃连接后,再开始正式的性能数据采集。
通过合理的预热,可使 SDK 连接池进入稳定复用状态,从而测量出更具代表性的延迟指标,真实反映服务在线上平稳运行时的性能。

Java SDK 常见异常

出错原因类型一每一个 SDK 对象创建时都会申请一个连接。如果没有使用对象池,每一次任务结束后对象都被析构。此时这一个连接将进入无引用状态,需要等待 61 秒后服务端报错连接超时才会真正断开,这会导致这个连接在 61 秒内不可复用。在高并发场景下,新的任务在发现没有可复用连接时会创建新连接,会造成如下后果:
  1. 连接数持续上升。
  2. 由于连接数过多,服务器资源不足,服务器卡顿。
  3. 连接池被打满、新任务由于启动时需要等待可用连接而阻塞。
类型二对象池配置的 MaxIdle 小于 MaxTotal,导致在对象闲置时,超过 MaxIdle 的对象被销毁,从而造成连接泄漏。泄漏的连接需要等待 61 秒超时后断连,同类型一造成连接数持续上升。解决方法对于类型一,使用对象池解决。对于类型二,检查对象池配置参数,设置 MaxIdle 和 MaxTotal 相等,关闭对象池自动销毁策略解决。
同"异常 1",连接池已经达到最大连接限制,新的任务需要等待无引用状态的连接 61 秒触发超时后才可以获得连接。
出错原因在高并发调用时,同一个对象会复用同一个 WebSocket 连接,因此 WebSocket 连接只会在服务启动时创建。需要注意的是,任务启动阶段如果立刻开始较高并发调用,同时创建过多的 WebSocket 连接会导致阻塞。解决方法启动服务后逐步提升并发量,或增加预热任务。
出错原因这是由于出现了客户端报错后,服务端不知道客户端出错,连接处于任务中状态。此时连接和对象被复用并开启下一个任务,导致流程错误,下一个任务失败。解决方法在抛出异常后主动关闭 WebSocket 连接后归还对象池。
出错原因同时创建过多 WebSocket 连接导致阻塞,但业务流量持续打进来,导致任务短时间积压,并且在阻塞后所有积压任务立刻调用。这会造成调用量尖刺,并且有可能造成瞬时超过账号的并发数限制导致部分任务失败、服务器卡顿等。这种瞬间创建过多 WebSocket 的情况多发生于:
  • 服务启动阶段
  • 网络出现异常,大量 WebSocket 连接同时中断重连
  • 某一时刻出现大量服务端报错,导致大量 WebSocket 重连。常见报错如并发数超过账号限制("Requests rate limit exceeded, please try again later.")。
解决方法
  1. 检查网络情况。
  2. 排查尖刺前是否出现大量其他服务端报错。
  3. 提高账号并发限制。
  4. 调小对象池和连接池大小,通过对象池上限限制最大并发数。
  5. 提升服务器配置或扩充机器数。
解决方法
  1. 检查是否已经达到网络带宽上限。
  2. 检查实际并发数是否已经过高。