跳转到主要内容
Sambert

高并发场景调用 DashScope Java SDK 最佳实践

本文介绍在高并发场景下,如何通过 DashScope Java SDK 高效调用 Sambert 语音合成服务。

Sambert 语音合成服务使用 WebSocket 协议,在高并发场景下,频繁创建 WebSocket 连接会增加连接耗时并消耗大量资源。 在使用 DashScope Java SDK 时,您可以根据服务器的实际情况,通过合理设置连接池和对象池的大小来降低运行开销。

前提条件

推荐配置

连接池和对象池不是越多越好,过少或过多都会导致程序运行变慢。建议您根据自己服务器的实际规格进行配置。 在服务器上只运行 Sambert 语音合成服务的情况下,进行测试后得到了如下推荐配置供您参考:
常见机器配置(阿里云)单机最大并发数对象池大小连接池大小
4核8GiB60012002000
单机并发数指的是同一时刻正在运行的 Sambert 语音合成任务的数量,也可以理解为工作线程数。
在高并发调用时,同一个对象会复用同一个 WebSocket 连接,因此 WebSocket 连接只会在服务启动时创建。需要注意的是,同时创建过多的 WebSocket 连接会导致阻塞,因此在实际启动服务时应逐步提高单机并发数。

可配置参数

连接池

DashScope Java SDK 使用了 OkHttp3 提供的连接池来复用 WebSocket 连接,从而减少频繁创建 WebSocket 连接的耗时和资源开销。 连接池是 DashScope SDK 默认开启的优化项,您需要根据使用场景配置连接池的大小。 请在运行 Java 服务前,通过环境变量的方式提前按需配置好连接池的相关参数。连接池配置参数如下:
参数说明
DASHSCOPE_CONNECTION_POOL_SIZE配置连接池大小。默认值为 32。推荐配置为您的峰值并发数的 2 倍以上。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS配置最大异步请求数。默认值为 32。推荐配置为和连接池大小一致。更多信息参见 OkHttp3 Dispatcher 参考文档
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST配置单 host 最大异步请求数。默认值为 32。推荐配置为和连接池大小一致。更多信息参见 OkHttp3 Dispatcher 参考文档

对象池

推荐使用对象池的方式来复用 SpeechSynthesizer 对象,这样可以进一步降低反复创建和销毁对象带来的内存和时间开销。 请在运行 Java 服务前,通过环境变量或代码的方式提前按需配置好对象池的大小。对象池配置参数如下:
参数说明
SAMBERT_OBJECTPOOL_SIZE对象池大小。推荐配置为您的峰值并发数的 1.5~2 倍。对象池大小需要小于或等于连接池大小,否则会出现对象等待连接的情况,导致调用阻塞。
关于如何配置环境变量,可参考配置 API Key 到环境变量

示例代码

以下为使用资源池的示例代码。其中,对象池为全局单例对象。
  • 每个主账号默认每秒可提交 3 个 Sambert 语音合成任务。如需开通更高 QPS,请联系我们。
  • 示例代码中,不同的线程通过等待随机时间来避免同时创建过多的 WebSocket 连接。
以 Maven 和 Gradle 为例,配置如下:
  • Maven
  • Gradle
  1. 打开您的 Maven 项目的 pom.xml 文件。
  2. <dependencies> 标签内添加以下依赖信息。
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dashscope-sdk-java</artifactId>
    <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
    <version>the-latest-version</version>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
    <version>the-latest-version</version>
</dependency>
  1. 保存 pom.xml 文件。
  2. 使用 Maven 命令(如 mvn clean installmvn compile)来更新项目依赖。
import com.alibaba.dashscope.audio.tts.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.tts.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * Before making high-concurrency calls to the TTS service,
 * please configure the connection pool size through following environment
 * variables.
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
 * DASHSCOPE_CONNECTION_POOL_SIZE=2000
 *
 * The default is 32, and it is recommended to set it to 2 times the maximum
 * concurrent connections of a single server.
 */

