curl --location 'https://dashscope.aliyuncs.com/api/v1/services/aigc/video-generation/video-synthesis' \--header 'X-DashScope-Async: enable' \--header "Authorization: Bearer $DASHSCOPE_API_KEY" \--header 'Content-Type: application/json' \--data '{ "model": "wan2.1-vace-plus", "input": { "function": "image_reference", "prompt": "In the video, a girl gracefully walks out from a misty, ancient forest. Her steps are light, and the camera captures her every nimble moment. When she stops and looks around at the lush woods, a smile of surprise and joy blossoms on her face. This scene, frozen in a moment of interplay between light and shadow, records her wonderful encounter with nature.", "ref_images_url": [ "http://wanx.alicdn.com/material/20250318/image_reference_2_5_16.png", "http://wanx.alicdn.com/material/20250318/image_reference_1_5_16.png" ] }, "parameters": { "prompt_extend": true, "obj_or_bg": ["obj","bg"], "size": "1280*720" }}'
第 2 步:通过任务 ID 获取结果将 {task_id} 替换为上一步 API 返回的 task_id 值。
复制
curl -X GET https://dashscope.aliyuncs.com/api/v1/tasks/{task_id} \--header "Authorization: Bearer $DASHSCOPE_API_KEY"
复制
import osimport requestsimport timeBASE_URL = "https://dashscope.aliyuncs.com/api/v1"API_KEY = os.environ.get("DASHSCOPE_API_KEY")def create_task(): """创建视频合成任务并返回 task_id""" try: resp = requests.post( f"{BASE_URL}/services/aigc/video-generation/video-synthesis", headers={ "X-DashScope-Async": "enable", "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" }, json={ "model": "wan2.1-vace-plus", "input": { "function": "image_reference", "prompt": "In the video, a girl walks out from the depths of an ancient, misty forest. Her steps are light, and the camera captures her every graceful moment. When she stops and looks around at the lush trees, a smile of surprise and joy blossoms on her face. This scene, frozen in a moment of intertwined light and shadow, records her wonderful encounter with nature.", "ref_images_url": [ "http://wanx.alicdn.com/material/20250318/image_reference_2_5_16.png", "http://wanx.alicdn.com/material/20250318/image_reference_1_5_16.png" ] }, "parameters": {"prompt_extend": True, "obj_or_bg": ["obj", "bg"], "size": "1280*720"} }, timeout=30 ) resp.raise_for_status() return resp.json()["output"]["task_id"] except requests.RequestException as e: raise RuntimeError(f"创建任务失败: {e}")def poll_result(task_id): while True: try: resp = requests.get( f"{BASE_URL}/tasks/{task_id}", headers={"Authorization": f"Bearer {API_KEY}"}, timeout=10 ) resp.raise_for_status() data = resp.json()["output"] status = data["task_status"] print(f"状态: {status}") if status == "SUCCEEDED": return data["video_url"] elif status in ("FAILED", "CANCELLED"): raise RuntimeError(f"任务失败: {data.get('message', '未知错误')}") time.sleep(15) except requests.RequestException as e: print(f"轮询异常: {e},15 秒后重试...") time.sleep(15)实时