跳转到主要内容

接收异步任务完成通知

通过 EventBridge 接收异步任务完成后的推送通知,无需轮询任务状态接口。

背景

提交异步任务(如语音识别、视频生成)后,有两种方式获取任务完成状态:
对比项轮询方式通知方式
限流影响重复请求受 API 限流限制无需额外调用 API
集成复杂度简单,仅需调用任务查询接口需配置 EventBridge
服务器资源消耗高,持续轮询消耗资源低,服务器仅需处理推送事件
实时性取决于轮询间隔接近实时
适用场景适合简单集成或低频任务推荐用于生产环境
可选方案:
  • 方案一:配置 HTTP 回调地址 — 任务完成后,EventBridge 向你的 HTTP 地址发送回调请求。
  • 方案二:配置 RocketMQ — 任务完成后,EventBridge 将事件投递到 RocketMQ Topic,由消费者自行处理。

方案一:配置 HTTP 回调地址

流程概览

步骤说明
流程1. DashScope 异步任务完成 → 2. DashScope 将事件发布到 EventBridge → 3. EventBridge 匹配事件转发规则 → 4. EventBridge 向你的 HTTP 回调地址发送 POST 请求 → 5. 你的服务器接收并处理事件
HTTP callback flow diagram
EventBridge 事件投递会产生费用,详见事件总线计费。

操作步骤

HTTP 回调地址需满足以下条件:
  • 公网可访问的 URL(HTTP 或 HTTPS)
  • 支持接收 POST 请求
  • 支持接收 CloudEvents 格式的 JSON 请求体
投递到你的地址的事件体结构如下:
{
  "datacontenttype": "application/json;charset=utf-8",
  "aliyunaccountid": "xxxxx",
  "aliyunpublishtime": "2023-10-25T01:45:16.993Z",
  "data": {
    "start_time": "2023-10-25 09:45:09",
    "user_api_unique_key": "apikey:v1:audio:asr:transcription:paraformer-8k-v1",
    "task_status": "SUCCEEDED",
    "contain_result": false,
    "end_time": "2023-10-25 09:45:16",
    "task_id": "a154c328-xxxx-xxxx-xxxx-e52a9a7e9a35",
    "region": "cn-beijing",
    "request_id": "108f38f5-xxxx-xxxx-xxxx-6504db9080b3",
    "api_key_id": "1250"
  },
  "aliyunoriginalaccountid": "xxxxxxxx",
  "specversion": "1.0",
  "aliyuneventbusname": "default",
  "id": "81765e5b-xxxx-xxxx-xxxx-bbad8dde2bd9",
  "source": "acs.dashscope",
  "time": "2023-1-25T01:45:16.969Z",
  "aliyunregionid": "cn-beijing",
  "type": "dashscope:System:AsyncTaskFinish"
}
参数类型说明示例值
datacontenttypeString数据内容类型,仅支持 application/jsonapplication/json;charset=utf-8
aliyunaccountidString阿里云账号 ID123456789098****
aliyunpublishtimeString事件被 EventBridge 接收的时间2020-11-19T21:04:42.179PRC
dataObject事件内容,即 CloudEvents 上下文对象
data[].start_timeString异步任务开始时间(格式:yyyy-MM-dd HH:mm:ss)2023-10-25 09:45:09
data[].end_timeString异步任务完成时间(格式:yyyy-MM-dd HH:mm:ss)2023-10-25 09:45:16
data[].user_api_unique_keyStringAPI 唯一标识(格式:apikey:version:group:task:function-call:model)apikey:v1:audio:asr:transcription:paraformer-8k-v1
data[].task_statusString任务状态:PENDING / RUNNING / SUCCEEDED / FAILED / CANCELED / UNKNOWNSUCCEEDED
data[].task_idString任务 IDa154c328-xxxx-xxxx-xxxx-e52a9a7e9a35
data[].regionString任务运行区域cn-beijing
data[].request_idString请求 ID108f38f5-xxxx-xxxx-xxxx-6504db9080b3
data[].api_key_idStringAPI Key ID1234
aliyunoriginalaccountidString阿里云原始账号 ID123456789098****
specversionStringCloudEvents 协议版本1.0
aliyuneventbusnameString接收该事件的事件总线名称default
idString事件唯一 ID45ef4dewdwe1-7c35-447a-bd93-fab****
sourceString事件来源,与 id 组合构成唯一标识acs.dashscope
timeString事件生成时间2020-11-19T21:04:41+08:00
aliyunregionidString接收事件的区域cn-beijing
typeString事件类型,用于路由、过滤和策略执行dashscope:System:AsyncTaskFinish
  1. 登录 EventBridge 控制台,选择 cn-beijing 地域。
    EventBridge console — event bus list
    DashScope 异步任务事件会发布到 cn-beijing 地域的 default 事件总线。
  2. 单击 事件追踪,可按时间范围或条件筛选搜索事件。
    Event tracking search
  3. 单击搜索结果中的 事件详情,可查看完整事件内容。
  1. 在 EventBridge 控制台,单击 创建规则
    Create rule
  2. 填写规则的基本信息(名称、描述、事件总线)。
    Rule basic info configuration
  3. 配置事件模式。如需接收所有 DashScope 异步任务完成事件,使用以下模式:
{
  "source": ["acs.dashscope"],
  "type": ["dashscope:System:AsyncTaskFinish"]
}
如需过滤到特定模型(例如 paraformer-8k-v1),可添加 data 过滤条件:
{
  "source": ["acs.dashscope"],
  "type": ["dashscope:System:AsyncTaskFinish"],
  "data": {
    "user_api_unique_key": [
      {"suffix": ":paraformer-8k-v1"}
    ]
  }
}
Event pattern configuration
  1. 配置事件目标(见步骤四)。
