速率限制的工作原理
速率限制控制您的账户每分钟可以对每个模型消耗的 API 请求数和 Token 数。限制分为两种:
- RPM(Requests Per Minute):每分钟最大 API 调用次数。
- TPM(Tokens Per Minute):每分钟最大处理 Token 数。
速率限制在账户级别生效,同一账户下的所有业务空间和 API Key 共享配额。
速率限制同时按秒生效:RPS = RPM / 60,TPS = TPM / 60。即使每分钟的总用量未超限,单秒内的突发请求也可能触发限流。
查看速率限制
前往用量分析页面,查看账户下每个模型的速率限制和实时用量。
用量分析页面显示以下信息:
- 汇总指标:所选时间范围内的总调用次数、失败次数、平均首包耗时和平均延迟。
- 按模型明细:展示每个模型的平均 TPM、平均 RPM、总请求数、成功请求数、平均成功率、平均首包耗时和平均延迟。
使用时间范围选择器(如 3 小时、24 小时)调整监控窗口。
按业务空间设置速率限制
您可以在业务空间中为单个模型设置自定义的 RPM 和 TPM 限制。
进入业务空间页面
前往设置 > 业务空间,点击子业务空间的编辑按钮。 添加模型并设置限制
在模型权限下,点击全部模型添加模型。为每个模型设置 Times / min(RPM)和 Token / min(TPM),然后点击应用。
为业务空间设置的 RPM 和 TPM 不能超过该模型的账户级别限制。默认业务空间使用账户级别限制,无法修改。
临时提升频率限制
如果某个模型需要更高的吞吐量,可以通过账户设置申请临时提升。
申请提升
点击临时提升频率限制,选择模型,输入目标 Token Rate Limit(每 60 秒的 Token 数)。对话框会显示当前配额和上限。
请根据实际需求申请配额。长期未使用的配额可能会被缩减至默认限制。
限流提额页面还展示所有临时提升申请的历史记录,包括每次申请的提交时间、模型代码和账号 TPM 上限。
速率限制错误
触发速率限制时,API 返回 HTTP 状态码 429,错误信息会指出触发了哪种限制:
| 错误信息 | 原因 |
|---|
Requests rate limit exceeded 或 You exceeded your current requests list | 达到 RPM 限制 |
Allocated quota exceeded 或 You exceeded your current quota | 达到 TPM 限制 |
Request rate increased too quickly | 请求量突增触发了稳定性保护,即使 RPM/TPM 未超限 |
限制在一分钟内重置。 其他错误请参阅错误信息。
最佳实践
平滑请求速率
将请求均匀分散在时间维度上,避免突发集中发送。使用恒定速率调度、指数退避或请求队列来避免触发每秒限制。
使用备用模型
请求被限流时,可回退到备用模型以保持服务可用性:
import os
import asyncio
from openai import AsyncOpenAI, APIStatusError
API_KEY = os.getenv("DASHSCOPE_API_KEY")
MODEL = "qwen-plus-2025-07-28"
BACKUP_MODEL = "qwen-plus-2025-07-14"
QUESTION = "Who are you?"
NUM_REQUESTS = 10
client = AsyncOpenAI(
api_key=API_KEY,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
async def send_request(model):
try:
await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": QUESTION}]
)
return True
except APIStatusError as e:
if e.status_code == 429:
print(f"[触发速率限制] 模型 {model}")
return False
raise
except Exception as e:
print(f"[请求失败] 模型 {model},错误:{e}")
return False
async def task(i):
if await send_request(MODEL):
return True
return await send_request(BACKUP_MODEL)
async def main():
results = await asyncio.gather(*(task(i) for i in range(NUM_REQUESTS)))
print(f"成功:{sum(results)},失败:{len(results) - sum(results)}")
if __name__ == "__main__":
asyncio.run(main())
拆分大任务
长对话或大文档会快速消耗大量 Token。将大批量任务拆分为小批次,分时提交,以控制在 TPM 限制内。
选择高配额模型
稳定版或最新版模型通常比旧版快照有更高的速率限制。建议尽量使用模型的最新版本。
使用批量推理
如果不需要实时结果,可以使用 Batch API。批量任务不受实时速率限制,但可能存在排队和处理延迟。