import os
from http import HTTPStatus
from urllib.parse import urlparse, unquote
from pathlib import PurePosixPath
import dashscope
import requests
from dashscope import ImageSynthesis
dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
# 如果未设置环境变量,请将下行替换为:api_key="sk-xxx"
api_key = os.getenv("DASHSCOPE_API_KEY")
# 公网图片URL
image_url_1 = "https://img.alicdn.com/imgextra/i3/O1CN0157XGE51l6iL9441yX_!!6000000004770-49-tps-1104-1472.webp"
image_url_2 = "https://img.alicdn.com/imgextra/i3/O1CN01SfG4J41UYn9WNt4X1_!!6000000002530-49-tps-1696-960.webp"
def async_call():
print('----创建任务----')
task_info = create_async_task()
print('----等待任务----')
wait_async_task(task_info)
# 创建异步任务
def create_async_task():
rsp = ImageSynthesis.async_call(api_key=api_key,
model="wan2.5-i2i-preview",
prompt="将图1中的闹钟放置到图2的餐桌的花瓶旁边位置",
images=[image_url_1, image_url_2],
negative_prompt="",
n=1,
# size="1280*1280",
prompt_extend=True,
watermark=False,
seed=12345)
print(rsp)
if rsp.status_code == HTTPStatus.OK:
print(rsp.output)
else:
print('失败, status_code: %s, code: %s, message: %s' %
(rsp.status_code, rsp.code, rsp.message))
return rsp
# 等待异步任务完成
def wait_async_task(task):
rsp = ImageSynthesis.wait(task=task, api_key=api_key)
print(rsp)
if rsp.status_code == HTTPStatus.OK:
print(rsp.output)
# 保存文件到当前目录
for result in rsp.output.results:
file_name = PurePosixPath(unquote(urlparse(result.url).path)).parts[-1]
with open('./%s' % file_name, 'wb+') as f:
f.write(requests.get(result.url).content)
else:
print('失败, status_code: %s, code: %s, message: %s' %
(rsp.status_code, rsp.code, rsp.message))
# 查询异步任务状态
def fetch_task_status(task):
status = ImageSynthesis.fetch(task=task, api_key=api_key)
print(status)
if status.status_code == HTTPStatus.OK:
print(status.output.task_status)
else:
print('失败, status_code: %s, code: %s, message: %s' %
(status.status_code, status.code, status.message))
# 取消异步任务,仅PENDING状态的任务可以取消
def cancel_task(task):
rsp = ImageSynthesis.cancel(task=task, api_key=api_key)
print(rsp)
if rsp.status_code == HTTPStatus.OK:
print(rsp.output.task_status)
else:
print('失败, status_code: %s, code: %s, message: %s' %
(rsp.status_code, rsp.code, rsp.message))
if __name__ == '__main__':
async_call()