@Slf4j
public class SynthesizeTextToSpeechUsingSambertConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }

    public static void main(String[] args)
            throws InterruptedException, NoApiKeyException {

        // Check for connection pool env
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv(SambertObjectPool.SAMBERT_OBJECTPOOL_SIZE_ENV, SambertObjectPool.DEFAULT_CONNECTION_POOL_SIZE);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);

        // Record task start time
        int runTimes = 1;

        // Create the pool of SpeechSynthesis objects
        ExecutorService executorService = Executors.newFixedThreadPool(runTimes);

        for (int i = 0; i < runTimes; i++) {
            executorService.submit(new SynthesizeTask(new String[]{
                    "床前明月光,",
                    "疑似地上霜。",
                    "举头望明月,",
                    "低头思故乡。"
            }));
        }

        // Shut down the ExecutorService and wait for all tasks to complete
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}

class SpeechSynthesizerObjectFactory
        extends BasePooledObjectFactory<SpeechSynthesizer> {
    public SpeechSynthesizerObjectFactory() {
        super();
    }
    @Override
    public SpeechSynthesizer create() throws Exception {
        return new SpeechSynthesizer();
    }

    @Override
    public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
        return new DefaultPooledObject<>(obj);
    }
}

class SambertObjectPool {
    public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
    public static String SAMBERT_OBJECTPOOL_SIZE_ENV = "SAMBERT_OBJECTPOOL_SIZE";
    public static int DEFAULT_CONNECTION_POOL_SIZE = 500;
    private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
    public static int getObjectivePoolSize() {
        try {
            Integer n = Integer.parseInt(System.getenv(SAMBERT_OBJECTPOOL_SIZE_ENV));
            return n;
        } catch (NumberFormatException e) {
            return DEFAULT_CONNECTION_POOL_SIZE;
        }
    }
    public static GenericObjectPool<SpeechSynthesizer> getInstance() {
        lock.lock();
        if (synthesizerPool == null) {
            // You can set the object pool size here. or in environment variable
            // SAMBERT_OBJECTPOOL_SIZE It is recommended to set it to 1.5 to 2 times
            // your server's maximum concurrent connections.
            int objectPoolSize = getObjectivePoolSize();
            SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                    new SpeechSynthesizerObjectFactory();
            GenericObjectPoolConfig<SpeechSynthesizer> config =
                    new GenericObjectPoolConfig<>();
            config.setMaxTotal(objectPoolSize);
            config.setMaxIdle(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            synthesizerPool =
                    new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
        }
        lock.unlock();
        return synthesizerPool;
    }
}

class SynthesizeTask implements Runnable {
    String[] textList;
    String requestId;
    long timeCost;
    public SynthesizeTask(String[] textList) {
        this.textList = textList;
    }
    @Override
    public void run() {
        // sleep random time before start task, avoid creating too much websocket at the same time.
        Random random = new Random();
        try {
            Thread.sleep(random.nextInt(30*1000));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        for (String text:textList) {
            SpeechSynthesizer synthesizer = null;
            long startTime = System.currentTimeMillis();

            try {
                CountDownLatch latch = new CountDownLatch(1);
                class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
                    ReactCallback() {}

                    @Override
                    public void onEvent(SpeechSynthesisResult message) {
                        if (message.getAudioFrame() != null) {
                            try {
                                byte[] bytesArray = message.getAudioFrame().array();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }

                    @Override
                    public void onComplete() {
                        latch.countDown();
                    }

                    @Override
                    public void onError(Exception e) {
                        System.out.println(e.getMessage());
                        e.printStackTrace();
                        latch.countDown();
                    }
                }

                // you can set your dashscope apikey here by code or in environment
                // variable DASHSCOPE_API_KEY
                String dashScopeApiKey = System.getenv("DASHSCOPE_API_KEY");

                SpeechSynthesisParam param =
                        SpeechSynthesisParam.builder()
                                .model("sambert-zhichu-v1")
                                .format(SpeechSynthesisAudioFormat.MP3) // 使用PCM或者MP3
                                .text(text)
                                .enablePhonemeTimestamp(true)
                                .enableWordTimestamp(true)
                                .apiKey(dashScopeApiKey)
                                .build();

                try {
                    synthesizer = SambertObjectPool.getInstance().borrowObject();
                    synthesizer.call(param, new ReactCallback());
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    requestId = synthesizer.getLastRequestId();
                } catch (Exception e) {
                    System.out.println("Exception e: " + e.toString());
                    synthesizer.getSyncApi().close(1000, "bye");
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (synthesizer != null) {
                    try {
                        // Return the SpeechSynthesizer object to the pool
                        SambertObjectPool.getInstance().returnObject(synthesizer);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            long endTime = System.currentTimeMillis();
            timeCost = endTime - startTime;
            System.out.println("[线程" + Thread.currentThread() + "] 语音合成任务:(" + text + ")结束。耗时" + timeCost + "ms, RequestId" + requestId);
        }
    }
}

异常处理

  • 在服务出现 TaskFailed 报错时,不需要额外处理。
  • 如果在语音合成中途,客户端出现错误(如 SDK 内部异常或业务逻辑异常)导致语音合成任务未完成,则需要您主动关闭连接。
关闭连接方法如下:
// 将下面这段代码放在try-catch块中
synthesizer.getSyncApi().close(1000, "bye");

常见异常

出错原因类型一每一个 SDK 对象创建时都会申请一个连接。如果没有使用对象池,每一次任务结束后对象都被析构。此时这一个连接将进入无引用状态,需要等待 61s 后服务端报错连接超时才会真正断开,这会导致这个连接在 61 秒内不可复用。在高并发场景下,新的任务在发现没有可复用连接时会创建新连接,会造成如下后果:
  1. 连接数持续上升。
  2. 由于连接数过多,服务器资源不足,服务器卡顿。
  3. 连接池被打满、新任务由于启动时需要等待可用连接而阻塞。
类型二对象池配置的 MaxIdle 小于 MaxTotal,导致在对象闲置时,超过 MaxIdle 的对象被销毁,从而造成连接泄漏。泄漏的连接需要等待 61 秒超时后断连,同类型一造成连接数持续上升。解决方法对于类型一,使用对象池解决。对于类型二,检查对象池配置参数,设置 MaxIdle 和 MaxTotal 相等,关闭对象池自动销毁策略解决。
同"异常 1",连接池已经达到最大连接限制,新的任务需要等待无引用状态的连接 61 秒触发超时后才可以获得连接。
出错原因在高并发调用时,同一个对象会复用同一个 WebSocket 连接,因此 WebSocket 连接只会在服务启动时创建。需要注意的是,任务启动阶段如果立刻开始较高并发调用,同时创建过多的 WebSocket 连接会导致阻塞。解决方法启动服务后逐步提升并发量,或增加预热任务。
出错原因这是由于出现了客户端报错后,服务端不知道客户端出错,连接处于任务中状态。此时连接和对象被复用并开启下一个任务,导致流程错误,下一个任务失败。解决方法在抛出异常后主动关闭 WebSocket 连接后归还对象池。
出错原因同时创建过多 WebSocket 连接导致阻塞,但业务流量持续打进来,导致任务短时间积压,并且在阻塞后所有积压任务立刻调用。这会造成调用量尖刺,并且有可能造成瞬时超过账号的并发数限制导致部分任务失败、服务器卡顿等。这种瞬间创建过多 WebSocket 的情况多发生于:
  • 服务启动阶段
  • 网络出现异常,大量 WebSocket 连接同时中断重连
  • 某一时刻出现大量服务端报错,导致大量 WebSocket 重连。常见报错如并发数超过账号限制("Requests rate limit exceeded, please try again later.")。
解决方法
  1. 检查网络情况。
  2. 排查尖刺前是否出现大量其他服务端报错。
  3. 提高账号并发限制。
  4. 调小对象池和连接池大小,通过对象池上限限制最大并发数。
  5. 提升服务器配置或扩充机器数。
解决方法
  1. 检查是否已经达到网络带宽上限。
  2. 检查实际并发数是否已经过高。