在事件目标配置中,设置以下字段:
  • 服务类型:HTTP
  • URL:你的公网可访问 HTTP 或 HTTPS 回调地址
  • Body:完整事件 — 投递完整的 CloudEvents 内容
  • 网络类型:外部地址选择公网,内部 VPC 地址选择阿里云专有网络VPC
HTTP target configuration
HTTP target confirmation

方案二:配置 RocketMQ

流程概览

步骤说明
流程1. DashScope 异步任务完成 → 2. DashScope 将事件发布到 EventBridge → 3. EventBridge 匹配事件转发规则 → 4. EventBridge 将事件消息投递到 RocketMQ Topic → 5. 你的消费者读取并处理消息
RocketMQ flow diagram
该方案会同时产生 EventBridge 和云消息队列 RocketMQ 的费用,详见事件总线计费和 RocketMQ计费。

操作步骤

  1. 登录 RocketMQ 控制台,创建 RocketMQ 5.x 实例。
    Create RocketMQ instance
  2. 在实例中创建 Topic,用于接收 EventBridge 消息。
    Create RocketMQ Topic
  3. 创建 Consumer Group,应用程序将通过该分组订阅 Topic。
    Create RocketMQ Consumer Group
  1. 登录 EventBridge 控制台,选择 cn-beijing 地域。
    EventBridge console — event bus list
    DashScope 异步任务事件会发布到 cn-beijing 地域的 default 事件总线。
  2. 单击 事件追踪,可按时间范围或条件筛选搜索事件。
    Event tracking search
  3. 单击搜索结果中的 事件详情,可查看完整事件内容。
  1. 在 EventBridge 控制台,单击 创建规则
    Create rule
  2. 填写规则的基本信息(名称、描述、事件总线)。
    Rule basic info configuration
  3. 配置事件模式。如需接收所有 DashScope 异步任务完成事件,使用以下模式:
{
  "source": ["acs.dashscope"],
  "type": ["dashscope:System:AsyncTaskFinish"]
}
如需过滤到特定模型(例如 paraformer-8k-v1),可添加 data 过滤条件:
{
  "source": ["acs.dashscope"],
  "type": ["dashscope:System:AsyncTaskFinish"],
  "data": {
    "user_api_unique_key": [
      {"suffix": ":paraformer-8k-v1"}
    ]
  }
}
Event pattern configuration
  1. 配置事件目标(见步骤四)。
在事件目标配置中,设置以下字段:
  • 服务类型:消息队列 RocketMQ 版
  • 版本:5.x
  • 实例ID:已创建的 RocketMQ 实例 ID
  • Topic:已创建的 Topic 名称
RocketMQ event target configuration
任务完成并投递事件后,可在 RocketMQ 控制台查看消息。
RocketMQ console — message list
RocketMQ console — message details
RocketMQ console — message trace
RocketMQ 控制台的消息一键收发体验功能依赖函数计算,会产生额外费用,详见函数计算计费规则。
在项目中添加 RocketMQ 5.x 客户端依赖:
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client-java</artifactId>
  <version>5.0.4</version>
</dependency>
使用 PushConsumer 订阅并处理消息:
import com.alibaba.fastjson2.JSON;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;

public class ConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerExample.class);

    private ConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /*
          实例接入点,从控制台实例详情页的接入点页签中获取。
          如果是在阿里云ECS内网访问,建议填写VPC接入点。
          如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
         */
        String endpoints = "xxxx";
        // 指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
        String topic = "xxxx";
        // 为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
        String consumerGroup = "xxxx";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /*
          如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
          如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
         */
        builder.setCredentialProvider(new StaticSessionCredentialsProvider("xxxx", "xxxx"));
        ClientConfiguration clientConfiguration = builder.build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 初始化SimpleConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // 设置消费者分组。
            .setConsumerGroup(consumerGroup)
            // 设置预绑定的订阅关系。
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // 设置消费监听器。
            .setMessageListener(messageView -> {
                try {
                    // 处理消息并返回消费结果。
                    ByteBuffer buffer = messageView.getBody();
                    ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity());
                    for (int i = 0; i < buffer.capacity(); i++) {
                        newBuffer.put(buffer.get(i));
                    }
                    String result = new String(newBuffer.array());
                    LOGGER.info("Consume message={}", JSON.toJSONString(result));
                    System.out.println(result);
                    return ConsumeResult.SUCCESS;
                } catch (Exception e) {
                    LOGGER.error("deal message has error", e);
                    return ConsumeResult.FAILURE;
                }
            })
            .build();
        Thread.sleep(Long.MAX_VALUE);

        // 如果不需要再使用PushConsumer,可关闭该进程。
        pushConsumer.close();
    }
}

常见问题

可以。一个事件转发规则支持配置多个事件目标,事件匹配规则后会同时投递到所有目标。
最常见的原因是地域不一致。DashScope 会将异步任务事件发布到 cn-beijing 地域的 EventBridge。如果你的事件规则创建在其他地域,将无法收到 DashScope 事件。请确认你的规则配置在 cn-beijing 地域。
EventBridge regions diagram
请检查以下内容:
  1. 确认地址 URL 公网可访问,且在超时时间范围内返回 2xx 状态码。
  2. 确认服务器支持接收 POST 请求和 JSON 格式的请求体。
  3. 确认没有防火墙或安全组规则拦截来自 EventBridge IP 段的入站请求。
  4. 检查服务器日志,确认处理事件载荷时没有出现应用层错误。
接收异步任务完成通知 - 千问云