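GUI-Plus parses user intent from a screenshot and a natural-language instruction, and converts it into standardized graphical user interface (GUI) operations (such as click, type, and scroll) for an external system to decide on or execute. Compared with the Qwen-VL series models, it produces more accurate GUI operations.

The GUI-Plus model service is deployed only in the Chinese mainland.

Supported models

| Model | Mode | Context length (tokens) | Max input (tokens) | Max chain-of-thought length (tokens) | Max response length (tokens) | Input cost (per million tokens) | Output cost (per million tokens) | Free quota |
|---|---|---|---|---|---|---|---|---|
| gui-plus | Non-thinking | 256,000 | 254,976 (up to 16,384 per image) | - | 32,768 | CNY 1.5 | CNY 4.5 | 1 million tokens each (valid for 90 days after activating 千问云 (Qwen Cloud)) |
| gui-plus-2026-02-26 | Thinking | 262,144 | 258,048 (up to 16,384 per image) | 81,920 | 32,768 | CNY 1.5 | CNY 4.5 | 1 million tokens each (valid for 90 days after activating 千问云 (Qwen Cloud)) |
| gui-plus-2026-02-26 | Non-thinking | 262,144 | 260,096 (up to 16,384 per image) | - | 32,768 | CNY 1.5 | CNY 4.5 | 1 million tokens each (valid for 90 days after activating 千问云 (Qwen Cloud)) |

gui-plus-2026-02-26 is a full capability upgrade that supports both thinking and non-thinking modes. Compared with gui-plus, it performs substantially better on cross-platform and multi-app tasks. We recommend using this model first.

Quick start

This section shows how to make a GUI-Plus model call and obtain the instruction for a GUI task. For how to turn that instruction into an actual GUI operation and execute it, see the How to use section later in this document. To try the model quickly, use the online trial.

Prerequisites

- An API key has been created and configured as an environment variable.
- If you call the model through an SDK, install the latest SDK version.

Recommended system prompt

The system prompt defines the model's role, capabilities, and output conventions. Use the following system prompt with gui-plus-2026-02-26; a different prompt degrades the model output.

The system prompts for gui-plus and gui-plus-2026-02-26 are not interchangeable. For the gui-plus system prompt, see the recommended prompts for the GUI-Plus model.

Desktop system prompt

"""# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. 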
Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": "array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) 
coordinates to move the mouse to. Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. - If finishing, use action=terminate in the tool call."""

Quick call example

OpenAI Python

import os
from openai import OpenAI

system_prompt = """# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. 
if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": 
"array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. 
- If finishing, use action=terminate in the tool call."""

messages = [
    {
        "role": "system",
        "content": system_prompt
    },
    {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": "https://img.alicdn.com/imgextra/i2/O1CN016iJ8ob1C3xP1s2M6z_!!6000000000026-2-tps-3008-1758.png"}},
            # User instruction: "Open the browser for me"
            {"type": "text", "text": "帮我打开浏览器"}
        ]
    }
]

client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

completion = client.chat.completions.create(
    model="gui-plus-2026-02-26",
    messages=messages,
    extra_body={"vl_high_resolution_images": True}
)
print(completion.choices[0].message.content)

OpenAI-compatible response

{
  "choices": [
    {
      "message": {
        "content": "<tool_call>\n{\"name\": \"computer_use\", \"arguments\": {\"action\": \"left_click\", \"coordinate\": [2530, 314]}}\n</tool_call>",
        "role": "assistant"
      },
      "finish_reason": "stop",
      "index": 0,
      "logprobs": null
    }
  ],
  "object": "chat.completion",
  "usage": {
    "prompt_tokens": 7750,
    "completion_tokens": 36,
    "total_tokens": 7786,
    "prompt_tokens_details": {
      "image_tokens": 6743,
      "text_tokens": 1007
    },
    "completion_tokens_details": {
      "text_tokens": 36
    }
  },
  "created": 1773133741,
  "system_fingerprint": null,
  "model": "gui-plus",
  "id": "chatcmpl-8b375016-abb8-9791-856c-74b2825c22d5"
}

DashScope Python

import os
import dashscope

system_prompt = """# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. 
if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": 
"array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. 
- If finishing, use action=terminate in the tool call."""

messages = [
    {
        "role": "system",
        "content": system_prompt
    },
    {
        "role": "user",
        "content": [
            {"image": "https://img.alicdn.com/imgextra/i2/O1CN016iJ8ob1C3xP1s2M6z_!!6000000000026-2-tps-3008-1758.png"},
            # User instruction: "Open the browser for me."
            {"text": "帮我打开浏览器。"}]
    }]

response = dashscope.MultiModalConversation.call(
    api_key=os.getenv('DASHSCOPE_API_KEY'),
    model='gui-plus-2026-02-26',
    messages=messages,
    vl_high_resolution_images=True
)
print(response.output.choices[0].message.content[0]["text"])

DashScope response

{
  "output": {
    "choices": [
      {
        "finish_reason":

CosyVoice pronunciation control (SSML)

Speech rate, pitch, pauses, and volume.

SSML (Speech Synthesis Markup Language) is an XML-based markup language used in CosyVoice to control speech rate, pitch, pauses, volume, and background music.

Limitations

- Models: cosyvoice-v3-flash, cosyvoice-v3-plus.
- Voices: cloned voices, and system voices marked as supporting SSML in the voice list.
- APIs:
  - Java SDK (2.20.3+): supports only non-streaming and unidirectional streaming calls. See the Java SDK documentation.
  - Python SDK (1.23.4+): supports only non-streaming and unidirectional streaming calls. See the Python SDK documentation.
  - WebSocket API: set enable_ssml to true in run-task, and send continue-task only once. See the WebSocket API documentation.

Quick start

For prerequisites and a tutorial, see Speech synthesis - CosyVoice. Before using SSML, confirm the supported models, voices, and APIs listed under Limitations.

Java SDK (non-streaming call)

import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.utils.Constants;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// See the SSML support requirements above
public class Main {
    private static String model = "cosyvoice-v3-flash";
    private static String voice = "longanyang";

    public static void main(String[] args) {
        Constants.baseWebsocketApiUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference";
        streamAudioDataToSpeaker();
        System.exit(0);
    }

    public static void streamAudioDataToSpeaker() {
        SpeechSynthesisParam param =
                SpeechSynthesisParam.builder()
                        // If the environment variable is not set, replace the next line with: .apiKey("sk-xxx")
                        .apiKey(System.getenv("DASHSCOPE_API_KEY"))
                        .model(model)
                        .voice(voice)
                        .build();
        SpeechSynthesizer synthesizer = new SpeechSynthesizer(param, null);
        ByteBuffer audio = null;
        try {
            // Non-streaming call; blocks until the audio is returned
            // Escape special characters
            audio = synthesizer.call("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>");
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Close the WebSocket connection once the task has finished
            synthesizer.getDuplexApi().close(1000, "bye");
        }
        if (audio != null) {
            // Save the audio data to the local file "output.mp3"
            File file = new File("output.mp3");
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(audio.array());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        // First-packet latency includes the time needed to establish the WebSocket connection
        System.out.println(
                "[Metric] Request ID: "
                        + synthesizer.getLastRequestId()
                        + ", First packet latency (ms): "
                        + synthesizer.getFirstPackageDelay());
    }
}

Python SDK (non-streaming call)

# coding=utf-8
# See the SSML support requirements above
import dashscope
from dashscope.audio.tts_v2 import *
import os

# If the environment variable is not set, replace the next line with: dashscope.api_key = "sk-xxx"
dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')
dashscope.base_websocket_api_url='wss://dashscope.aliyuncs.com/api-ws/v1/inference'

# Model
model = "cosyvoice-v3-flash"
# Voice
voice = "longanyang"

# Instantiate SpeechSynthesizer, passing the model, voice, and other request parameters to the constructor
synthesizer = SpeechSynthesizer(model=model, voice=voice)
# Non-streaming call; blocks until the audio is returned
# Escape special characters
audio = synthesizer.call("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>")

# Save the audio locally
with open('output.mp3', 'wb') as f:
    f.write(audio)

# First-packet latency includes the time needed to establish the WebSocket connection
print('[Metric] Request ID: {}, First packet latency: {} ms'.format(
    synthesizer.get_last_request_id(),
    synthesizer.get_first_package_delay()))

WebSocket API

Go

// See the SSML support requirements above
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

const (
	wsURL      = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"
	outputFile = "output.mp3"
)

func main() {
	// If the environment variable is not set, replace the next line with: apiKey := "sk-xxx"
apiKey := os.Getenv("DASHSCOPE_API_KEY") // 清空输出文件 os.Remove(outputFile) os.Create(outputFile) // 连接 WebSocket header := make(http.Header) header.Add("X-DashScope-DataInspection", "enable") header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey)) conn, resp, err := websocket.DefaultDialer.Dial(wsURL, header) if err != nil { if resp != nil { fmt.Printf("Connection failed. HTTP status code: %d\n", resp.StatusCode) } fmt.Println("Connection failed:", err) return } defer conn.Close() // 生成任务 ID taskID := uuid.New().String() fmt.Printf("Generated task ID: %s\n", taskID) // 发送 run-task 命令 runTaskCmd := map[string]interface{}{ "header": map[string]interface{}{ "action": "run-task", "task_id": taskID, "streaming": "duplex", }, "payload": map[string]interface{}{ "task_group": "audio", "task": "tts", "function": "SpeechSynthesizer", "model": "cosyvoice-v3-flash", "parameters": map[string]interface{}{ "text_type": "PlainText", "voice": "longanyang", "format": "mp3", "sample_rate": 22050, "volume": 50, "rate": 1, "pitch": 1, // 启用 enable_ssml: true 后,仅发送一次 continue-task "enable_ssml": true, }, "input": map[string]interface{}{}, }, } runTaskJSON, _ := json.Marshal(runTaskCmd) fmt.Printf("Sending run-task command: %s\n", string(runTaskJSON)) err = conn.WriteMessage(websocket.TextMessage, runTaskJSON) if err != nil { fmt.Println("Failed to send run-task:", err) return } textSent := false // 处理消息 for { messageType, message, err := conn.ReadMessage() if err != nil { fmt.Println("Failed to read message:", err) break } // 处理二进制消息 if messageType == websocket.BinaryMessage { fmt.Printf("Received binary message, length: %d\n", len(message)) file, _ := os.OpenFile(outputFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) file.Write(message) file.Close() continue } // 处理文本消息 messageStr := string(message) fmt.Printf("Received text message: %s\n", strings.ReplaceAll(messageStr, "\n", "")) // 解析 JSON 获取事件类型 var msgMap map[string]interface{} if json.Unmarshal(message, &msgMap) == nil { if 
header, ok := msgMap["header"].(map[string]interface{}); ok { if event, ok := header["event"].(string); ok { fmt.Printf("Event type: %s\n", event) switch event { case "task-started": fmt.Println("=== Received task-started event ===") if !textSent { // 发送 continue-task 命令;使用 SSML 时仅可发送一次 continueTaskCmd := map[string]interface{}{ "header": map[string]interface{}{ "action": "continue-task", "task_id": taskID, "streaming": "duplex", }, "payload": map[string]interface{}{ "input": map[string]interface{}{ // 转义特殊字符 "text": "<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>", }, }, } continueTaskJSON, _ := json.Marshal(continueTaskCmd) fmt.Printf("Sending continue-task command: %s\n", string(continueTaskJSON)) err = conn.WriteMessage(websocket.TextMessage, continueTaskJSON) if err != nil { fmt.Println("Failed to send continue-task:", err) return } textSent = true // 延迟发送 finish-task time.Sleep(500 * time.Millisecond) // 发送 finish-task 命令 finishTaskCmd := map[string]interface{}{ "header": map[string]interface{}{ "action": "finish-task", "task_id": taskID, "streaming": "duplex", }, "payload": map[string]interface{}{ "input": map[string]interface{}{}, }, } finishTaskJSON, _ := json.Marshal(finishTaskCmd) fmt.Printf("Sending finish-task command: %s\n", string(finishTaskJSON)) err = conn.WriteMessage(websocket.TextMessage, finishTaskJSON) if err != nil { fmt.Println("Failed to send finish-task:", err) return } } case "task-finished": fmt.Println("=== Task finished ===") return case "task-failed": fmt.Println("=== Task failed ===") if header["error_message"] != nil { fmt.Printf("Error message: %s\n", header["error_message"]) } return case "result-generated": fmt.Println("Received result-generated event") } } } } } } 复制using System.Net.WebSockets; using System.Text; using System.Text.Json; // SSML 支持要求参见上方说明 class Program { // 如果未配置环境变量,请将下面一行替换为:private static readonly string ApiKey = "sk-xxx" private static readonly string ApiKey = 
Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set."); private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"; // 输出文件路径 private const string OutputFilePath = "output.mp3"; // WebSocket 客户端 private static ClientWebSocket _webSocket = new ClientWebSocket(); // 取消令牌源 private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); // 任务 ID private static string? _taskId; // 任务是否已启动 private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>(); static async Task Main(string[] args) { try { // 清空输出文件 ClearOutputFile(OutputFilePath); // 连接 WebSocket 服务 await ConnectToWebSocketAsync(WebSocketUrl); // 启动消息接收任务 Task receiveTask = ReceiveMessagesAsync(); // 发送 run-task 命令 _taskId = GenerateTaskId(); await SendRunTaskCommandAsync(_taskId); // 等待 task-started 事件 await _taskStartedTcs.Task; // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 // 特殊字符需要转义。 await SendContinueTaskCommandAsync("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>"); // 发送 finish-task 命令 await SendFinishTaskCommandAsync(_taskId); // 等待接收任务完成 await receiveTask; Console.WriteLine("Task completed, connection closed."); } catch (OperationCanceledException) { Console.WriteLine("The task was canceled."); } catch (Exception ex) { Console.WriteLine($"An error occurred: {ex.Message}"); } finally { _cancellationTokenSource.Cancel(); _webSocket.Dispose(); } } private static void ClearOutputFile(string filePath) { if (File.Exists(filePath)) { File.WriteAllText(filePath, string.Empty); Console.WriteLine("The output file has been cleared."); } else { Console.WriteLine("The output file does not exist and does not need to be cleared."); } } private static async Task ConnectToWebSocketAsync(string url) { var uri = new Uri(url); if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == 
WebSocketState.Open) { return; } // 设置 WebSocket 连接请求头 _webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}"); _webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable"); try { await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token); Console.WriteLine("Successfully connected to the WebSocket service."); } catch (OperationCanceledException) { Console.WriteLine("WebSocket connection was canceled."); } catch (Exception ex) { Console.WriteLine($"WebSocket connection failed: {ex.Message}"); throw; } } private static async Task SendRunTaskCommandAsync(string taskId) { var command = CreateCommand("run-task", taskId, "duplex", new { task_group = "audio", task = "tts", function = "SpeechSynthesizer", model = "cosyvoice-v3-flash", parameters = new { text_type = "PlainText", voice = "longanyang", format = "mp3", sample_rate = 22050, volume = 50, rate = 1, pitch = 1, // 启用 enable_ssml: true 后,仅发送一次 continue-task enable_ssml = true }, input = new { } }); await SendJsonMessageAsync(command); Console.WriteLine("Sent run-task command."); } private static async Task SendContinueTaskCommandAsync(string text) { if (_taskId == null) { throw new InvalidOperationException("Task ID is not initialized."); } var command = CreateCommand("continue-task", _taskId, "duplex", new { input = new { text } }); await SendJsonMessageAsync(command); Console.WriteLine("Sent continue-task command."); } private static async Task SendFinishTaskCommandAsync(string taskId) { var command = CreateCommand("finish-task", taskId, "duplex", new { input = new { } }); await SendJsonMessageAsync(command); Console.WriteLine("Sent finish-task command."); } private static async Task SendJsonMessageAsync(string message) { var buffer = Encoding.UTF8.GetBytes(message); try { await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token); } catch (OperationCanceledException) { Console.WriteLine("Message sending was 
canceled.");
        }
    }

    private static async Task ReceiveMessagesAsync() {
        while (_webSocket.State == WebSocketState.Open) {
            var response = await ReceiveMessageAsync();
            if (response != null) {
                var eventStr = response.RootElement.GetProperty("header").GetProperty("event").GetString();
                switch (eventStr) {
                    case "task-started":
                        Console.WriteLine("Task started.");
                        _taskStartedTcs.TrySetResult(true);
                        break;
                    case "task-finished":
                        Console.WriteLine("Task finished.");
                        _cancellationTokenSource.Cancel();
                        break;
                    case "task-failed":
                        Console.WriteLine("Task failed: " + response.RootElement.GetProperty("header").GetProperty("error_message").GetString());
                        _cancellationTokenSource.Cancel();
                        break;
                    default:
                        // result-generated events can be handled here
                        break;
                }
            }
        }
    }

    private static async Task<JsonDocument?> ReceiveMessageAsync() {
        var buffer = new byte[1024 * 4];
        var segment = new ArraySegment<byte>(buffer);
        try {
            WebSocketReceiveResult result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token);
            if (result.MessageType == WebSocketMessageType.Close) {
                await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token);
                return null;
            }
            if (result.MessageType == WebSocketMessageType.Binary) {
                // Handle binary data
                Console.WriteLine("Receiving binary data...");
                // Append the binary data to the output file
                using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) {
                    fileStream.Write(buffer, 0, result.Count);
                }
                return null;
            }
            string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
            return

        , "message": {
          "content": [
            {
              "text": "<tool_call>\n{\"name\": \"computer_use\", \"arguments\": {\"action\": \"left_click\", \"coordinate\": [2530, 314]}}\n</tool_call>"
            }
          ],
          "role": "assistant"
        }
      }
    ]
  },
  "usage": {
    "image_tokens": 6743,
    "input_tokens": 7750,
    "input_tokens_details": {
      "image_tokens": 6743,
      "text_tokens": 1007
    },
    "output_tokens": 36,
    "output_tokens_details": {
      "text_tokens": 36
    },
    "total_tokens": 7786
  },
  "request_id": "6821285d-e40f-4bca-903f-69f220e3c948"
}

How to use

Desktop GUI tasks
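Turning a model reply into an executable operation starts with extracting the JSON wrapped in `<tool_call>...</tool_call>`, as seen in the sample responses. The following is a minimal sketch of that step, not the documented implementation: the helper names `parse_tool_call` and `scale_point` are hypothetical, the sample reply is invented for illustration, and the mapping from a 1000x1000 grid to screen pixels is an assumption based on the resolution named in the system prompt. Check it against real model output first, since the sample response returns [2530, 314], which lies outside a 0-1000 range.

```python
import json
import re

# Matches the payload between <tool_call> and </tool_call>, across newlines.
TOOL_CALL_RE = re.compile(r"<tool_call>(.*?)</tool_call>", re.DOTALL)


def parse_tool_call(content):
    """Return the first tool call found in the model output as a dict."""
    match = TOOL_CALL_RE.search(content)
    if match is None:
        raise ValueError("no <tool_call> block found in model output")
    return json.loads(match.group(1).strip())


def scale_point(point, screen_width, screen_height, grid=1000):
    """Map a point on a grid x grid canvas to screen pixels (assumed convention)."""
    x, y = point
    return round(x * screen_width / grid), round(y * screen_height / grid)


if __name__ == "__main__":
    # Hypothetical reply in the "Action + <tool_call>" format the system prompt asks for.
    content = (
        "Action: Click the browser icon.\n"
        '<tool_call>\n{"name": "computer_use", "arguments": '
        '{"action": "left_click", "coordinate": [500, 250]}}\n</tool_call>'
    )
    call = parse_tool_call(content)
    print(call["arguments"]["action"])
    print(scale_point(call["arguments"]["coordinate"], 1920, 1080))
```

Keeping parsing separate from scaling means the scaling step can be dropped without touching the parser if the model turns out to return absolute pixel coordinates.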
This example targets Windows. On Mac or Linux, modify the system commands in the ComputerTools class.

Step 1. Construct the system prompt

system_prompt = """# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. 
The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": "array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. 
Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. - If finishing, use action=terminate in the tool call."""

The system prompt above sets the following requirements for the model:

- Screen resolution: assume 1000x1000 (a normalized coordinate system).
- Output format: strict. Output a description of the action (Action) first, then a single <tool_call>...</tool_call> block.
- Operation types: click, drag, type, scroll, key press, and so on.

Step 2.
构造多轮对话消息 在 GUI 自动化任务中,模型需要基于历史操作上下文做出决策。为了让模型理解当前任务进度并生成合理的下一步操作,模型采用以下策略构造多轮对话消息: 仅保留最近 N 轮(默认 4 轮)的完整对话(截图 + 模型输出),避免模型上下文过长导致的性能下降 对更早的历史操作,仅保留文本摘要(模型输出的动作(Action)部分),不包含截图,节省 token 消耗 复制def get_messages(image, instruction, history_output, model_name, system_prompt): """ 构造多轮对话消息 参数: image: 当前截图路径 instruction: 用户指令 history_output: 历史对话记录 [{"output": "...", "image": "..."}] model_name: 模型名称 """ history_n = 4 # 保留最近4轮历史 current_step = len(history_output) # 构造历史操作摘要 history_start_idx = max(0, current_step - history_n) previous_actions = [] for i in range(history_start_idx): if i < len(history_output): history_output_str = history_output[i]['output'] if 'Action:' in history_output_str and '<tool_call>': history_output_str = history_output_str.split('Action:')[1].split('<tool_call>')[0].strip() previous_actions.append(f"Step {i + 1}: {history_output_str}") previous_actions_str = "\\n".join(previous_actions) if previous_actions else "None" instruction_prompt = f""" Please generate the next move according to the UI screenshot, instruction and previous actions. 
Instruction: {instruction} Previous actions: {previous_actions_str}""" # 构造 messages 数组 messages = [ { "role": "system", "content": [{"text": system_prompt}], } ] history_len = min(history_n, len(history_output)) if history_len > 0: # 添加历史对话 for history_id, history_item in enumerate(history_output[-history_n:], 0): if history_id == 0: messages.append({ "role": "user", "content": [ {"text": instruction_prompt}, {"image": "file://" + history_item['image']} ] }) else: messages.append({ "role": "user", "content": [{"image": "file://" + history_item['image']}] }) messages.append({ "role": "assistant", "content": [{"text": history_item['output']}], }) # 添加当前截图 messages.append({ "role": "user", "content": [{"image": "file://" + image}] }) else: # 首轮对话 messages.append({ "role": "user", "content": [ {"text": instruction_prompt}, {"image": "file://" + image} ] }) return messages GUI模型的多轮对话的message数组示例如下(以7轮对话为例) 复制model_input [{ "role": "system", "content": [{ "text": "# Tools\n\nYou may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools></tools> XML tags:\n<tools>\n{\"type\": \"function\", \"function\": {\"name_for_human\": \"mobile_use\", \"name\": \"mobile_use\", \"description\": \"Use a touchscreen to interact with a mobile device, and take screenshots.\n* This is an interface to a mobile device with touchscreen. You can perform actions like clicking, typing, swiping, etc.\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions.\n* The screen's resolution is 1000x1000.\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.\", \"parameters\": {\"properties\": {\"action\": {\"description\": \"The action to perform. 
The available actions are:\n* `key`: Perform a key event on the mobile device.\n - This supports adb's `keyevent` syntax.\n - Examples: \"volume_up\", \"volume_down\", \"power\", \"camera\", \"clear\".\n* `click`: Click the point on the screen with coordinate (x, y).\n* `long_press`: Press the point on the screen with coordinate (x, y) for specified seconds.\n* `swipe`: Swipe from the starting point with coordinate (x, y) to the end point with coordinates2 (x2, y2).\n* `type`: Input the specified text into the activated input box.\n* `system_button`: Press the system button.\n* `open`: Open an app on the device.\n* `wait`: Wait specified seconds for the change to happen.\n* `answer`: Terminate the current task and output the answer.\n* `interact`: Resolve the blocking window by interacting with the user.\n* `terminate`: Terminate the current task and report its completion status.\", \"enum\": [\"key\", \"click\", \"long_press\", \"swipe\", \"type\", \"system_button\", \"open\", \"wait\", \"answer\", \"interact\", \"terminate\"], \"type\": \"string\"}, \"coordinate\": {\"description\": \"(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=click`, `action=long_press`, and `action=swipe`.\", \"type\": \"array\"}, \"coordinate2\": {\"description\": \"(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=swipe`.\", \"type\": \"array\"}, \"text\": {\"description\": \"Required only by `action=key`, `action=type`, `action=open`, `action=answer`,and `action=interact`.\", \"type\": \"string\"}, \"time\": {\"description\": \"The seconds to wait. 
Required only by `action=long_press` and `action=wait`.\", \"type\": \"number\"}, \"button\": {\"description\": \"Back means returning to the previous interface, Home means returning to the desktop, Menu means opening the application background menu, and Enter means pressing the enter. Required only by `action=system_button`\", \"enum\": [\"Back\", \"Home\", \"Menu\", \"Enter\"], \"type\": \"string\"}, \"status\": {\"description\": \"The status of the task. Required only by `action=terminate`.\", \"type\": \"string\", \"enum\": [\"success\", \"failure\"]}}, \"required\": [\"action\"], \"type\": \"object\"}, \"args_format\": \"Format the arguments as a JSON object.\"}}\n</tools>\n\nFor each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:\n<tool_call>\n{\"name\": <function-name>, \"arguments\": <args-json-object>}\n</tool_call>\n\n# Response format\n\nResponse format for every step:\n1) Action: a short imperative describing what to do in the UI.\n2) A single <tool_call>...</tool_call> block containing only the JSON: {\"name\": <function-name>, \"arguments\": <args-json-object>}.\n\nRules:\n- Output exactly in the order: Action, <tool_call>.\n- Be brief: one for Action.\n- Do not output anything else outside those two parts.\n- If finishing, use action=terminate in the tool call." 
}] },
{ "role": "user", "content": [{ "text": "\nPlease generate the next move according to the UI screenshot, instruction and previous actions.\n\nInstruction: 帮我在携程搜一下今天济南喜来登酒店的价格\n\nPrevious actions:\nStep 1: 点击携程旅行应用图标以启动携程旅行预订应用程序。\nStep 2: 等待促销启动画面自动过渡到携程主应用界面。" }, { "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_a84122ac_853a630315784b64988492c9c07b5534.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 点击应用更新通知弹窗右上角的关闭按钮(X图标)以将其关闭。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"click\", \"coordinate\": [789, 280]}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_6010a769_089b9b35b1904913bd5df492563b02b9.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 点击搜索栏中的“济南的酒店”文本区域,以激活搜索输入框并准备修改搜索词。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"click\", \"coordinate\": [112, 134]}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_47446db4_fd4a5022002c4db99f110d5c7261fea2.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 点击显示“厦门”的位置字段,将搜索位置从厦门更改为济南。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"click\", \"coordinate\": [156, 347]}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_3832132c_8c55861c1716467e802a3554402f3580.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 在搜索输入框中键入“济南”,以指定酒店搜索的城市位置。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"type\", \"text\": \"济南\"}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_ff247bac_39c3e20be32c4baf8677a2b6b61bc021.png" }] }]

Step 3. Parse the model output

Because the model rescales images internally, the coordinates it returns are normalized coordinates relative to the rescaled image. To perform GUI operations accurately on the original image, these coordinates must be mapped back.

Extract the Tool Call field

First, extract the Tool Call from the string returned by the model:

import re
import json

def extract_tool_calls(text):
    """
    Extract all <tool_call> blocks from the model output.

    Args:
        text: the text returned by the model

    Returns:
        actions: list of parsed operations
    """
    pattern = re.compile(r'<tool_call>(.*?)</tool_call>', re.DOTALL | re.IGNORECASE)
    blocks = pattern.findall(text)
    actions = []
    for blk in blocks:
        blk = blk.strip()
        try:
            actions.append(json.loads(blk))
        except json.JSONDecodeError as e:
            print(f'Parse failed: {e} | fragment: {blk[:80]}...')
    return actions

Coordinate mapping function

The following function computes the image size after the model's internal rescaling:

import math
from PIL import Image

def smart_resize(height, width, factor=32, min_pixels=32*32*4, max_pixels=32*32*1280, max_long_side=8192):
    """
    Compute the image size after the model's internal rescaling.

    Args:
        height: original image height
        width: original image width
        factor: resolution factor; each side is rounded to a multiple of it
                (default 32; the workflow in Step 5 passes 16)
        min_pixels: minimum number of pixels
        max_pixels: maximum number of pixels
        max_long_side: limit on the longest side

    Returns:
        (h_bar, w_bar): height and width after rescaling
    """
    def round_by_factor(number, factor):
        return round(number / factor) * factor

    def ceil_by_factor(number, factor):
        return math.ceil(number / factor) * factor

    def floor_by_factor(number, factor):
        return math.floor(number / factor) * factor

    if height < 2 or width < 2:
        raise ValueError(f"height:{height} or width:{width} must be at least 2")
    elif max(height, width) / min(height, width) > 200:
        raise ValueError(f"absolute aspect ratio must be smaller than 200, got {height} / {width}")

    # Limit the longest side
    if max(height, width) > max_long_side:
        beta = max(height, width) / max_long_side
        height, width = int(height / beta), int(width / beta)

    # Compute the rescaled size
    h_bar = round_by_factor(height, factor)
    w_bar = round_by_factor(width, factor)
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar

Step 4. Execute the GUI operations

Once the action instructions are parsed, the following code shows how to use the pyautogui library to simulate physical GUI operations such as mouse clicks, keyboard input, and scrolling.

import pyautogui
import pyperclip
import time
from PIL import Image
import os

class ComputerTools:
    """GUI operation tools for desktop"""

    def __init__(self):
        self.image_info = None

    def load_image_info(self, path):
        """Load the image size"""
        width, height = Image.open(path).size
        self.image_info = (width, height)

    def get_screenshot(self, image_path, retry_times=3):
        """Take a desktop screenshot"""
        if os.path.exists(image_path):
            os.remove(image_path)
        for i in range(retry_times):
            screenshot = pyautogui.screenshot()
            screenshot.save(image_path)
            if os.path.exists(image_path):
                self.load_image_info(image_path)
                return True
            else:
                time.sleep(0.1)
        return False

    def reset(self):
        """Show the desktop (Win+D)"""
        pyautogui.hotkey('win', 'd')

    def press_key(self, keys):
        """Press one key or a key combination"""
        if isinstance(keys, list):
            cleaned_keys = []
            for key in keys:
                if isinstance(key, str):
                    # Normalize the key-name format
                    if key.startswith("keys=["):
                        key = key[6:]
                    if key.endswith("]"):
                        key = key[:-1]
                    if key.startswith("['") or key.startswith('["'):
                        key = key[2:] if len(key) > 2 else key
                    if key.endswith("']") or key.endswith('"]'):
                        key = key[:-2] if len(key) > 2 else key
                    key = key.strip()
                    # Map key names to the names pyautogui expects
                    key_map = {
                        "arrowleft": "left",
                        "arrowright": "right",
                        "arrowup": "up",
                        "arrowdown": "down"
                    }
                    key = key_map.get(key, key)
                    cleaned_keys.append(key)
                else:
                    cleaned_keys.append(key)
            keys = cleaned_keys
        else:
            keys = [keys]
        if len(keys) > 1:
            pyautogui.hotkey(*keys)
        else:
            pyautogui.press(keys[0])

    def type(self, text):
        """Type text (pasted through the clipboard so that Chinese input works)"""
        pyperclip.copy(text)
        pyautogui.keyDown('ctrl')
        pyautogui.keyDown('v')
        pyautogui.keyUp('v')
        pyautogui.keyUp('ctrl')

    def mouse_move(self, x, y):
        """Move the mouse to the given coordinates"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.moveTo(x, y)

    def left_click(self, x, y):
        """Left click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.click()

    def left_click_drag(self, x, y):
        """Drag from the current position to the given coordinates"""
        pyautogui.dragTo(x, y, duration=0.5)
        pyautogui.moveTo(x, y)

    def right_click(self, x, y):
        """Right click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.rightClick()

    def middle_click(self, x, y):
        """Middle click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.middleClick()

    def double_click(self, x, y):
        """Double click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.doubleClick()

    def triple_click(self, x, y):
        """Triple click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.tripleClick()

    def scroll(self, pixels):
        """Scroll the mouse wheel"""
        pyautogui.scroll(pixels)

Step 5. Complete automation workflow

Combine all the steps above into one complete automation workflow that loops through taking a screenshot, running model inference, and executing the GUI operation until the task is complete.

import os
import dashscope
import time
from PIL import Image

def run_gui_automation(instruction, max_step=30):
    """
    Run the complete GUI automation workflow.

    Args:
        instruction: the user instruction
        max_step: maximum number of steps to execute
    """
    # Configure the API
    dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
    dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
    model_name = 'gui-plus-2026-02-26'

    # Initialize the tools
    computer_tools = ComputerTools()
    computer_tools.reset()  # show the desktop

    # Create the output directory
    output_dir = os.path.join(os.path.expanduser("~"), "Desktop", "gui_automation")
    os.makedirs(output_dir, exist_ok=True)

    # Conversation history
    history = []
    stop_flag = False

    print(f"[Task] {instruction}")
    print("=" * 60)

    for step_id in range(max_step):
        if stop_flag:
            break
        print(f"\n[Step {step_id + 1}]")

        # 1. Take a screenshot
        screen_shot = os.path.join(output_dir, f'screenshot_{step_id}.png')
        computer_tools.get_screenshot(screen_shot)

        # 2. Build the messages (system_prompt is the prompt string from Step 1)
        messages = get_messages(screen_shot, instruction, history, model_name, system_prompt)

        # 3. Call the model
        response = dashscope.MultiModalConversation.call(
            model=model_name,
            messages=messages,
            vl_high_resolution_images=True,
            stream=False
        )
        output_text = response.output.choices[0].message.content[0]['text']
        print(f"[Model output]\n{output_text}\n")

        # 4. Parse the operations
        action_list = extract_tool_calls(output_text)
        if not action_list:
            print("No valid operation was extracted")
            break

        # 5. Execute the operations
        for action_id, action in enumerate(action_list):
            action_parameter = action['arguments']
            action_type = action_parameter['action']

            # Get the image size used for coordinate mapping
            dummy_image = Image.open(screen_shot)
            resized_height, resized_width = smart_resize(
                dummy_image.height,
                dummy_image.width,
                factor=16,
                min_pixels=3136,
                max_pixels=1003520 * 200
            )

            # Map coordinates (from the normalized 1000x1000 space to the actual size)
            for key in ['coordinate', 'coordinate1', 'coordinate2']:
                if key in action_parameter:
                    action_parameter[key][0] = int(action_parameter[key][0] / 1000 * resized_width)
                    action_parameter[key][1] = int(action_parameter[key][1] / 1000 * resized_height)

            # Execute the matching operation
            if action_type in ['click', 'left_click']:
                computer_tools.left_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Left click ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type == 'mouse_move':
                computer_tools.mouse_move(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Move mouse to ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type == 'middle_click':
                computer_tools.middle_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Middle click")
            elif action_type in ['right click', 'right_click']:
                computer_tools.right_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Right click")
            elif action_type in ['key', 'hotkey']:
                computer_tools.press_key(action_parameter['keys'])
                print(f"[OK] Key press {action_parameter['keys']}")
            elif action_type == 'type':
                text = action_parameter['text']
                computer_tools.type(text)
                print(f"[OK] Type text: {text}")
            elif action_type in ['drag', 'left_click_drag']:
                # The system prompt names this action left_click_drag
                computer_tools.left_click_drag(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Drag to ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type in ['scroll', 'hscroll']:
                # hscroll is mapped to a regular scroll, as the system prompt states
                if 'coordinate' in action_parameter:
                    computer_tools.mouse_move(
                        action_parameter['coordinate'][0],
                        action_parameter['coordinate'][1]
                    )
                computer_tools.scroll(action_parameter.get("pixels", 1))
                print(f"[OK] Scroll {action_parameter.get('pixels', 1)} pixels")
            elif action_type in ['computer_double_click', 'double_click']:
                computer_tools.double_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Double click")
            elif action_type == 'triple_click':
                # Uses ComputerTools.triple_click defined in Step 4
                computer_tools.triple_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Triple click")
            elif action_type == 'wait':
                time.sleep(action_parameter.get('time', 2))
                print(f"[OK] Wait {action_parameter.get('time', 2)} seconds")
            elif action_type == 'answer':
                print(f"[OK] Task complete: {action_parameter.get('text', '')}")
                stop_flag = True
                break
            elif action_type in ['stop', 'terminate', 'done']:
                print(f"[OK] Task terminated: {action_parameter.get('status', 'success')}")
                stop_flag = True
                break
            else:
                print(f"Unknown action type: {action_type}")

        # 6. Save the history
        history.append({
            'output': output_text,
            'image': screen_shot
        })
        time.sleep(2)  # interval between operations

    print("\n" + "=" * 60)
    print(f"[Done] Executed {len(history)} steps in total")

# Usage example
if __name__ == '__main__':
    run_gui_automation(
        instruction='帮我打开chrome,在百度中搜索阿里巴巴',
        max_step=30
    )

Complete example code for desktop

import os
import re
import json
import math
import time
import pyautogui
import pyperclip
import dashscope
from PIL import Image

# ===================== Step 1: System Prompt =====================
system_prompt = """# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
{"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. 
if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": 
"array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. 
- If finishing, use action=terminate in the tool call."""

# ===================== Step 2: Build the multi-turn conversation messages =====================
def get_messages(image, instruction, history_output, system_prompt):
    history_n = 4
    current_step = len(history_output)
    history_start_idx = max(0, current_step - history_n)
    previous_actions = []
    for i in range(history_start_idx):
        if i < len(history_output):
            history_output_str = history_output[i]['output']
            if 'Action:' in history_output_str and '<tool_call>'

JsonDocument.Parse(message);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Message reception was canceled.");
            return null;
        }
    }

    private static string GenerateTaskId()
    {
        return Guid.NewGuid().ToString("N").Substring(0, 32);
    }

    private static string CreateCommand(string action, string taskId, string streaming, object payload)
    {
        var command = new
        {
            header = new
            {
                action,
                task_id = taskId,
                streaming
            },
            payload
        };

        return JsonSerializer.Serialize(command);
    }
}

The example uses the following directory structure:

my-php-project/
├── composer.json
├── vendor/
└── index.php

Contents of composer.json (adjust the dependency versions as needed):

{
    "require": {
        "react/event-loop": "^1.3",
        "react/socket": "^1.11",
        "react/stream": "^1.2",
        "react/http": "^1.1",
        "ratchet/pawl": "^0.4"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

Contents of index.php:

<!-- See the note above for SSML support requirements -->
<?php
require __DIR__ . 
'/vendor/autoload.php'; use Ratchet\Client\Connector; use React\EventLoop\Loop; use React\Socket\Connector as SocketConnector; // 如果未配置环境变量,请将下面一行替换为:$api_key = "sk-xxx" $api_key = getenv("DASHSCOPE_API_KEY"); $websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket 服务地址 $output_file = 'output.mp3'; // 输出文件路径 $loop = Loop::get(); if (file_exists($output_file)) { // 清空文件内容 file_put_contents($output_file, ''); } // 创建自定义连接器 $socketConnector = new SocketConnector($loop, [ 'tcp' => [ 'bindto' => '0.0.0.0:0', ], 'tls' => [ 'verify_peer' => false, 'verify_peer_name' => false, ], ]); $connector = new Connector($loop, $socketConnector); $headers = [ 'Authorization' => 'bearer ' . $api_key, 'X-DashScope-DataInspection' => 'enable' ]; $connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) { echo "Connected to WebSocket server\n"; // 生成任务 ID $taskId = generateTaskId(); // 发送 run-task 命令 sendRunTaskMessage($conn, $taskId); // 定义发送 continue-task 命令的函数 $sendContinueTask = function() use ($conn, $loop, $taskId) { // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 $continueTaskMessage = json_encode([ "header" => [ "action" => "continue-task", "task_id" => $taskId, "streaming" => "duplex" ], "payload" => [ "input" => [ // 特殊字符需要转义 "text" => "<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>" ] ] ]); $conn->send($continueTaskMessage); // 发送 finish-task 命令 sendFinishTaskMessage($conn, $taskId); }; // 标记是否已收到 task-started 事件 $taskStarted = false; // 监听消息 $conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) { if ($msg->isBinary()) { // 将二进制数据写入本地文件 file_put_contents($output_file, $msg->getPayload(), FILE_APPEND); } else { // 处理非二进制消息 $response = json_decode($msg, true); if (isset($response['header']['event'])) { handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted); } else { echo "Unknown message format\n"; } } }); // 监听连接关闭 
$conn->on('close', function($code = null, $reason = null) { echo "Connection closed\n"; if ($code !== null) { echo "Close code: " . $code . "\n"; } if ($reason !== null) { echo "Close reason: " . $reason . "\n"; } }); }, function ($e) { echo "Could not connect: {$e->getMessage()}\n"; }); $loop->run(); /** * 生成任务 ID * @return string */ function generateTaskId(): string { return bin2hex(random_bytes(16)); } /** * 发送 run-task 命令 * @param $conn * @param $taskId */ function sendRunTaskMessage($conn, $taskId) { $runTaskMessage = json_encode([ "header" => [ "action" => "run-task", "task_id" => $taskId, "streaming" => "duplex" ], "payload" => [ "task_group" => "audio", "task" => "tts", "function" => "SpeechSynthesizer", "model" => "cosyvoice-v3-flash", "parameters" => [ "text_type" => "PlainText", "voice" => "longanyang", "format" => "mp3", "sample_rate" => 22050, "volume" => 50, "rate" => 1, "pitch" => 1, // 启用 enable_ssml: true 后,仅发送一次 continue-task "enable_ssml" => true ], "input" => (object) [] ] ]); echo "Preparing to send run-task command: " . $runTaskMessage . "\n"; $conn->send($runTaskMessage); echo "run-task command sent\n"; } /** * 读取音频文件 * @param string $filePath * @return bool|string */ function readAudioFile(string $filePath) { $voiceData = file_get_contents($filePath); if ($voiceData === false) { echo "Failed to read audio file\n"; } return $voiceData; } /** * 分割音频数据 * @param string $data * @param int $chunkSize * @return array */ function splitAudioData(string $data, int $chunkSize): array { return str_split($data, $chunkSize); } /** * 发送 finish-task 命令 * @param $conn * @param $taskId */ function sendFinishTaskMessage($conn, $taskId) { $finishTaskMessage = json_encode([ "header" => [ "action" => "finish-task", "task_id" => $taskId, "streaming" => "duplex" ], "payload" => [ "input" => (object) [] ] ]); echo "Preparing to send finish-task command: " . $finishTaskMessage . 
"\n"; $conn->send($finishTaskMessage); echo "finish-task command sent\n"; } /** * 处理事件 * @param $conn * @param $response * @param $sendContinueTask * @param $loop * @param $taskId * @param $taskStarted */ function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) { switch ($response['header']['event']) { case 'task-started': echo "Task started, sending continue-task command...\n"; $taskStarted = true; // 发送 continue-task 命令 $sendContinueTask(); break; case 'result-generated': // 忽略 result-generated 事件 break; case 'task-finished': echo "Task finished\n"; $conn->close(); break; case 'task-failed': echo "Task failed\n"; echo "Error code: " . $response['header']['error_code'] . "\n"; echo "Error message: " . $response['header']['error_message'] . "\n"; $conn->close(); break; case 'error': echo "Error: " . $response['payload']['message'] . "\n"; break; default: echo "Unknown event: " . $response['header']['event'] . "\n"; break; } // 任务完成后关闭连接 if ($response['header']['event'] == 'task-finished') { // 等待 1 秒确保所有数据已传输 $loop->addTimer(1, function() use ($conn) { $conn->close(); echo "Client closes connection\n"; }); } // 如果未收到 task-started 事件,关闭连接 if (!$taskStarted && in_array($response['header']['event'], ['task-failed', 'error'])) { $conn->close(); } } 安装所需依赖:复制npm install ws npm install uuid 示例代码:复制// SSML 支持要求参见上方说明 import fs from 'fs'; import WebSocket from 'ws'; import { v4 as uuid } from 'uuid'; // 用于生成 UUID // 如果未配置环境变量,请将下面一行替换为:const apiKey = "sk-xxx" const apiKey = process.env.DASHSCOPE_API_KEY; const url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // 输出文件路径 const outputFilePath = 'output.mp3'; // 清空输出文件 fs.writeFileSync(outputFilePath, ''); // 创建 WebSocket 客户端 const ws = new WebSocket(url, { headers: { Authorization: `bearer ${apiKey}`, 'X-DashScope-DataInspection': 'enable' } }); let taskStarted = false; let taskId = uuid(); ws.on('open', () => { console.log('Connected to WebSocket server'); // 发送 run-task 命令 const 
runTaskMessage = JSON.stringify({ header: { action: 'run-task', task_id: taskId, streaming: 'duplex' }, payload: { task_group: 'audio', task: 'tts', function: 'SpeechSynthesizer', model: 'cosyvoice-v3-flash', parameters: { text_type: 'PlainText', voice: 'longanyang', // 音色 format: 'mp3', // 音频格式 sample_rate: 22050, // 采样率 volume: 50, // 音量 rate: 1, // 语速 pitch: 1, // 音调 enable_ssml: true // 是否启用 SSML。启用后仅可发送一次 continue-task 命令,否则会报错 "Text request limit violated, expected 1." }, input: {} } }); ws.send(runTaskMessage); console.log('Sent run-task message'); }); const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' }); ws.on('message', (data, isBinary) => { if (isBinary) { // 将二进制数据写入文件 fileStream.write(data); } else { const message = JSON.parse(data); switch (message.header.event) { case 'task-started': taskStarted = true; console.log('Task has started'); // 发送 continue-task 命令 sendContinueTasks(ws); break; case 'task-finished': console.log('Task has finished'); ws.close(); fileStream.end(() => { console.log('File stream has been closed'); }); break; case 'task-failed': console.error('Task failed: ', message.header.error_message); ws.close(); fileStream.end(() => { console.log('File stream has been closed'); }); break; default: // 可在此处理 result-generated 事件 break; } } }); function sendContinueTasks(ws) { if (taskStarted) { // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 const continueTaskMessage = JSON.stringify({ header: { action: 'continue-task', task_id: taskId, streaming: 'duplex' }, payload: { input: { // 特殊字符需要转义 text: '<speak rate="2">My speaking rate is faster than a normal person\'s.</speak>' } } }); ws.send(continueTaskMessage); // 发送 finish-task 命令 const finishTaskMessage = JSON.stringify({ header: { action: 'finish-task', task_id: taskId, streaming: 'duplex' }, payload: { input: {} } }); ws.send(finishTaskMessage); } } ws.on('close', () => { console.log('Disconnected from the WebSocket server'); }); Java 建议使用 Java SDK。以下 Java WebSocket 示例需要以下依赖: 
Java-WebSocket jackson-databind 使用 Maven 或 Gradle 管理依赖:pom.xmlbuild.gradle复制<dependencies> <!-- WebSocket 客户端 --> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.5.3</version> </dependency> <!-- JSON 处理 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.13.0</version> </dependency> </dependencies> Java 代码:复制import com.fasterxml.jackson.databind.ObjectMapper; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.*; /** * SSML 功能说明: * 1. 发送 run-task 命令时,将 enable_ssml 参数设为 true 以启用 SSML 支持。 * 2. 通过 continue-task 命令发送包含 SSML 的文本,仅可发送一次。 * 3. SSML 仅支持 cosyvoice-v3-flash 和 cosyvoice-v3-plus 模型的克隆音色,以及音色列表中标记为支持 SSML 的系统音色(例如 cosyvoice-v3-flash 的 longanyang 音色)。 */ public class TTSWebSocketClient extends WebSocketClient { private final String taskId = UUID.randomUUID().toString(); private final String outputFile = "output_" + System.currentTimeMillis() + ".mp3"; private boolean taskFinished = false; public TTSWebSocketClient(URI serverUri, Map<String, String> headers) { super(serverUri, headers); } @Override public void onOpen(ServerHandshake serverHandshake) { System.out.println("Connection successful"); // 发送 run-task 命令 // 启用 enable_ssml 后,仅可发送一次 continue-task 命令。 // 否则会报错 "Text request limit violated, expected 1." 
String runTaskCommand = "{ \"header\": { \"action\": \"run-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"task_group\": \"audio\", \"task\": \"tts\", \"function\": \"SpeechSynthesizer\", \"model\": \"cosyvoice-v3-flash\", \"parameters\": { \"text_type\": \"PlainText\", \"voice\": \"longanyang\", \"format\": \"mp3\", \"sample_rate\": 22050, \"volume\": 50, \"rate\": 1, \"pitch\": 1, \"enable_ssml\": true }, \"input\": {} }}"; send(runTaskCommand); } @Override public void onMessage(String message) { System.out.println("Received message from server: " + message); try { // 解析 JSON 消息 Map<String, Object> messageMap = new ObjectMapper().readValue(message, Map.class); if (messageMap.containsKey("header")) { Map<String, Object> header = (Map<String, Object>) messageMap.get("header"); if (header.containsKey("event")) { String event = (String) header.get("event"); if ("task-started".equals(event)) { System.out.println("Received task-started event from server"); // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 // 特殊字符需要转义。 sendContinueTask("<speak rate=\\\"2\\\">My speaking rate is faster than a normal person's.</speak>"); // 发送 finish-task 命令 sendFinishTask(); } else if ("task-finished".equals(event)) { System.out.println("Received task-finished event from server"); taskFinished = true; closeConnection(); } else if ("task-failed".equals(event)) { System.out.println("Task failed: " + message); closeConnection(); } } } } catch (Exception e) { System.err.println("An exception occurred: " + e.getMessage()); } } @Override public void onMessage(ByteBuffer message) { System.out.println("Size of received binary audio data: " + message.remaining()); try (FileOutputStream fos = new FileOutputStream(outputFile, true)) { byte[] buffer = new byte[message.remaining()]; message.get(buffer); fos.write(buffer); System.out.println("Audio data has been written to the local file " + outputFile); } catch (IOException e) { System.err.println("Failed to write audio 
data to local file: " + e.getMessage()); } } @Override public void onClose(int code, String reason, boolean remote) { System.out.println("Connection closed: " + reason + " (" + code + ")"); } @Override public void onError(Exception ex) { System.err.println("Error: " + ex.getMessage()); ex.printStackTrace(); } private void sendContinueTask(String text) { String command = "{ \"header\": { \"action\": \"continue-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": { \"text\": \"" + text + "\" } }}"; send(command); } private void sendFinishTask() { String command = "{ \"header\": { \"action\": \"finish-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": {} }}"; send(command); } private void closeConnection() { if (!isClosed()) { close(); } } public static void main(String[] args) { try { // 如果未配置环境变量,请将下面一行替换为:String apiKey = "sk-xxx" String apiKey = System.getenv("DASHSCOPE_API_KEY"); if (apiKey == null || apiKey.isEmpty()) { System.err.println("Please set the DASHSCOPE_API_KEY environment variable"); return; } Map<String, String> headers = new HashMap<>(); headers.put("Authorization", "bearer " + apiKey); TTSWebSocketClient client = new TTSWebSocketClient(new URI("wss://dashscope.aliyuncs.com/api-ws/v1/inference/"), headers); client.connect(); while (!client.isClosed() && !client.taskFinished) { Thread.sleep(1000); } } catch (Exception e) { System.err.println("Failed to connect to WebSocket service: " + e.getMessage()); e.printStackTrace(); } } } Python 建议使用 Python SDK。运行示例前请先安装依赖:复制pip uninstall websocket-client pip uninstall websocket pip install websocket-client 不要将 Python 文件命名为 websocket.py,否则会导致 AttributeError: module 'websocket' has no attribute 'WebSocketApp'。复制# SSML 功能说明: # 1. 发送 run-task 命令时,将 enable_ssml 参数设为 true 以启用 SSML 支持。 # 2. 通过 continue-task 命令发送包含 SSML 的文本,仅可发送一次。 # 3. 
SSML 仅支持 cosyvoice-v3-flash 和 cosyvoice-v3-plus 模型的克隆音色,以及音色列表中标记为支持 SSML 的系统音色(例如 cosyvoice-v3-flash 的 longanyang 音色)。 import websocket import json import uuid import os import time class TTSClient: def __init__(self, api_key, uri): """ 初始化 TTSClient 实例。 参数: api_key (str): 用于认证的 API Key。 uri (str): WebSocket 服务地址。 """ self.api_key = api_key # 替换为你的 API Key self.uri = uri # 替换为你的 WebSocket 地址 self.task_id = str(uuid.uuid4()) # 生成唯一任务 ID self.output_file = f"output_{int(time.time())}.mp3" # 输出音频文件路径 self.ws = None # WebSocketApp 实例 self.task_started = False # 是否已收到 task-started self.task_finished = False # 是否已收到 task-finished/task-failed def on_open(self, ws): """ WebSocket 连接建立时的回调函数。 发送 run-task 命令启动语音合成任务。 """ print("WebSocket connection established") # 构建 run-task 命令 run_task_cmd = { "header": { "action": "run-task", "task_id": self.task_id, "streaming": "duplex" }, "payload": { "task_group": "audio", "task": "tts", "function": "SpeechSynthesizer", "model": "cosyvoice-v3-flash", "parameters": { "text_type": "PlainText", "voice": "longanyang", "format": "mp3", "sample_rate": 22050, "volume": 50, "rate": 1, "pitch": 1, # 启用 enable_ssml: true 后,仅发送一次 continue-task "enable_ssml": True }, "input": {} } } # 发送 run-task 命令 ws.send(json.dumps(run_task_cmd)) print("run-task command sent") def on_message(self, ws, message): """ 收到消息时的回调函数。 分别处理文本消息和二进制消息。 """ if isinstance(message, str): # 处理 JSON 文本消息 try: msg_json = json.loads(message) print(f"Received JSON message: {msg_json}") if "header" in msg_json: header = msg_json["header"] if "event" in header: event = header["event"] if event == "task-started": print("Task started") self.task_started = True # 发送 continue-task 命令。使用 SSML 时仅可发送一次。 # 特殊字符需要转义。 self.send_continue_task("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>") # 发送完 continue-task 后发送 finish-task self.send_finish_task() elif event == "task-finished": print("Task finished") self.task_finished = True self.close(ws) elif event == 
"task-failed":
                            error_msg = msg_json.get("error_message", "Unknown error")
                            print(f"Task failed: {error_msg}")
                            self.task_finished = True
                            self.close(ws)
            except json.JSONDecodeError as e:
                print(f"JSON parsing failed: {e}")
        else:
            # Handle binary messages (audio data)
            print(f"Received binary message, size: {len(message)} bytes")
            with open(self.output_file, "ab") as f:
                f.write(message)
            print(f"Audio data has been written to the local file {self.output_file}")

    def on_error(self, ws, error):
        """Error callback."""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Close callback."""
        print(f"WebSocket closed: {close_msg} ({close_status_code})")

    def send_continue_task(self, text):
        """Send the continue-task command containing the text to synthesize."""
        cmd = {
            "header": {
                "action": "continue-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {
                    "text": text
                }
            }
        }
        self.ws.send(json.dumps(cmd))
        print(f"Sent continue-task command, text content: {text}")

    def send_finish_task(self):
        """Send the finish-task command to end the speech synthesis task."""
        cmd = {
            "header": {
                "action": "finish-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {}
            }
        }
        self.ws.send(json.dumps(cmd))
        print("Sent finish-task command")

    def close(self, ws):
        """Actively close the connection."""
        if ws and ws.sock and ws.sock.connected:
            ws.close()
            print("Connection actively closed")

    def run(self):
        """Start the WebSocket client."""
        # Set the request headers (authentication)
        header = {
            "Authorization": f"bearer {self.api_key}",
            "X-DashScope-DataInspection": "enable"
        }
        # Create the WebSocketApp instance
        self.ws = websocket.WebSocketApp(
            self.uri,
            header=header,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        print("Listening for WebSocket messages...")
        self.ws.run_forever()  # Start listening on the persistent connection


# Usage example
if __name__ == "__main__":
    # If the environment variable is not configured, replace the next line with: API_KEY = "sk-xxx"
    API_KEY = os.environ.get("DASHSCOPE_API_KEY")
    SERVER_URI = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"
    client = TTSClient(API_KEY, SERVER_URI)
    client.run()

Tags

CosyVoice SSML is based on W3C SSML 1.0 and supports only a subset of its tags.

Syntax rules:
- All SSML content must be wrapped in <speak></speak> tags.
- Multiple <speak> tags may be used consecutively, but they cannot be nested.
- XML special characters must be escaped: " → &quot;, ' → &apos;, & → &amp;, < → &lt;, > → &gt;.

<speak>: root tag

Description
All SSML content must be wrapped in <speak></speak> tags.

Syntax
    <speak>Text that uses SSML features</speak>

Attributes
| Attribute | Type | Required | Description |
| --- | --- | --- | --- |
| voice | String | No | Voice name. Overrides the API parameter voice. See the voice list. |
| rate | String | No | Speech rate. Overrides the API parameter speech_rate. Recommended range: 0.5 to 2; default: 1. Values above 1 speed up the speech, values below 1 slow it down. |
| pitch | String | No | Pitch. Overrides the API parameter pitch_rate. Recommended range: 0.5 to 2; default: 1. Values above 1 raise the pitch, values below 1 lower it. |
| volume | String | No | Volume. Overrides the API parameter volume. Value range: 0 to 100; default: 50. |
| effect | String | No | Sound effect. Valid values: robot, lolita (lively female voice), lowpass, echo, eq (equalizer, advanced), lpfilter (low-pass filter, advanced), hpfilter (high-pass filter, advanced). eq, lpfilter, and hpfilter must be used together with effectValue. Only one effect can be set per tag. Effects increase latency. |
| effectValue | String | No | Custom effect parameters. eq: 8 space-separated integers (-20 to 20) giving the gain for the bands ["40 Hz", "100 Hz", "200 Hz", "400 Hz", "800 Hz", "1600 Hz", "4000 Hz", "12000 Hz"]; example: "1 1 1 1 1 1 1 1". lpfilter: an integer frequency in the range (0, sample_rate/2]; example: "800". hpfilter: an integer frequency in the range (0, sample_rate/2]; example: "1200". |
| bgm | String | No | Background music URL. The file must be stored in OSS with at least public-read permission. XML special characters in the URL must be escaped. Requirements: 16 kHz sample rate, mono, WAV format, 16-bit. If the synthesized audio is longer than the background music, the music loops. |
| backgroundMusicVolume | String | No | Background music volume. |

Examples

Voice:
    <speak voice="longcheng_v2"> I am a male voice. </speak>

Speech rate:
    <speak rate="2"> My speech rate is faster than normal. </speak>

Pitch:
    <speak pitch="0.5"> However, my pitch is lower than others. </speak>

Volume:
    <speak volume="80"> My volume is also very high. </speak>

Sound effect:
    <speak effect="robot"> Do you like the robot WALL-E? </speak>

Sound effect + effectValue:
    <speak effect="eq" effectValue="1 -20 1 1 1 1 20 1"> Do you like the robot WALL-E? </speak>
    <speak effect="lpfilter" effectValue="1200"> Do you like the robot WALL-E? </speak>
    <speak effect="hpfilter" effectValue="1200"> Do you like the robot WALL-E?
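    </speak>

If the audio is not in WAV format, you can convert it with ffmpeg:
    ffmpeg -i input_audio -acodec pcm_s16le -ac 1 -ar 16000 output.wav

Background music (bgm):
    <speak bgm="https://nls.alicdn.com/bgm/2.wav" backgroundMusicVolume="30" rate="-500" volume="40">
      <break time="2s"/>
      The old trees on the shady cliff are shrouded in mist
      <break time="700ms"/>
      The sound of rain is still in the bamboo forest
      <break time="700ms"/>
      I know that cotton contributes to the country's plan
      <break time="700ms"/>
      The scenery of Mianzhou is always pitiable
      <break time="2s"/>
    </speak>

You bear legal responsibility for the copyright of any audio you upload.

Combined attributes:
    <speak> Text that uses SSML tags </speak>

    <speak rate="200" pitch="-100" volume="80"> So when put together, my voice sounds like this. </speak>

<break>: pause

Description
Inserts a pause. The duration is given in seconds (s) or milliseconds (ms).

Syntax
    # No attributes
    <break/>
    # With the time attribute
    <break time="string"/>

Behavior of the break tag:
- Without attributes, <break/> pauses for 1 second by default.
- Note: the durations of consecutive <break> tags add up, but the total is capped at 10 seconds.

For example, the three tags below add up to 15 seconds, but only the first 10 seconds take effect:
    <speak> Please close your eyes and rest for a moment.<break time="5s"/><break time="5s"/><break time="5s"/>OK, please open your eyes. </speak>

Attributes
| Attribute | Type | Required | Description |
| --- | --- | --- | --- |
| time | String | No | Pause duration, such as "2s" or "50ms". In seconds: 1 to 10. In milliseconds: 50 to 10000. |

Example
    <speak> Please close your eyes and rest for a moment.<break time="500ms"/>OK, please open your eyes. </speak>

<sub>: substitute text

Description
Replaces the written text with alternative text to be read aloud.

Syntax
    <sub alias="string"></sub>

Attributes
| Attribute | Type | Required | Description |
| --- | --- | --- | --- |
| alias | String | Yes | The text to read aloud instead. |

Example
    <speak> <sub alias="network protocol">W3C</sub> </speak>

<phoneme>: set pronunciation

Description
Specifies the pronunciation using Pinyin (Chinese) or CMU phonetic symbols (English).

Syntax
    <phoneme alphabet="string" ph="string">text</phoneme>

Attributes
| Attribute | Type | Required | Description |
| --- | --- | --- | --- |
| alphabet | String | Yes | Pronunciation type: "py" (Pinyin) or "cmu" (phonetic symbols). See The CMU Pronouncing Dictionary. |
| ph | String | Yes | The Pinyin or phonetic symbols. Separate the Pinyin of each Chinese character with a space; the number of syllables must match the number of characters. Each syllable carries a tone number (1 to 5, where 5 is the neutral tone). |

Examples
    <speak> 去<phoneme alphabet="py" ph="dian3 dang4 hang2">典当行</phoneme>把这个玩意<phoneme alphabet="py" ph="dang4 diao4">当掉</phoneme> </speak>
    <speak> How to spell <phoneme alphabet="cmu" ph="S AY N">sin</phoneme>?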
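    </speak>

<soundEvent>: insert a sound effect

Description
Inserts an external audio file (prompt tone, ambient sound, and so on) into the synthesized speech.

Syntax
    <soundEvent src="URL"/>

Attributes
| Attribute | Type | Required | Description |
| --- | --- | --- | --- |
| src | String | Yes | Audio URL. The file must be stored in OSS with at least public-read permission. XML special characters in the URL must be escaped. Requirements: 16 kHz sample rate, mono, WAV format, 16-bit, 2 MB maximum. |

If the audio is not in WAV format, you can convert it with ffmpeg:
    ffmpeg -i input_audio -acodec pcm_s16le -ac 1 -ar 16000 output.wav

You bear legal responsibility for the copyright of any audio you upload.

Example
    <speak> A horse was frightened<soundEvent src="https://nls.alicdn.com/sound-event/horse-neigh.wav"/>and people scattered to avoid it. </speak>

<say-as>: set the reading format

Description
Specifies how the text is read (for example as a number, a date, or a phone number).

Syntax
    <say-as interpret-as="string">text</say-as>

Attributes
| Attribute | Type | Required | Description |
| --- | --- | --- | --- |
| interpret-as | String | Yes | Text type. Valid values: cardinal (number), digits (digit by digit), telephone (phone number), name (personal name), address (address), id (account name or nickname), characters (character by character), punctuation (punctuation), date (date), time (time), currency (currency), measure (unit of measurement). |

:
                history_output_str = history_output_str.split('Action:')[1].split('<tool_call>')[0].strip()
            previous_actions.append(f"Step {i + 1}: {history_output_str}")
    # Join the earlier steps with real newlines so each step sits on its own line in the prompt
    previous_actions_str = "\n".join(previous_actions) if previous_actions else "None"
    instruction_prompt = f""" Please generate the next move according to the UI screenshot, instruction and previous actions.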
Instruction: {instruction} Previous actions: {previous_actions_str}""" messages = [{"role": "system", "content": [{"text": system_prompt}]}] history_len = min(history_n, len(history_output)) if history_len > 0: for history_id, history_item in enumerate(history_output[-history_n:], 0): if history_id == 0: messages.append({ "role": "user", "content": [ {"text": instruction_prompt}, {"image": "file://" + history_item['image']} ] }) else: messages.append({ "role": "user", "content": [{"image": "file://" + history_item['image']}] }) messages.append({ "role": "assistant", "content": [{"text": history_item['output']}], }) messages.append({ "role": "user", "content": [{"image": "file://" + image}] }) else: messages.append({ "role": "user", "content": [ {"text": instruction_prompt}, {"image": "file://" + image} ] }) return messages # ===================== 步骤3:解析模型输出与坐标映射 ===================== def extract_tool_calls(text): pattern = re.compile(r'<tool_call>(.*?)</tool_call>', re.DOTALL | re.IGNORECASE) blocks = pattern.findall(text) actions = [] for blk in blocks: blk = blk.strip() try: actions.append(json.loads(blk)) except json.JSONDecodeError as e: print(f'解析失败: {e} | 片段: {blk[:80]}...') return actions def smart_resize(height, width, factor=32, min_pixels=32*32*4, max_pixels=32*32*1280, max_long_side=8192): def round_by_factor(number, factor): return round(number / factor) * factor def ceil_by_factor(number, factor): return math.ceil(number / factor) * factor def floor_by_factor(number, factor): return math.floor(number / factor) * factor if height < 2 or width < 2: raise ValueError(f"height:{height} or width:{width} must be larger than factor:{factor}") elif max(height, width) / min(height, width) > 200: raise ValueError(f"absolute aspect ratio must be smaller than 200, got {height} / {width}") if max(height, width) > max_long_side: beta = max(height, width) / max_long_side height, width = int(height / beta), int(width / beta) h_bar = round_by_factor(height, factor) 
w_bar = round_by_factor(width, factor) if h_bar * w_bar > max_pixels: beta = math.sqrt((height * width) / max_pixels) h_bar = floor_by_factor(height / beta, factor) w_bar = floor_by_factor(width / beta, factor) elif h_bar * w_bar < min_pixels: beta = math.sqrt(min_pixels / (height * width)) h_bar = ceil_by_factor(height * beta, factor) w_bar = ceil_by_factor(width * beta, factor) return h_bar, w_bar # ===================== 步骤4:GUI 操作工具类 ===================== class ComputerTools: def __init__(self): self.image_info = None def load_image_info(self, path): width, height = Image.open(path).size self.image_info = (width, height) def get_screenshot(self, image_path, retry_times=3): if os.path.exists(image_path): os.remove(image_path) for i in range(retry_times): screenshot = pyautogui.screenshot() screenshot.save(image_path) if os.path.exists(image_path): self.load_image_info(image_path) return True else: time.sleep(0.1) return False def reset(self): pyautogui.hotkey('win', 'd') def press_key(self, keys): if isinstance(keys, list): cleaned_keys = [] for key in keys: if isinstance(key, str): if key.startswith("keys=["): key = key[6:] if key.endswith("]"): key = key[:-1] if key.startswith("['") or key.startswith('["'): key = key[2:] if len(key) > 2 else key if key.endswith("']") or key.endswith('"]'): key = key[:-2] if len(key) > 2 else key key = key.strip() key_map = {"arrowleft": "left", "arrowright": "right", "arrowup": "up", "arrowdown": "down"} key = key_map.get(key, key) cleaned_keys.append(key) else: cleaned_keys.append(key) keys = cleaned_keys else: keys = [keys] if len(keys) > 1: pyautogui.hotkey(*keys) else: pyautogui.press(keys[0]) def type(self, text): pyperclip.copy(text) pyautogui.keyDown('ctrl') pyautogui.keyDown('v') pyautogui.keyUp('v') pyautogui.keyUp('ctrl') def mouse_move(self, x, y): pyautogui.moveTo(x, y) time.sleep(0.1) pyautogui.moveTo(x, y) def left_click(self, x, y): pyautogui.moveTo(x, y) time.sleep(0.1) pyautogui.click() def 
left_click_drag(self, x, y): pyautogui.dragTo(x, y, duration=0.5) pyautogui.moveTo(x, y) def right_click(self, x, y): pyautogui.moveTo(x, y) time.sleep(0.1) pyautogui.rightClick() def middle_click(self, x, y): pyautogui.moveTo(x, y) time.sleep(0.1) pyautogui.middleClick() def double_click(self, x, y): pyautogui.moveTo(x, y) time.sleep(0.1) pyautogui.doubleClick() def triple_click(self, x, y): pyautogui.moveTo(x, y) time.sleep(0.1) pyautogui.tripleClick() def scroll(self, pixels): pyautogui.scroll(pixels) # ===================== 步骤5:完整自动化流程 ===================== def run_gui_automation(instruction, max_step=30): dashscope.api_key = os.getenv("DASHSCOPE_API_KEY") dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1' model_name = 'gui-plus-2026-02-26' computer_tools = ComputerTools() computer_tools.reset() output_dir = os.path.join(os.path.expanduser("~"), "Desktop", "gui_automation") os.makedirs(output_dir, exist_ok=True) history = [] stop_flag = False print(f"[任务] {instruction}") print("=" * 60) for step_id in range(max_step): if stop_flag: break print(f"\n[步骤 {step_id + 1}]") screen_shot = os.path.join(output_dir, f'screenshot_{step_id}.png') computer_tools.get_screenshot(screen_shot) messages = get_messages(screen_shot, instruction, history, system_prompt) response = dashscope.MultiModalConversation.call( model=model_name, messages=messages, vl_high_resolution_images=True, stream=False ) output_text = response.output.choices[0].message.content[0]['text'] print(f"[模型输出]\n{output_text}\n") action_list = extract_tool_calls(output_text) if not action_list: print("未提取到有效操作") break for action_id, action in enumerate(action_list): action_parameter = action['arguments'] action_type = action_parameter['action'] dummy_image = Image.open(screen_shot) resized_height, resized_width = smart_resize( dummy_image.height, dummy_image.width, factor=16, min_pixels=3136, max_pixels=1003520 * 200 ) for key in ['coordinate', 'coordinate1', 'coordinate2']: if key in 
action_parameter:
                    # Map the model's 0-1000 normalized coordinates onto the resized image
                    action_parameter[key][0] = int(action_parameter[key][0] / 1000 * resized_width)
                    action_parameter[key][1] = int(action_parameter[key][1] / 1000 * resized_height)
            # Dispatch the parsed action to the matching GUI operation
            if action_type in ['click', 'left_click']:
                computer_tools.left_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] 左键点击 ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type == 'mouse_move':
                computer_tools.mouse_move(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] 移动鼠标")
            elif action_type == 'middle_click':
                computer_tools.middle_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] 中键点击")
            elif action_type in ['right click', 'right_click']:
                computer_tools.right_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] 右键点击")
            elif action_type in ['key', 'hotkey']:
                computer_tools.press_key(action_parameter['keys'])
                print(f"[OK] 按键 {action_parameter['keys']}")
            elif action_type == 'type':
                computer_tools.type(action_parameter['text'])
                print(f"[OK] 输入文本: {action_parameter['text']}")
            # The system prompt names this action `left_click_drag`; `drag` is kept as an alias
            elif action_type in ['drag', 'left_click_drag']:
                computer_tools.left_click_drag(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] 拖拽")
            elif action_type == 'scroll':
                if 'coordinate' in action_parameter:
                    computer_tools.mouse_move(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                computer_tools.scroll(action_parameter.get("pixels", 1))
                print(f"[OK] 滚动 {action_parameter.get('pixels', 1)} 像素")
            elif action_type in ['computer_double_click', 'double_click']:
                computer_tools.double_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] 双击")
            # `triple_click` is listed in the system prompt and implemented in ComputerTools
            elif action_type == 'triple_click':
                computer_tools.triple_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] 三击")
            elif action_type == 'wait':
                time.sleep(action_parameter.get('time', 2))
                print(f"[OK] 等待 {action_parameter.get('time', 2)} 秒")
            elif action_type == 'answer':
                print(f"[OK] 任务完成: {action_parameter.get('text', '')}")
                stop_flag = True
                break
            elif action_type
in ['stop', 'terminate', 'done']:
                print(f"[OK] 任务终止: {action_parameter.get('status', 'success')}")
                stop_flag = True
                break
            else:
                print(f"未知操作类型: {action_type}")
        history.append({'output': output_text, 'image': screen_shot})
        time.sleep(2)
    print("\n" + "=" * 60)
    print(f"[完成] 共执行 {len(history)} 步")


if __name__ == '__main__':
    run_gui_automation(
        instruction='帮我打开chrome,在百度中搜索阿里巴巴',
        max_step=30
    )

Mobile GUI tasks

On mobile, automation is carried out through the ADB (Android Debug Bridge) tool.

Environment setup:
- Download the Android Debug Bridge build for your system and save it to a path of your choice.
- On the phone, enable "USB debugging" or "ADB debugging" (this usually requires enabling Developer options first).
- Connect the phone to the computer with a data cable and select "File transfer" mode.
- Download the ADB Keyboard installation package, transfer it to the phone, open it, and choose to install despite the risk warning.
- In the system settings, switch the default input method to ADB Keyboard.
- Test the connection from a terminal on the computer: /path/to/adb devices (a non-empty device list means the connection succeeded).
- On macOS/Linux, execute permission must be granted: sudo chmod +x /path/to/adb
- Open any app on the phone, then run: /path/to/adb shell am start -a android.intent.action.MAIN -c android.intent.category.HOME. If the phone returns to the home screen, everything is ready.

The mobile GUI example is largely the same as the desktop one. The complete example code is as follows.

Complete mobile example code

Build the mobile system prompt

import json, os, subprocess
import dashscope, time, math
from PIL import Image, ImageDraw
import shutil, requests
from datetime import datetime

mobile_system_prompt = '''# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name_for_human": "mobile_use", "name": "mobile_use", "description": "Use a touchscreen to interact with a mobile device, and take screenshots. * This is an interface to a mobile device with touchscreen. You can perform actions like clicking, typing, swiping, etc. * Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. * The screen's resolution is 1000x1000. * Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform.
The available actions are: * `key`: Perform a key event on the mobile device. - This supports adb's `keyevent` syntax. - Examples: "volume_up", "volume_down", "power", "camera", "clear". * `click`: Click the point on the screen with coordinate (x, y). * `long_press`: Press the point on the screen with coordinate (x, y) for specified seconds. * `swipe`: Swipe from the starting point with coordinate (x, y) to the end point with coordinates2 (x2, y2). * `type`: Input the specified text into the activated input box. * `system_button`: Press the system button. * `open`: Open an app on the device. * `wait`: Wait specified seconds for the change to happen. * `answer`: Terminate the current task and output the answer. * `interact`: Resolve the blocking window by interacting with the user. * `terminate`: Terminate the current task and report its completion status.", "enum": ["key", "click", "long_press", "swipe", "type", "system_button", "open", "wait", "answer", "interact", "terminate"], "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=click`, `action=long_press`, and `action=swipe`.", "type": "array"}, "coordinate2": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=swipe`.", "type": "array"}, "text": {"description": "Required only by `action=key`, `action=type`, `action=open`, `action=answer`,and `action=interact`.", "type": "string"}, "time": {"description": "The seconds to wait. Required only by `action=long_press` and `action=wait`.", "type": "number"}, "button": {"description": "Back means returning to the previous interface, Home means returning to the desktop, Menu means opening the application background menu, and Enter means pressing the enter. 
Required only by `action=system_button`", "enum": ["Back", "Home", "Menu", "Enter"], "type": "string"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}, "args_format": "Format the arguments as a JSON object."}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. - If finishing, use action=terminate in the tool call.'''

Build the multi-turn conversation messages

from datetime import datetime


def get_messages(image, instruction, history_output, system_prompt):
    history_n = 4
    current_step = len(history_output)
    history_start_idx = max(0, current_step - history_n)
    # Steps older than the last history_n rounds are kept only as short text summaries
    previous_actions = []
    for i in range(history_start_idx):
        if i < len(history_output):
            history_output_str = history_output[i]['output']
            # Keep only the text between 'Action:' and '<tool_call>' when both markers are present
            if 'Action:' in history_output_str and '<tool_call>' in history_output_str:
                history_output_str = history_output_str.split('Action:')[1].split('<tool_call>')[0].strip()
            previous_actions.append(f"Step {i + 1}: {history_output_str}")
    previous_actions_str = (
        "\n".join(previous_actions) if previous_actions else "None"
    )
    # Add background information (today's date and weekday)
    today = datetime.today()
    weekday_names = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
    weekday = weekday_names[today.weekday()]
    formatted_date = today.strftime("%Y年%m月%d日") + " " + weekday
    ground_info = f'''今天的日期是:{formatted_date}。'''
    instruction_prompt = f""" Please generate the next move according to the UI screenshot, instruction and
previous actions. Instruction: {ground_info}{instruction} Previous actions: {previous_actions_str}""" ## 模型调用 messages = [ { "role": "system", "content": [ {"text": system_prompt} ], } ] history_len = min(history_n, len(history_output)) if history_len > 0: for history_id, history_item in enumerate(history_output[-history_n:], 0): if history_id == 0: messages.append({ "role": "user", "content": [ {"text": instruction_prompt}, {"image": "file://" +history_item['image']} ] }) else: messages.append({ "role": "user", "content": [ {"image": "file://" +history_item['image']} ] }) messages.append({ "role": "assistant", "content": [ {"text": history_item['output']}, ] }) messages.append({ "role": "user", "content": [ {"image": "file://" +image}, ] }) else: messages.append( { "role": "user", "content": [ { "text": instruction_prompt }, { "image": "file://" +image, }, ], } ) return messages 计算缩放后的图像大小 手机端与电脑端共用一套 smart_resize 函数。详情请参见前文坐标映射函数。 执行GUI操作 使用 ADB 命令执行实际的手机操作。复制import subprocess import os import time from PIL import Image class AdbTools: def __init__(self, adb_path, device=None): self.adb_path = adb_path self.device = device self.__device_str__ = f" -s {device} " if device is not None else ' ' self.image_info = None def adb_shell(self, command): command = self.adb_path + self.__device_str__ + command subprocess.run(command, capture_output=True, text=True, shell=True) ## 载入手机size def load_image_info(self, path): width, height = Image.open(path).size self.image_info = (width, height) ## 获取截图 def get_screenshot(self, image_path, retry_times=3): command = self.adb_path + (f" -s {self.device}" if self.device is not None else '') + f" exec-out screencap -p > {image_path}" for i in range(retry_times): subprocess.run(command, capture_output=True, text=True, shell=True) if os.path.exists(image_path): self.load_image_info(image_path) return True else: time.sleep(0.1) else: return False ## 点击(x,y) ## coordinate_size: 输入图片的尺寸,默认为None,则使用当前手机的尺寸, 传入为{'x': int, 'y': int} def 
click(self, x, y, coordinate_size=None): command = self.adb_path + self.__device_str__ + f" shell input tap {x} {y}" subprocess.run(command, capture_output=True, text=True, shell=True) def long_press(self, x, y, time=800): command = self.adb_path + self.__device_str__ + f" shell input swipe {x} {y} {x} {y} {time}" subprocess.run(command, capture_output=True, text=True, shell=True) ## 滑动从(x1,y1)->(x2,y2) ## coordinate_size: 输入图片的尺寸,默认为None,则使用当前手机的尺寸, 传入为{'x': int, 'y': int} def slide(self, x1, y1, x2, y2, coordinate_size=None, slide_time=800): command = self.adb_path + self.__device_str__ + f" shell input swipe {x1} {y1} {x2} {y2} {slide_time}" subprocess.run(command, capture_output=True, text=True, shell=True) ## 返回 def back(self): command = self.adb_path + self.__device_str__ + f" shell input keyevent 4" subprocess.run(command, capture_output=True, text=True, shell=True) # 点击Home键 def home(self): command = self.adb_path + self.__device_str__ + f" shell am start -a android.intent.action.MAIN -c android.intent.category.HOME" subprocess.run(command, capture_output=True, text=True, shell=True) ## 打字(中英均可,不确定其他语言是否可以),注意需要先在手机安装 adb 键盘 def type(self, text): escaped_text = text.replace('"', '\\"').replace("'", "\\'") command_list = [ f"shell ime enable com.android.adbkeyboard/.AdbIME ", f"shell ime set com.android.adbkeyboard/.AdbIME ", 0.1, f'shell am broadcast -a ADB_INPUT_TEXT --es msg "{escaped_text}" ', 0.1, f"shell ime disable com.android.adbkeyboard/.AdbIME" ] for command in command_list: if isinstance(command, float): time.sleep(command) elif isinstance(command, str): subprocess.run(self.adb_path + self.__device_str__ + command.strip(), capture_output=True, text=True, shell=True) def get_package_name(self, all_packages=False): try: if all_packages: command = self.adb_path + self.__device_str__ + " shell pm list packages" else: command = self.adb_path + self.__device_str__ + " shell pm list packages -3" res = subprocess.run(command, capture_output=True, 
text=True, shell=True) pkgs = [] for line in res.stdout.splitlines(): s = line.strip() if not s: continue # 去掉前缀 "package:" if s.startswith("package:"): s = s[len("package:"):] # 如果包含 "=",右侧才是包名 if "=" in s: _, s = s.split("=", 1) if s: pkgs.append(s) return sorted(set(pkgs)) except Exception as e: print(e) return [] def open_app(self, package_name): command = self.adb_path + self.__device_str__ + f" shell monkey -p {package_name} -c android.intent.category.LAUNCHER 1" subprocess.run(command, capture_output=True, text=True, shell=True) 应用包名映射 包名是 Android 应用的唯一标识符(格式如:com.公司名.产品名,示例:com.tencent.mm)为了支持通过应用名称打开应用(action=open),需要维护应用名称到包名的映射。复制# 常见应用包名映射(示例,可根据需要扩展) package_str_list = '''com.tencent.mm 微信 wechat com.tencent.mobileqq qq 腾讯qq com.sina.weibo 微博 com.taobao.taobao 淘宝 com.jingdong.app.mall 京东 京东秒送 com.xunmeng.pinduoduo 拼多多 com.xingin.xhs 小红书 com.douban.frodo 豆瓣 com.zhihu.android 知乎 com.autonavi.minimap 高德地图 高德 com.baidu.BaiduMap 百度地图 com.sankuai.meituan.takeoutnew 美团外卖 com.sankuai.meituan 美团 美团外卖 com.dianping.v1 大众点评 点评 me.ele 饿了么 淘宝闪购 com.yek.android.kfc.activitys 肯德基 ctrip.android.view 携程 携程旅行 com.MobileTicket 铁路12306 12306 com.Qunar 去哪儿旅行 去哪儿网 去哪儿 com.sdu.didi.psnger 滴滴出行 滴滴 tv.danmaku.bili bilibili b站 哔哩哔哩 哔站 bili com.ss.android.ugc.aweme 抖音 com.smile.gifmaker 快手 com.tencent.qqlive 腾讯视频 com.qiyi.video 爱奇艺 com.youku.phone 优酷 优酷视频 com.hunantv.imgo.activity 芒果tv 芒果 com.phoenix.read 红果短剧 红果 com.netease.cloudmusic 网易云音乐 网易云 com.tencent.qqmusic qq音乐 com.luna.music 汽水音乐 com.ximalaya.ting.android 喜马拉雅 com.dragon.read 番茄免费小说 番茄小说 com.kmxs.reader 七猫免费小说 com.ss.android.lark 飞书 com.tencent.androidqqmail qq邮箱 com.larus.nova 豆包 豆包 com.gotokeep.keep keep com.lingan.seeyou 美柚 com.tencent.news 腾讯新闻 com.ss.android.article.news 今日头条 com.lianjia.beike 贝壳找房 com.anjuke.android.app 安居客 com.hexin.plat.android 同花顺 com.miHoYo.hkrpg 星穹铁道 崩坏 com.papegames.lysk.cn 恋与深空 com.android.settings settings androidsystemsettings com.android.soundrecorder audiorecorder 
com.rammigsoftware.bluecoins bluecoins com.flauschcode.broccoli broccoli com.booking booking com.android.chrome 谷歌浏览器 googlechrome chrome com.android.deskclock 时钟 闹钟 clock com.android.contacts contacts com.duolingo duolingo 多邻国 com.expedia.bookings expedia com.android.fileexplorer files filemanager com.google.android.gm gmail googlemail com.google.android.apps.nbu.files googlefiles filesbygoogle com.google.android.calendar googlecalendar com.google.android.apps.dynamite googlechat com.google.android.deskclock googleclock com.google.android.contacts googlecontacts com.google.android.apps.docs.editors.docs googledocs com.google.android.apps.docs googledrive com.google.android.apps.fitness googlefit com.google.android.keep googlekeep com.google.android.apps.maps googlemaps com.google.android.apps.books googleplaybooks com.android.vending googleplaystore com.google.android.apps.docs.editors.slides googleslides com.google.android.apps.tasks googletasks net.cozic.joplin joplin com.mcdonalds.app 麦当劳 mcdonald net.osmand osmand com.Project100Pi.themusicplayer pimusicplayer com.quora.android quora com.reddit.frontpage reddit code.name.monkey.retromusic retromusic com.scientificcalculatorplus.simplecalculator.basiccalculator.mathcalc simplecalendarpro com.simplemobiletools.smsmessenger simplesmsmessenger org.telegram.messenger telegram com.einnovation.temu temu com.zhiliaoapp.musically tiktok com.twitter.android twitter x org.videolan.vlc vlc com.whatsapp whatsapp com.taobao.movie.android 淘票票 com.tongcheng.android 同程旅行 同程 com.sankuai.movie 猫眼 com.wuba.zhuanzhuan 转转 com.tencent.weread 微信读书 com.taobao.idlefish 闲鱼 com.wudaokou.hippo 盒马 com.eg.android.AlipayGphone 支付宝 com.jd.jrapp 京东金融 com.achievo.vipshop 唯品会 com.smzdm.client.android 什么值得买 cn.kuwo.player 酷我音乐 com.taobao.trip 飞猪 飞猪旅行 com.jingdong.pdj 京东到家 com.tencent.map 腾讯地图 com.shizhuang.duapp 得物 cn.damai 大麦 大麦网 com.ss.android.auto 懂车帝 com.cubic.autohome 汽车之家 com.wuba 58同城 五八同城 com.android.calendar 日历 com.alibaba.android.rimet 
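钉钉 com.meituan.retail.v.android 小象超市 com.aliyun.tongyi 通义 千问 通义千问 com.hupu.games 虎扑 虎扑体育 com.quark.browser 夸克 夸克浏览器 com.yuantiku.tutor 猿辅导 com.tencent.mtt qq浏览器 com.umetrip.android.msky.app 航旅纵横 com.UCMobile UC浏览器 com.ss.android.ugc.aweme.lite 抖音极速版 抖音 air.tv.douyu.android 斗鱼 com.tencent.hunyuan.app.chat 元宝 com.baidu.searchbox 百度 com.lemon.lv 剪映 cn.soulapp.android soul com.baidu.netdisk 百度网盘 com.tmri.app.main 交管12123 12123 com.kugou.android 酷狗 酷狗音乐 com.ss.android.lark 飞书 com.tencent.android.qqdownloader 应用宝 com.mt.mtxx.mtxx 美图 美图秀秀 com.tencent.karaoke 全民k歌 com.intsig.camscanner 扫描全能王 com.android.bankabc 农业银行 农行 cmb.pb 招商银行 招行 com.ganji.android.haoche_c 瓜子二手车 瓜子 com.sf.activity 顺丰 顺丰快递 顺丰速运 com.ziroom.ziroomcustomer 自如 com.yumc.phsuperapp 必胜客 cn.dominos.pizza 达美乐披萨 达美乐 cn.wps.moffice_eng WPS Office WPS com.mfw.roadbook 马蜂窝 com.moonshot.kimichat kimi com.tencent.wemeet.app 腾讯会议 com.deepseek.chat deepseek com.spdbccc.app 浦发银行 cn.samsclub.app 山姆超市 山姆 山姆会员商店 山姆会员店 com.tencent.qqsports 腾讯体育 com.hanweb.android.zhejiang.activity 浙里办 com.ss.android.article.video 西瓜视频 com.taou.maimai 脉脉 '''

# package name -> list of normalized app names
PACKAGES_NAME_DICT = {}
# normalized app name -> list of package names
NAME_PACKAGE_DICT = {}

def normalize_package_name(name):
    # Lowercase and drop spaces and hyphens so lookups tolerate formatting differences
    name = name.lower().strip().replace(" ", "").replace("-", "")
    return name

# Each entry is expected on its own line: the package name, then tab-separated app names
for package_str in package_str_list.split("\n"):
    package_name = package_str.strip().split("\t")
    if not package_name[0]:
        continue  # skip blank lines so no empty key is added
    PACKAGES_NAME_DICT[package_name[0]] = [normalize_package_name(i) for i in package_name[1:]]
    for name in package_name[1:]:
        name = normalize_package_name(name)
        if name not in NAME_PACKAGE_DICT:
            NAME_PACKAGE_DICT[name] = [package_name[0]]
        else:
            NAME_PACKAGE_DICT[name].append(package_name[0])

Browser GUI tasks

On the browser side, Playwright controls the browser to automate operations. It is combined with SoM (Set-of-Mark), which automatically attaches numeric labels to page elements so that the model can operate on a web element precisely by its label number.

Environment setup:
- Install dependencies: pip install playwright pillow dashscope playwright-stealth termcolor
- Install the Playwright browser: playwright install chromium

The complete example code covers these core steps: constructing multi-turn dialogue messages, parsing the tool calls in the model output, and executing browser GUI operations.

Complete browser example code

Step 1: Construct the System Prompt and messages

Browser automation involves several cooperating modules, including the SoM annotation module and the Playwright browser control module. The core pieces are outlined below.

SoM annotation module core functions: SoM (Set-of-Mark) annotates web page elements as follows:
- Collect the interactive elements on the page (buttons, links, input boxes, and so on)
- Assign each element a unique numeric label
- Draw a dashed box and the label on the screenshot
- Generate a text description of each element for the model to read

Playwright browser control: the browser control module provides these core functions:
- Page navigation and loading
- Element clicks and text input
- Page scrolling and waiting
- Keyboard shortcuts
- New tab handling

Main loop: the automation flow repeats this cycle: get the page state (with SoM annotations) → call the model → parse the tool call → execute the operation.

Tool calling modes

The GUI-Plus model can be combined with other tools or custom functions to carry out more complex automation tasks.

Multi-turn dialogue mode

In a multi-turn dialogue, the model generates consecutive GUI operation instructions based on historical screenshots and outputs. Each call keeps the most recent 4 turns of full dialogue (including screenshots); earlier history is kept only as a text summary.

Mixed tool calling

The model can call GUI operation tools and custom tools (such as database queries or API calls) together, automating complex tasks across systems.

GUI + web operations

The model can control desktop applications and the browser together to perform compound cross-application operations, such as fetching data in a desktop application and then searching for it in the browser.

Image format limits
- Supported image formats: JPEG, PNG, WEBP, GIF
- Maximum tokens per image: 16,384
- Images are automatically scaled to the size the model processes internally
- Upload clear screenshots for the best results
- Oversized images increase token consumption and response time

Billing and rate limits

Billing
- Billed by the number of input and output tokens
- Images are converted to tokens for billing
- For specific prices, see the billing documentation

Rate limits
See the call frequency limits for each model.

API reference
- OpenAI-compatible interface: Chat Completions
- DashScope interface: multimodal conversation

Error codes

If a model call fails and returns an error message, see the error messages documentation to resolve it.

Recommended prompts for the GUI-Plus model

Desktop System Prompt

The following system prompt applies to the gui-plus model:

"""# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. 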
The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": "array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. 
Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. - If finishing, use action=terminate in the tool call.""" 手机端 System Prompt 以下系统提示词适用于 gui-plus 模型: 复制'''# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name_for_human": "mobile_use", "name": "mobile_use", "description": "Use a touchscreen to interact with a mobile device, and take screenshots.\n* This is an interface to a mobile device with touchscreen. 
You can perform actions like clicking, typing, swiping, etc.\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions.\n* The screen's resolution is 1000x1000.\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. The available actions are:\n* `key`: Perform a key event on the mobile device.\n - This supports adb's `keyevent` syntax.\n - Examples: \"volume_up\", \"volume_down\", \"power\", \"camera\", \"clear\".\n* `click`: Click the point on the screen with coordinate (x, y).\n* `long_press`: Press the point on the screen with coordinate (x, y) for specified seconds.\n* `swipe`: Swipe from the starting point with coordinate (x, y) to the end point with coordinates2 (x2, y2).\n* `type`: Input the specified text into the activated input box.\n* `system_button`: Press the system button.\n* `open`: Open an app on the device.\n* `wait`: Wait specified seconds for the change to happen.\n* `answer`: Terminate the current task and output the answer.\n* `interact`: Resolve the blocking window by interacting with the user.\n* `terminate`: Terminate the current task and report its completion status.", "enum": ["key", "click", "long_press", "swipe", "type", "system_button", "open", "wait", "answer", "interact", "terminate"], "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=click`, `action=long_press`, and `action=swipe`.", "type": "array"}, "coordinate2": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. 
Required only by `action=swipe`.", "type": "array"}, "text": {"description": "Required only by `action=key`, `action=type`, `action=open`, `action=answer`,and `action=interact`.", "type": "string"}, "time": {"description": "The seconds to wait. Required only by `action=long_press` and `action=wait`.", "type": "number"}, "button": {"description": "Back means returning to the previous interface, Home means returning to the desktop, Menu means opening the application background menu, and Enter means pressing the enter. Required only by `action=system_button`", "enum": ["Back", "Home", "Menu", "Enter"], "type": "string"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}, "args_format": "Format the arguments as a JSON object."}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. - If finishing, use action=terminate in the tool call.''' 相关文档 API Key 计费说明
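The system prompts above ask the model to reply with an Action line followed by a single <tool_call> block, and they state the screen as 1000x1000. A minimal sketch of how a client might consume such a reply is given here, assuming the coordinates are normalized to that 1000x1000 grid and need scaling to the real screen size. The helper names `parse_step` and `to_pixels` are hypothetical and are not part of the DashScope SDK.

```python
import json
import re

# Matches the JSON object inside a <tool_call>...</tool_call> block.
TOOL_CALL_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)


def parse_step(output: str):
    """Split one model reply into (action_text, tool_call_dict).

    Hypothetical helper: assumes the reply follows the
    "Action: ... <tool_call>{...}</tool_call>" format from the prompt.
    """
    match = TOOL_CALL_RE.search(output)
    if match is None:
        raise ValueError("no <tool_call> block found in model output")
    tool_call = json.loads(match.group(1))
    # Everything before the block is the action description.
    action_text = output[:match.start()].replace("Action:", "", 1).strip()
    return action_text, tool_call


def to_pixels(coordinate, screen_width, screen_height, grid=1000):
    """Scale an (x, y) point from the assumed 1000x1000 grid to real pixels."""
    x, y = coordinate
    return round(x / grid * screen_width), round(y / grid * screen_height)


if __name__ == "__main__":
    reply = (
        "Action: Tap the close button.\n"
        "<tool_call>\n"
        '{"name": "mobile_use", "arguments": '
        '{"action": "click", "coordinate": [789, 280]}}\n'
        "</tool_call>"
    )
    action, call = parse_step(reply)
    print(action)
    print(call["arguments"]["action"], to_pixels(call["arguments"]["coordinate"], 1080, 2400))
```

Keeping parsing and scaling as separate functions lets the same parser serve both `computer_use` and `mobile_use` replies, while the scaling step can be skipped for actions that carry no coordinate.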
语速、音调、停顿、音量复制本文SSML(Speech Synthesis Markup Language,语音合成标记语言)是一种 XML 标记语言,用于在 CosyVoice 中控制语速、音调、停顿、音量和背景音乐。 使用限制 模型: cosyvoice-v3-flash、cosyvoice-v3-plus。 音色: 克隆音色,以及音色列表中标记为支持 SSML 的系统音色。 API: Java SDK(2.20.3+):仅支持非流式和单向流式调用。参见 Java SDK 文档。 Python SDK(1.23.4+):仅支持非流式和单向流式调用。参见 Python SDK 文档。 WebSocket API:在 run-task 中将 enable_ssml 设为 true,且仅发送一次 continue-task。参见 WebSocket API 文档。 快速开始 前置条件和教程请参见语音合成 - CosyVoice。使用 SSML 前,请先确认使用限制中支持的模型、音色和 API。 Java SDK 非流式调用单向流式调用复制import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam; import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer; import com.alibaba.dashscope.utils.Constants; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; // SSML 支持要求参见上方说明 public class Main { private static String model = "cosyvoice-v3-flash"; private static String voice = "longanyang"; public static void main(String[] args) { Constants.baseWebsocketApiUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference"; streamAudioDataToSpeaker(); System.exit(0); } public static void streamAudioDataToSpeaker() { SpeechSynthesisParam param = SpeechSynthesisParam.builder() // 如果未配置环境变量,请将下面一行替换为:.apiKey("sk-xxx") .apiKey(System.getenv("DASHSCOPE_API_KEY")) .model(model) .voice(voice) .build(); SpeechSynthesizer synthesizer = new SpeechSynthesizer(param, null); ByteBuffer audio = null; try { // 非流式调用,阻塞直到返回音频 // 转义特殊字符 audio = synthesizer.call("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>"); } catch (Exception e) { throw new RuntimeException(e); } finally { // 任务结束后关闭 WebSocket 连接 synthesizer.getDuplexApi().close(1000, "bye"); } if (audio != null) { // 将音频数据保存到本地文件 "output.mp3" File file = new File("output.mp3"); try (FileOutputStream fos = new FileOutputStream(file)) { fos.write(audio.array()); } catch (IOException e) { throw new RuntimeException(e); } } // 首包延迟包含建立 WebSocket 连接所需的时间 System.out.println( "[Metric] Request ID: " + 
synthesizer.getLastRequestId() + ", First packet latency (ms): " + synthesizer.getFirstPackageDelay()); } } Python SDK 非流式调用单向流式调用复制# coding=utf-8 # SSML 支持要求参见上方说明 import dashscope from dashscope.audio.tts_v2 import * import os # 如果未配置环境变量,请将下面一行替换为:dashscope.api_key = "sk-xxx" dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY') dashscope.base_websocket_api_url='wss://dashscope.aliyuncs.com/api-ws/v1/inference' # 模型 model = "cosyvoice-v3-flash" # 音色 voice = "longanyang" # 实例化 SpeechSynthesizer,将模型、音色等请求参数传入构造函数 synthesizer = SpeechSynthesizer(model=model, voice=voice) # 非流式调用,阻塞直到返回音频 # 转义特殊字符 audio = synthesizer.call("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>") # 保存音频到本地 with open('output.mp3', 'wb') as f: f.write(audio) # 首包延迟包含建立 WebSocket 连接所需的时间 print('[Metric] Request ID: {}, First packet latency: {} ms'.format( synthesizer.get_last_request_id(), synthesizer.get_first_package_delay())) WebSocket API GoC#PHPNode.jsJava (WebSocket)Python (WebSocket)复制// SSML 支持要求参见上方说明 package main import ( "encoding/json" "fmt" "net/http" "os" "strings" "time" "github.com/google/uuid" "github.com/gorilla/websocket" ) const ( wsURL = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" outputFile = "output.mp3" ) func main() { // 如果未配置环境变量,请将下面一行替换为:apiKey := "sk-xxx" apiKey := os.Getenv("DASHSCOPE_API_KEY") // 清空输出文件 os.Remove(outputFile) os.Create(outputFile) // 连接 WebSocket header := make(http.Header) header.Add("X-DashScope-DataInspection", "enable") header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey)) conn, resp, err := websocket.DefaultDialer.Dial(wsURL, header) if err != nil { if resp != nil { fmt.Printf("Connection failed. 
HTTP status code: %d\n", resp.StatusCode) } fmt.Println("Connection failed:", err) return } defer conn.Close() // 生成任务 ID taskID := uuid.New().String() fmt.Printf("Generated task ID: %s\n", taskID) // 发送 run-task 命令 runTaskCmd := map[string]interface{}{ "header": map[string]interface{}{ "action": "run-task", "task_id": taskID, "streaming": "duplex", }, "payload": map[string]interface{}{ "task_group": "audio", "task": "tts", "function": "SpeechSynthesizer", "model": "cosyvoice-v3-flash", "parameters": map[string]interface{}{ "text_type": "PlainText", "voice": "longanyang", "format": "mp3", "sample_rate": 22050, "volume": 50, "rate": 1, "pitch": 1, // 启用 enable_ssml: true 后,仅发送一次 continue-task "enable_ssml": true, }, "input": map[string]interface{}{}, }, } runTaskJSON, _ := json.Marshal(runTaskCmd) fmt.Printf("Sending run-task command: %s\n", string(runTaskJSON)) err = conn.WriteMessage(websocket.TextMessage, runTaskJSON) if err != nil { fmt.Println("Failed to send run-task:", err) return } textSent := false // 处理消息 for { messageType, message, err := conn.ReadMessage() if err != nil { fmt.Println("Failed to read message:", err) break } // 处理二进制消息 if messageType == websocket.BinaryMessage { fmt.Printf("Received binary message, length: %d\n", len(message)) file, _ := os.OpenFile(outputFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) file.Write(message) file.Close() continue } // 处理文本消息 messageStr := string(message) fmt.Printf("Received text message: %s\n", strings.ReplaceAll(messageStr, "\n", "")) // 解析 JSON 获取事件类型 var msgMap map[string]interface{} if json.Unmarshal(message, &msgMap) == nil { if header, ok := msgMap["header"].(map[string]interface{}); ok { if event, ok := header["event"].(string); ok { fmt.Printf("Event type: %s\n", event) switch event { case "task-started": fmt.Println("=== Received task-started event ===") if !textSent { // 发送 continue-task 命令;使用 SSML 时仅可发送一次 continueTaskCmd := map[string]interface{}{ "header": map[string]interface{}{ "action": 
"continue-task", "task_id": taskID, "streaming": "duplex", }, "payload": map[string]interface{}{ "input": map[string]interface{}{ // 转义特殊字符 "text": "<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>", }, }, } continueTaskJSON, _ := json.Marshal(continueTaskCmd) fmt.Printf("Sending continue-task command: %s\n", string(continueTaskJSON)) err = conn.WriteMessage(websocket.TextMessage, continueTaskJSON) if err != nil { fmt.Println("Failed to send continue-task:", err) return } textSent = true // 延迟发送 finish-task time.Sleep(500 * time.Millisecond) // 发送 finish-task 命令 finishTaskCmd := map[string]interface{}{ "header": map[string]interface{}{ "action": "finish-task", "task_id": taskID, "streaming": "duplex", }, "payload": map[string]interface{}{ "input": map[string]interface{}{}, }, } finishTaskJSON, _ := json.Marshal(finishTaskCmd) fmt.Printf("Sending finish-task command: %s\n", string(finishTaskJSON)) err = conn.WriteMessage(websocket.TextMessage, finishTaskJSON) if err != nil { fmt.Println("Failed to send finish-task:", err) return } } case "task-finished": fmt.Println("=== Task finished ===") return case "task-failed": fmt.Println("=== Task failed ===") if header["error_message"] != nil { fmt.Printf("Error message: %s\n", header["error_message"]) } return case "result-generated": fmt.Println("Received result-generated event") } } } } } } 复制using System.Net.WebSockets; using System.Text; using System.Text.Json; // SSML 支持要求参见上方说明 class Program { // 如果未配置环境变量,请将下面一行替换为:private static readonly string ApiKey = "sk-xxx" private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? 
throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set."); private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"; // 输出文件路径 private const string OutputFilePath = "output.mp3"; // WebSocket 客户端 private static ClientWebSocket _webSocket = new ClientWebSocket(); // 取消令牌源 private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); // 任务 ID private static string? _taskId; // 任务是否已启动 private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>(); static async Task Main(string[] args) { try { // 清空输出文件 ClearOutputFile(OutputFilePath); // 连接 WebSocket 服务 await ConnectToWebSocketAsync(WebSocketUrl); // 启动消息接收任务 Task receiveTask = ReceiveMessagesAsync(); // 发送 run-task 命令 _taskId = GenerateTaskId(); await SendRunTaskCommandAsync(_taskId); // 等待 task-started 事件 await _taskStartedTcs.Task; // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 // 特殊字符需要转义。 await SendContinueTaskCommandAsync("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>"); // 发送 finish-task 命令 await SendFinishTaskCommandAsync(_taskId); // 等待接收任务完成 await receiveTask; Console.WriteLine("Task completed, connection closed."); } catch (OperationCanceledException) { Console.WriteLine("The task was canceled."); } catch (Exception ex) { Console.WriteLine($"An error occurred: {ex.Message}"); } finally { _cancellationTokenSource.Cancel(); _webSocket.Dispose(); } } private static void ClearOutputFile(string filePath) { if (File.Exists(filePath)) { File.WriteAllText(filePath, string.Empty); Console.WriteLine("The output file has been cleared."); } else { Console.WriteLine("The output file does not exist and does not need to be cleared."); } } private static async Task ConnectToWebSocketAsync(string url) { var uri = new Uri(url); if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open) { return; } // 设置 WebSocket 连接请求头 
_webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}"); _webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable"); try { await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token); Console.WriteLine("Successfully connected to the WebSocket service."); } catch (OperationCanceledException) { Console.WriteLine("WebSocket connection was canceled."); } catch (Exception ex) { Console.WriteLine($"WebSocket connection failed: {ex.Message}"); throw; } } private static async Task SendRunTaskCommandAsync(string taskId) { var command = CreateCommand("run-task", taskId, "duplex", new { task_group = "audio", task = "tts", function = "SpeechSynthesizer", model = "cosyvoice-v3-flash", parameters = new { text_type = "PlainText", voice = "longanyang", format = "mp3", sample_rate = 22050, volume = 50, rate = 1, pitch = 1, // 启用 enable_ssml: true 后,仅发送一次 continue-task enable_ssml = true }, input = new { } }); await SendJsonMessageAsync(command); Console.WriteLine("Sent run-task command."); } private static async Task SendContinueTaskCommandAsync(string text) { if (_taskId == null) { throw new InvalidOperationException("Task ID is not initialized."); } var command = CreateCommand("continue-task", _taskId, "duplex", new { input = new { text } }); await SendJsonMessageAsync(command); Console.WriteLine("Sent continue-task command."); } private static async Task SendFinishTaskCommandAsync(string taskId) { var command = CreateCommand("finish-task", taskId, "duplex", new { input = new { } }); await SendJsonMessageAsync(command); Console.WriteLine("Sent finish-task command."); } private static async Task SendJsonMessageAsync(string message) { var buffer = Encoding.UTF8.GetBytes(message); try { await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token); } catch (OperationCanceledException) { Console.WriteLine("Message sending was canceled."); } } private static async Task 
ReceiveMessagesAsync() { while (_webSocket.State == WebSocketState.Open) { var response = await ReceiveMessageAsync(); if (response != null) { var eventStr = response.RootElement.GetProperty("header").GetProperty("event").GetString(); switch (eventStr) { case "task-started": Console.WriteLine("Task started."); _taskStartedTcs.TrySetResult(true); break; case "task-finished": Console.WriteLine("Task finished."); _cancellationTokenSource.Cancel(); break; case "task-failed": Console.WriteLine("Task failed: " + response.RootElement.GetProperty("header").GetProperty("error_message").GetString()); _cancellationTokenSource.Cancel(); break; default: // 可在此处理 result-generated 事件 break; } } } } private static async Task<JsonDocument?> ReceiveMessageAsync() { var buffer = new byte[1024 * 4]; var segment = new ArraySegment<byte>(buffer); try { WebSocketReceiveResult result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token); if (result.MessageType == WebSocketMessageType.Close) { await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token); return null; } if (result.MessageType == WebSocketMessageType.Binary) { // 处理二进制数据 Console.WriteLine("Receiving binary data..."); // 将二进制数据保存到文件 using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) { fileStream.Write(buffer, 0, result.Count); } return null; } string message = Encoding.UTF8.GetString(buffer, 0, result.Count); return, "message": { "content": [ { "text": "<tool_call>\n{\"name\": \"computer_use\", \"arguments\": {\"action\": \"left_click\", \"coordinate\": [2530, 314]}}\n</tool_call>" } ], "role": "assistant" } } ] }, "usage": { "image_tokens": 6743, "input_tokens": 7750, "input_tokens_details": { "image_tokens": 6743, "text_tokens": 1007 }, "output_tokens": 36, "output_tokens_details": { "text_tokens": 36 }, "total_tokens": 7786 }, "request_id": "6821285d-e40f-4bca-903f-69f220e3c948" } 如何使用 电脑 GUI 任务 本示例适用于 Windows 操作系统,若在 Mac/Linux 环境下,需修改 
ComputerTools 类中的系统命令。 步骤1. 构造 System Prompt 复制system_prompt = """# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. 
The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": "array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. 
Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. - If finishing, use action=terminate in the tool call."""

The system prompt above requires the model to:
- Assume a screen resolution of 1000x1000 (a normalized coordinate system)
- Follow a strict output format: first a description of the action (Action), then a single <tool_call> block
- Use the supported operation types: click, drag, type, scroll, key press, and so on

Step 2. Construct multi-turn dialogue messages

In a GUI automation task, the model has to make decisions based on the context of earlier operations. So that the model understands the current task progress and produces a sensible next step, the example builds the multi-turn messages with the following strategy:
- Keep the full dialogue (screenshot + model output) only for the most recent N turns (4 by default), to avoid the performance drop caused by an overly long context
- For older operations, keep only a text summary (the Action part of the model output) without screenshots, to save tokens

def get_messages(image, instruction, history_output, model_name, system_prompt):
    """
    Construct the multi-turn dialogue messages.

    Args:
        image: path of the current screenshot
        instruction: user instruction
        history_output: dialogue history, [{"output": "...", "image": "..."}]
        model_name: model name (not used when building the messages)
        system_prompt: system prompt text
    """
    history_n = 4  # keep the most recent 4 turns in full
    current_step = len(history_output)

    # Summarize the operations older than the last history_n turns
    history_start_idx = max(0, current_step - history_n)
    previous_actions = []
    for i in range(history_start_idx):
        history_output_str = history_output[i]['output']
        # Keep only the Action description when the expected format is present
        if 'Action:' in history_output_str and '<tool_call>' in history_output_str:
            history_output_str = history_output_str.split('Action:')[1].split('<tool_call>')[0].strip()
        previous_actions.append(f"Step {i + 1}: {history_output_str}")

    previous_actions_str = "\n".join(previous_actions) if previous_actions else "None"

    instruction_prompt = f"""
Please generate the next move according to the UI screenshot, instruction and previous actions.

Instruction: {instruction}

Previous actions:
{previous_actions_str}"""

    # Build the messages array
    messages = [
        {
            "role": "system",
            "content": [{"text": system_prompt}],
        }
    ]

    history_len = min(history_n, len(history_output))
    if history_len > 0:
        # Add the recent history turns
        for history_id, history_item in enumerate(history_output[-history_n:], 0):
            if history_id == 0:
                # The first retained turn carries the instruction prompt
                messages.append({
                    "role": "user",
                    "content": [
                        {"text": instruction_prompt},
                        {"image": "file://" + history_item['image']}
                    ]
                })
            else:
                messages.append({
                    "role": "user",
                    "content": [{"image": "file://" + history_item['image']}]
                })
            messages.append({
                "role": "assistant",
                "content": [{"text": history_item['output']}],
            })
        # Add the current screenshot
        messages.append({
            "role": "user",
            "content": [{"image": "file://" + image}]
        })
    else:
        # First turn
        messages.append({
            "role": "user",
            "content": [
                {"text": instruction_prompt},
                {"image": "file://" + image}
            ]
        })
    return messages

An example messages array for a multi-turn conversation with the GUI model (a 7-turn conversation) is shown below:

model_input [{ "role": "system", "content": [{ "text": "# Tools\n\nYou may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools></tools> XML tags:\n<tools>\n{\"type\": \"function\", \"function\": {\"name_for_human\": \"mobile_use\", \"name\": \"mobile_use\", \"description\": \"Use a touchscreen to interact with a mobile device, and take screenshots.\n* This is an interface to a mobile device with touchscreen. You can perform actions like clicking, typing, swiping, etc.\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions.\n* The screen's resolution is 1000x1000.\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.\", \"parameters\": {\"properties\": {\"action\": {\"description\": \"The action to perform. 
The available actions are:\n* `key`: Perform a key event on the mobile device.\n - This supports adb's `keyevent` syntax.\n - Examples: \"volume_up\", \"volume_down\", \"power\", \"camera\", \"clear\".\n* `click`: Click the point on the screen with coordinate (x, y).\n* `long_press`: Press the point on the screen with coordinate (x, y) for specified seconds.\n* `swipe`: Swipe from the starting point with coordinate (x, y) to the end point with coordinates2 (x2, y2).\n* `type`: Input the specified text into the activated input box.\n* `system_button`: Press the system button.\n* `open`: Open an app on the device.\n* `wait`: Wait specified seconds for the change to happen.\n* `answer`: Terminate the current task and output the answer.\n* `interact`: Resolve the blocking window by interacting with the user.\n* `terminate`: Terminate the current task and report its completion status.\", \"enum\": [\"key\", \"click\", \"long_press\", \"swipe\", \"type\", \"system_button\", \"open\", \"wait\", \"answer\", \"interact\", \"terminate\"], \"type\": \"string\"}, \"coordinate\": {\"description\": \"(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=click`, `action=long_press`, and `action=swipe`.\", \"type\": \"array\"}, \"coordinate2\": {\"description\": \"(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=swipe`.\", \"type\": \"array\"}, \"text\": {\"description\": \"Required only by `action=key`, `action=type`, `action=open`, `action=answer`,and `action=interact`.\", \"type\": \"string\"}, \"time\": {\"description\": \"The seconds to wait. 
Required only by `action=long_press` and `action=wait`.\", \"type\": \"number\"}, \"button\": {\"description\": \"Back means returning to the previous interface, Home means returning to the desktop, Menu means opening the application background menu, and Enter means pressing the enter. Required only by `action=system_button`\", \"enum\": [\"Back\", \"Home\", \"Menu\", \"Enter\"], \"type\": \"string\"}, \"status\": {\"description\": \"The status of the task. Required only by `action=terminate`.\", \"type\": \"string\", \"enum\": [\"success\", \"failure\"]}}, \"required\": [\"action\"], \"type\": \"object\"}, \"args_format\": \"Format the arguments as a JSON object.\"}}\n</tools>\n\nFor each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:\n<tool_call>\n{\"name\": <function-name>, \"arguments\": <args-json-object>}\n</tool_call>\n\n# Response format\n\nResponse format for every step:\n1) Action: a short imperative describing what to do in the UI.\n2) A single <tool_call>...</tool_call> block containing only the JSON: {\"name\": <function-name>, \"arguments\": <args-json-object>}.\n\nRules:\n- Output exactly in the order: Action, <tool_call>.\n- Be brief: one for Action.\n- Do not output anything else outside those two parts.\n- If finishing, use action=terminate in the tool call." 
}] },
{ "role": "user", "content": [{ "text": "\nPlease generate the next move according to the UI screenshot, instruction and previous actions.\n\nInstruction: 帮我在携程搜一下今天济南喜来登酒店的价格\n\nPrevious actions:\nStep 1: 点击携程旅行应用图标以启动携程旅行预订应用程序。\nStep 2: 等待促销启动画面自动过渡到携程主应用界面。" }, { "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_a84122ac_853a630315784b64988492c9c07b5534.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 点击应用更新通知弹窗右上角的关闭按钮(X图标)以将其关闭。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"click\", \"coordinate\": [789, 280]}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_6010a769_089b9b35b1904913bd5df492563b02b9.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 点击搜索栏中的“济南的酒店”文本区域,以激活搜索输入框并准备修改搜索词。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"click\", \"coordinate\": [112, 134]}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_47446db4_fd4a5022002c4db99f110d5c7261fea2.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 点击显示“厦门”的位置字段,将搜索位置从厦门更改为济南。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"click\", \"coordinate\": [156, 347]}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_3832132c_8c55861c1716467e802a3554402f3580.png" }] },
{ "role": "assistant", "content": [{ "text": "Action: 在搜索输入框中键入“济南”,以指定酒店搜索的城市位置。\n<tool_call>\n{\"name\": \"mobile_use\", \"arguments\": {\"action\": \"type\", \"text\": \"济南\"}}\n</tool_call>" }] },
{ "role": "user", "content": [{ "image": "http://nlp-mobile-agent.oss-cn-zhangjiakou.aliyuncs.com/computer-use%2Fscreenshot%2Fscreenshot_ff247bac_39c3e20be32c4baf8677a2b6b61bc021.png" }] }]

Step 3. Parse the model output

Because the model rescales images internally, the coordinates it returns are normalized coordinates relative to the rescaled image. To perform GUI operations accurately on the original image, the coordinates must be mapped back.

Extract the Tool Call field

First, extract the Tool Call from the string returned by the model:

import re
import json

def extract_tool_calls(text):
    """
    Extract all <tool_call> blocks from the model output.

    Args:
        text: text returned by the model
    Returns:
        actions: list of parsed actions
    """
    pattern = re.compile(r'<tool_call>(.*?)</tool_call>', re.DOTALL | re.IGNORECASE)
    blocks = pattern.findall(text)
    actions = []
    for blk in blocks:
        blk = blk.strip()
        try:
            actions.append(json.loads(blk))
        except json.JSONDecodeError as e:
            print(f'Parse failed: {e} | fragment: {blk[:80]}...')
    return actions

Coordinate mapping function

The model rescales images internally while processing them. The following function computes the rescaled size:

import math
from PIL import Image

def smart_resize(height, width, factor=32, min_pixels=32*32*4, max_pixels=32*32*1280, max_long_side=8192):
    """
    Compute the image size after the model's internal rescaling.

    Args:
        height: original image height
        width: original image width
        factor: resolution factor; height and width are rounded to multiples of it
                (default 32; the complete flow in step 5 passes 16)
        min_pixels: minimum number of pixels
        max_pixels: maximum number of pixels
        max_long_side: limit on the longest side
    Returns:
        (h_bar, w_bar): rescaled height and width
    """
    def round_by_factor(number, factor):
        return round(number / factor) * factor

    def ceil_by_factor(number, factor):
        return math.ceil(number / factor) * factor

    def floor_by_factor(number, factor):
        return math.floor(number / factor) * factor

    if height < 2 or width < 2:
        raise ValueError(f"height:{height} and width:{width} must both be at least 2")
    elif max(height, width) / min(height, width) > 200:
        raise ValueError(f"absolute aspect ratio must be smaller than 200, got {height} / {width}")

    # Limit the longest side
    if max(height, width) > max_long_side:
        beta = max(height, width) / max_long_side
        height, width = int(height / beta), int(width / beta)

    # Compute the rescaled size
    h_bar = round_by_factor(height, factor)
    w_bar = round_by_factor(width, factor)
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels /
                         (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar

Step 4. Execute GUI operations

After the action instructions are parsed, the following example uses the pyautogui library to simulate physical GUI operations such as mouse clicks, keyboard input, and scrolling.

import pyautogui
import pyperclip
import time
from PIL import Image
import os

class ComputerTools:
    """GUI operation tools for the desktop"""

    def __init__(self):
        self.image_info = None

    def load_image_info(self, path):
        """Load the image size"""
        width, height = Image.open(path).size
        self.image_info = (width, height)

    def get_screenshot(self, image_path, retry_times=3):
        """Capture a screenshot of the desktop"""
        if os.path.exists(image_path):
            os.remove(image_path)
        for i in range(retry_times):
            screenshot = pyautogui.screenshot()
            screenshot.save(image_path)
            if os.path.exists(image_path):
                self.load_image_info(image_path)
                return True
            else:
                time.sleep(0.1)
        return False

    def reset(self):
        """Show the desktop"""
        pyautogui.hotkey('win', 'd')

    def press_key(self, keys):
        """Press a key or a key combination"""
        if isinstance(keys, list):
            cleaned_keys = []
            for key in keys:
                if isinstance(key, str):
                    # Normalize the key name format
                    if key.startswith("keys=["):
                        key = key[6:]
                    if key.endswith("]"):
                        key = key[:-1]
                    if key.startswith("['") or key.startswith('["'):
                        key = key[2:] if len(key) > 2 else key
                    if key.endswith("']") or key.endswith('"]'):
                        key = key[:-2] if len(key) > 2 else key
                    key = key.strip()
                    # Convert key names to the ones pyautogui expects
                    key_map = {
                        "arrowleft": "left",
                        "arrowright": "right",
                        "arrowup": "up",
                        "arrowdown": "down"
                    }
                    key = key_map.get(key, key)
                    cleaned_keys.append(key)
                else:
                    cleaned_keys.append(key)
            keys = cleaned_keys
        else:
            keys = [keys]
        if len(keys) > 1:
            pyautogui.hotkey(*keys)
        else:
            pyautogui.press(keys[0])

    def type(self, text):
        """Type text (pasted via the clipboard so that Chinese input works)"""
        pyperclip.copy(text)
        pyautogui.keyDown('ctrl')
        pyautogui.keyDown('v')
        pyautogui.keyUp('v')
        pyautogui.keyUp('ctrl')

    def mouse_move(self, x, y):
        """Move the mouse to the given coordinates"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.moveTo(x, y)

    def left_click(self, x, y):
        """Left click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.click()

    def left_click_drag(self, x, y):
        """Drag from the current position to the given coordinates"""
        pyautogui.dragTo(x, y, duration=0.5)
        pyautogui.moveTo(x, y)

    def right_click(self, x, y):
        """Right click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.rightClick()

    def middle_click(self, x, y):
        """Middle click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.middleClick()

    def double_click(self, x, y):
        """Double click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.doubleClick()

    def triple_click(self, x, y):
        """Triple click"""
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.tripleClick()

    def scroll(self, pixels):
        """Scroll the mouse wheel"""
        pyautogui.scroll(pixels)

Step 5. Complete automation flow

Combine all the steps above into one automation flow that repeats three stages (take a screenshot, run model inference, execute the GUI operation) until the task is complete.

import os
import dashscope
import time
from PIL import Image  # used by Image.open in the action loop

def run_gui_automation(instruction, max_step=30):
    """
    Run the complete GUI automation flow.

    Args:
        instruction: user instruction
        max_step: maximum number of steps to execute
    """
    # Configure the API
    dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
    dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
    model_name = 'gui-plus-2026-02-26'

    # Initialize the tools
    computer_tools = ComputerTools()
    computer_tools.reset()  # show the desktop

    # Create the output directory
    output_dir = os.path.join(os.path.expanduser("~"), "Desktop", "gui_automation")
    os.makedirs(output_dir, exist_ok=True)

    # Conversation history
    history = []
    stop_flag = False
    print(f"[Task] {instruction}")
    print("=" * 60)

    for step_id in range(max_step):
        if stop_flag:
            break
        print(f"\n[Step {step_id + 1}]")

        # 1. Take a screenshot
        screen_shot = os.path.join(output_dir, f'screenshot_{step_id}.png')
        computer_tools.get_screenshot(screen_shot)

        # 2. Build the messages
        messages = get_messages(screen_shot, instruction, history, model_name)

        # 3. Call the model
        response = dashscope.MultiModalConversation.call(
            model=model_name,
            messages=messages,
            vl_high_resolution_images=True,
            stream=False
        )
        output_text = response.output.choices[0].message.content[0]['text']
        print(f"[Model output]\n{output_text}\n")

        # 4. Parse the actions
        action_list = extract_tool_calls(output_text)
        if not action_list:
            print("No valid action extracted")
            break

        # 5.
        # Execute the actions
        for action_id, action in enumerate(action_list):
            action_parameter = action['arguments']
            action_type = action_parameter['action']

            # Get the image size used for coordinate mapping
            dummy_image = Image.open(screen_shot)
            resized_height, resized_width = smart_resize(
                dummy_image.height,
                dummy_image.width,
                factor=16,
                min_pixels=3136,
                max_pixels=1003520 * 200
            )

            # Map coordinates (from the normalized 1000x1000 space to the actual size)
            for key in ['coordinate', 'coordinate1', 'coordinate2']:
                if key in action_parameter:
                    action_parameter[key][0] = int(action_parameter[key][0] / 1000 * resized_width)
                    action_parameter[key][1] = int(action_parameter[key][1] / 1000 * resized_height)

            # Execute the matching operation
            if action_type in ['click', 'left_click']:
                computer_tools.left_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Left click ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type == 'mouse_move':
                computer_tools.mouse_move(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Moved mouse to ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type == 'middle_click':
                computer_tools.middle_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Middle click")
            elif action_type in ['right click', 'right_click']:
                computer_tools.right_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Right click")
            elif action_type in ['key', 'hotkey']:
                computer_tools.press_key(action_parameter['keys'])
                print(f"[OK] Pressed keys {action_parameter['keys']}")
            elif action_type == 'type':
                text = action_parameter['text']
                computer_tools.type(text)
                print(f"[OK] Typed text: {text}")
            # The system prompt names this action `left_click_drag`; 'drag' is kept as an alias
            elif action_type in ['drag', 'left_click_drag']:
                computer_tools.left_click_drag(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Dragged to ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type == 'scroll':
                if 'coordinate' in action_parameter:
                    computer_tools.mouse_move(
                        action_parameter['coordinate'][0],
                        action_parameter['coordinate'][1]
                    )
                computer_tools.scroll(action_parameter.get("pixels", 1))
                print(f"[OK] Scrolled {action_parameter.get('pixels', 1)} pixels")
            elif action_type in ['computer_double_click', 'double_click']:
                computer_tools.double_click(
                    action_parameter['coordinate'][0],
                    action_parameter['coordinate'][1]
                )
                print(f"[OK] Double click")
            elif action_type == 'wait':
                time.sleep(action_parameter.get('time', 2))
                print(f"[OK] Waited {action_parameter.get('time', 2)} seconds")
            elif action_type == 'answer':
                print(f"[OK] Task complete: {action_parameter.get('text', '')}")
                stop_flag = True
                break
            elif action_type in ['stop', 'terminate', 'done']:
                print(f"[OK] Task terminated: {action_parameter.get('status', 'success')}")
                stop_flag = True
                break
            else:
                print(f"Unknown action type: {action_type}")

        # 6. Save the history
        history.append({
            'output': output_text,
            'image': screen_shot
        })
        time.sleep(2)  # interval between operations

    print("\n" + "=" * 60)
    print(f"[Done] Executed {len(history)} steps")

# Usage example
if __name__ == '__main__':
    run_gui_automation(
        instruction='帮我打开chrome,在百度中搜索阿里巴巴',
        max_step=30
    )

Complete desktop example code

import os
import re
import json
import math
import time
import pyautogui
import pyperclip
import dashscope
from PIL import Image

# ===================== Step 1: System Prompt =====================
system_prompt = """# Tools You may call one or more functions to assist with the user query. You are provided with function signatures within <tools></tools> XML tags: <tools> {"type": "function", "function": {"name": "computer_use", "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.\\n* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\\n* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. 
if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\\n* The screen's resolution is 1000x1000.\\n* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.", "parameters": {"properties": {"action": {"description": "The action to perform. The available actions are:\\n* `key`: Performs key down presses on the arguments passed in order, then performs key releases in reverse order.\\n* `type`: Type a string of text on the keyboard.\\n* `mouse_move`: Move the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `left_click`: Click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `left_click_drag`: Click and drag the cursor to a specified (x, y) pixel coordinate on the screen.\\n* `right_click`: Click the right mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `middle_click`: Click the middle mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `double_click`: Double-click the left mouse button at a specified (x, y) pixel coordinate on the screen.\\n* `triple_click`: Triple-click the left mouse button at a specified (x, y) pixel coordinate on the screen (simulated as double-click since it's the closest action).\\n* `scroll`: Performs a scroll of the mouse scroll wheel.\\n* `hscroll`: Performs a horizontal scroll (mapped to regular scroll).\\n* `wait`: Wait specified seconds for the change to happen.\\n* `terminate`: Terminate the current task and report its completion status.\\n* `answer`: Answer a question.\\n* `interact`: Resolve the blocking window by interacting with the user.", "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "triple_click", "scroll", "hscroll", "wait", "terminate", "answer", "interact"], "type": "string"}, "keys": {"description": "Required only by `action=key`.", "type": 
"array"}, "text": {"description": "Required only by `action=type`, `action=answer` and `action=interact`.", "type": "string"}, "coordinate": {"description": "(x, y): The x (pixels from the left edge) and y (pixels from the top edge) coordinates to move the mouse to. Required only by `action=mouse_move` and `action=left_click_drag`.", "type": "array"}, "pixels": {"description": "The amount of scrolling to perform. Positive values scroll up, negative values scroll down. Required only by `action=scroll` and `action=hscroll`.", "type": "number"}, "time": {"description": "The seconds to wait. Required only by `action=wait`.", "type": "number"}, "status": {"description": "The status of the task. Required only by `action=terminate`.", "type": "string", "enum": ["success", "failure"]}}, "required": ["action"], "type": "object"}}} </tools> For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: <tool_call> {"name": <function-name>, "arguments": <args-json-object>} </tool_call> # Response format Response format for every step: 1) Action: a short imperative describing what to do in the UI. 2) A single <tool_call>...</tool_call> block containing only the JSON: {"name": <function-name>, "arguments": <args-json-object>}. Rules: - Output exactly in the order: Action, <tool_call>. - Be brief: one for Action. - Do not output anything else outside those two parts. 
- If finishing, use action=terminate in the tool call."""

# ===================== Step 2: Build the multi-turn messages =====================
def get_messages(image, instruction, history_output, system_prompt):
    history_n = 4
    current_step = len(history_output)
    history_start_idx = max(0, current_step - history_n)
    previous_actions = []
    for i in range(history_start_idx):
        if i < len(history_output):
            history_output_str = history_output[i]['output']
            if 'Action:' in history_output_str and '<tool_call>'

            JsonDocument.Parse(message);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Message reception was canceled.");
            return null;
        }
    }

    private static string GenerateTaskId()
    {
        return Guid.NewGuid().ToString("N").Substring(0, 32);
    }

    private static string CreateCommand(string action, string taskId, string streaming, object payload)
    {
        var command = new
        {
            header = new { action, task_id = taskId, streaming },
            payload
        };
        return JsonSerializer.Serialize(command);
    }
}

The example uses the following directory structure:

my-php-project/
├── composer.json
├── vendor/
└── index.php

Contents of composer.json (adjust the dependency versions as needed):

{
    "require": {
        "react/event-loop": "^1.3",
        "react/socket": "^1.11",
        "react/stream": "^1.2",
        "react/http": "^1.1",
        "ratchet/pawl": "^0.4"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

Contents of index.php:

<!-- See the notes above for SSML support requirements -->
<?php
require __DIR__ . 
'/vendor/autoload.php'; use Ratchet\Client\Connector; use React\EventLoop\Loop; use React\Socket\Connector as SocketConnector; // 如果未配置环境变量,请将下面一行替换为:$api_key = "sk-xxx" $api_key = getenv("DASHSCOPE_API_KEY"); $websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket 服务地址 $output_file = 'output.mp3'; // 输出文件路径 $loop = Loop::get(); if (file_exists($output_file)) { // 清空文件内容 file_put_contents($output_file, ''); } // 创建自定义连接器 $socketConnector = new SocketConnector($loop, [ 'tcp' => [ 'bindto' => '0.0.0.0:0', ], 'tls' => [ 'verify_peer' => false, 'verify_peer_name' => false, ], ]); $connector = new Connector($loop, $socketConnector); $headers = [ 'Authorization' => 'bearer ' . $api_key, 'X-DashScope-DataInspection' => 'enable' ]; $connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) { echo "Connected to WebSocket server\n"; // 生成任务 ID $taskId = generateTaskId(); // 发送 run-task 命令 sendRunTaskMessage($conn, $taskId); // 定义发送 continue-task 命令的函数 $sendContinueTask = function() use ($conn, $loop, $taskId) { // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 $continueTaskMessage = json_encode([ "header" => [ "action" => "continue-task", "task_id" => $taskId, "streaming" => "duplex" ], "payload" => [ "input" => [ // 特殊字符需要转义 "text" => "<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>" ] ] ]); $conn->send($continueTaskMessage); // 发送 finish-task 命令 sendFinishTaskMessage($conn, $taskId); }; // 标记是否已收到 task-started 事件 $taskStarted = false; // 监听消息 $conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) { if ($msg->isBinary()) { // 将二进制数据写入本地文件 file_put_contents($output_file, $msg->getPayload(), FILE_APPEND); } else { // 处理非二进制消息 $response = json_decode($msg, true); if (isset($response['header']['event'])) { handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted); } else { echo "Unknown message format\n"; } } }); // 监听连接关闭 
$conn->on('close', function($code = null, $reason = null) { echo "Connection closed\n"; if ($code !== null) { echo "Close code: " . $code . "\n"; } if ($reason !== null) { echo "Close reason: " . $reason . "\n"; } }); }, function ($e) { echo "Could not connect: {$e->getMessage()}\n"; }); $loop->run(); /** * 生成任务 ID * @return string */ function generateTaskId(): string { return bin2hex(random_bytes(16)); } /** * 发送 run-task 命令 * @param $conn * @param $taskId */ function sendRunTaskMessage($conn, $taskId) { $runTaskMessage = json_encode([ "header" => [ "action" => "run-task", "task_id" => $taskId, "streaming" => "duplex" ], "payload" => [ "task_group" => "audio", "task" => "tts", "function" => "SpeechSynthesizer", "model" => "cosyvoice-v3-flash", "parameters" => [ "text_type" => "PlainText", "voice" => "longanyang", "format" => "mp3", "sample_rate" => 22050, "volume" => 50, "rate" => 1, "pitch" => 1, // 启用 enable_ssml: true 后,仅发送一次 continue-task "enable_ssml" => true ], "input" => (object) [] ] ]); echo "Preparing to send run-task command: " . $runTaskMessage . "\n"; $conn->send($runTaskMessage); echo "run-task command sent\n"; } /** * 读取音频文件 * @param string $filePath * @return bool|string */ function readAudioFile(string $filePath) { $voiceData = file_get_contents($filePath); if ($voiceData === false) { echo "Failed to read audio file\n"; } return $voiceData; } /** * 分割音频数据 * @param string $data * @param int $chunkSize * @return array */ function splitAudioData(string $data, int $chunkSize): array { return str_split($data, $chunkSize); } /** * 发送 finish-task 命令 * @param $conn * @param $taskId */ function sendFinishTaskMessage($conn, $taskId) { $finishTaskMessage = json_encode([ "header" => [ "action" => "finish-task", "task_id" => $taskId, "streaming" => "duplex" ], "payload" => [ "input" => (object) [] ] ]); echo "Preparing to send finish-task command: " . $finishTaskMessage . 
"\n"; $conn->send($finishTaskMessage); echo "finish-task command sent\n"; } /** * 处理事件 * @param $conn * @param $response * @param $sendContinueTask * @param $loop * @param $taskId * @param $taskStarted */ function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) { switch ($response['header']['event']) { case 'task-started': echo "Task started, sending continue-task command...\n"; $taskStarted = true; // 发送 continue-task 命令 $sendContinueTask(); break; case 'result-generated': // 忽略 result-generated 事件 break; case 'task-finished': echo "Task finished\n"; $conn->close(); break; case 'task-failed': echo "Task failed\n"; echo "Error code: " . $response['header']['error_code'] . "\n"; echo "Error message: " . $response['header']['error_message'] . "\n"; $conn->close(); break; case 'error': echo "Error: " . $response['payload']['message'] . "\n"; break; default: echo "Unknown event: " . $response['header']['event'] . "\n"; break; } // 任务完成后关闭连接 if ($response['header']['event'] == 'task-finished') { // 等待 1 秒确保所有数据已传输 $loop->addTimer(1, function() use ($conn) { $conn->close(); echo "Client closes connection\n"; }); } // 如果未收到 task-started 事件,关闭连接 if (!$taskStarted && in_array($response['header']['event'], ['task-failed', 'error'])) { $conn->close(); } } 安装所需依赖:复制npm install ws npm install uuid 示例代码:复制// SSML 支持要求参见上方说明 import fs from 'fs'; import WebSocket from 'ws'; import { v4 as uuid } from 'uuid'; // 用于生成 UUID // 如果未配置环境变量,请将下面一行替换为:const apiKey = "sk-xxx" const apiKey = process.env.DASHSCOPE_API_KEY; const url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // 输出文件路径 const outputFilePath = 'output.mp3'; // 清空输出文件 fs.writeFileSync(outputFilePath, ''); // 创建 WebSocket 客户端 const ws = new WebSocket(url, { headers: { Authorization: `bearer ${apiKey}`, 'X-DashScope-DataInspection': 'enable' } }); let taskStarted = false; let taskId = uuid(); ws.on('open', () => { console.log('Connected to WebSocket server'); // 发送 run-task 命令 const 
runTaskMessage = JSON.stringify({ header: { action: 'run-task', task_id: taskId, streaming: 'duplex' }, payload: { task_group: 'audio', task: 'tts', function: 'SpeechSynthesizer', model: 'cosyvoice-v3-flash', parameters: { text_type: 'PlainText', voice: 'longanyang', // 音色 format: 'mp3', // 音频格式 sample_rate: 22050, // 采样率 volume: 50, // 音量 rate: 1, // 语速 pitch: 1, // 音调 enable_ssml: true // 是否启用 SSML。启用后仅可发送一次 continue-task 命令,否则会报错 "Text request limit violated, expected 1." }, input: {} } }); ws.send(runTaskMessage); console.log('Sent run-task message'); }); const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' }); ws.on('message', (data, isBinary) => { if (isBinary) { // 将二进制数据写入文件 fileStream.write(data); } else { const message = JSON.parse(data); switch (message.header.event) { case 'task-started': taskStarted = true; console.log('Task has started'); // 发送 continue-task 命令 sendContinueTasks(ws); break; case 'task-finished': console.log('Task has finished'); ws.close(); fileStream.end(() => { console.log('File stream has been closed'); }); break; case 'task-failed': console.error('Task failed: ', message.header.error_message); ws.close(); fileStream.end(() => { console.log('File stream has been closed'); }); break; default: // 可在此处理 result-generated 事件 break; } } }); function sendContinueTasks(ws) { if (taskStarted) { // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 const continueTaskMessage = JSON.stringify({ header: { action: 'continue-task', task_id: taskId, streaming: 'duplex' }, payload: { input: { // 特殊字符需要转义 text: '<speak rate="2">My speaking rate is faster than a normal person\'s.</speak>' } } }); ws.send(continueTaskMessage); // 发送 finish-task 命令 const finishTaskMessage = JSON.stringify({ header: { action: 'finish-task', task_id: taskId, streaming: 'duplex' }, payload: { input: {} } }); ws.send(finishTaskMessage); } } ws.on('close', () => { console.log('Disconnected from the WebSocket server'); }); Java 建议使用 Java SDK。以下 Java WebSocket 示例需要以下依赖: 
Java-WebSocket jackson-databind 使用 Maven 或 Gradle 管理依赖:pom.xmlbuild.gradle复制<dependencies> <!-- WebSocket 客户端 --> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.5.3</version> </dependency> <!-- JSON 处理 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.13.0</version> </dependency> </dependencies> Java 代码:复制import com.fasterxml.jackson.databind.ObjectMapper; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.*; /** * SSML 功能说明: * 1. 发送 run-task 命令时,将 enable_ssml 参数设为 true 以启用 SSML 支持。 * 2. 通过 continue-task 命令发送包含 SSML 的文本,仅可发送一次。 * 3. SSML 仅支持 cosyvoice-v3-flash 和 cosyvoice-v3-plus 模型的克隆音色,以及音色列表中标记为支持 SSML 的系统音色(例如 cosyvoice-v3-flash 的 longanyang 音色)。 */ public class TTSWebSocketClient extends WebSocketClient { private final String taskId = UUID.randomUUID().toString(); private final String outputFile = "output_" + System.currentTimeMillis() + ".mp3"; private boolean taskFinished = false; public TTSWebSocketClient(URI serverUri, Map<String, String> headers) { super(serverUri, headers); } @Override public void onOpen(ServerHandshake serverHandshake) { System.out.println("Connection successful"); // 发送 run-task 命令 // 启用 enable_ssml 后,仅可发送一次 continue-task 命令。 // 否则会报错 "Text request limit violated, expected 1." 
String runTaskCommand = "{ \"header\": { \"action\": \"run-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"task_group\": \"audio\", \"task\": \"tts\", \"function\": \"SpeechSynthesizer\", \"model\": \"cosyvoice-v3-flash\", \"parameters\": { \"text_type\": \"PlainText\", \"voice\": \"longanyang\", \"format\": \"mp3\", \"sample_rate\": 22050, \"volume\": 50, \"rate\": 1, \"pitch\": 1, \"enable_ssml\": true }, \"input\": {} }}"; send(runTaskCommand); } @Override public void onMessage(String message) { System.out.println("Received message from server: " + message); try { // 解析 JSON 消息 Map<String, Object> messageMap = new ObjectMapper().readValue(message, Map.class); if (messageMap.containsKey("header")) { Map<String, Object> header = (Map<String, Object>) messageMap.get("header"); if (header.containsKey("event")) { String event = (String) header.get("event"); if ("task-started".equals(event)) { System.out.println("Received task-started event from server"); // 发送 continue-task 命令。使用 SSML 时仅可发送一次。 // 特殊字符需要转义。 sendContinueTask("<speak rate=\\\"2\\\">My speaking rate is faster than a normal person's.</speak>"); // 发送 finish-task 命令 sendFinishTask(); } else if ("task-finished".equals(event)) { System.out.println("Received task-finished event from server"); taskFinished = true; closeConnection(); } else if ("task-failed".equals(event)) { System.out.println("Task failed: " + message); closeConnection(); } } } } catch (Exception e) { System.err.println("An exception occurred: " + e.getMessage()); } } @Override public void onMessage(ByteBuffer message) { System.out.println("Size of received binary audio data: " + message.remaining()); try (FileOutputStream fos = new FileOutputStream(outputFile, true)) { byte[] buffer = new byte[message.remaining()]; message.get(buffer); fos.write(buffer); System.out.println("Audio data has been written to the local file " + outputFile); } catch (IOException e) { System.err.println("Failed to write audio 
data to local file: " + e.getMessage()); } } @Override public void onClose(int code, String reason, boolean remote) { System.out.println("Connection closed: " + reason + " (" + code + ")"); } @Override public void onError(Exception ex) { System.err.println("Error: " + ex.getMessage()); ex.printStackTrace(); } private void sendContinueTask(String text) { String command = "{ \"header\": { \"action\": \"continue-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": { \"text\": \"" + text + "\" } }}"; send(command); } private void sendFinishTask() { String command = "{ \"header\": { \"action\": \"finish-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": {} }}"; send(command); } private void closeConnection() { if (!isClosed()) { close(); } } public static void main(String[] args) { try { // 如果未配置环境变量,请将下面一行替换为:String apiKey = "sk-xxx" String apiKey = System.getenv("DASHSCOPE_API_KEY"); if (apiKey == null || apiKey.isEmpty()) { System.err.println("Please set the DASHSCOPE_API_KEY environment variable"); return; } Map<String, String> headers = new HashMap<>(); headers.put("Authorization", "bearer " + apiKey); TTSWebSocketClient client = new TTSWebSocketClient(new URI("wss://dashscope.aliyuncs.com/api-ws/v1/inference/"), headers); client.connect(); while (!client.isClosed() && !client.taskFinished) { Thread.sleep(1000); } } catch (Exception e) { System.err.println("Failed to connect to WebSocket service: " + e.getMessage()); e.printStackTrace(); } } } Python 建议使用 Python SDK。运行示例前请先安装依赖:复制pip uninstall websocket-client pip uninstall websocket pip install websocket-client 不要将 Python 文件命名为 websocket.py,否则会导致 AttributeError: module 'websocket' has no attribute 'WebSocketApp'。复制# SSML 功能说明: # 1. 发送 run-task 命令时,将 enable_ssml 参数设为 true 以启用 SSML 支持。 # 2. 通过 continue-task 命令发送包含 SSML 的文本,仅可发送一次。 # 3. 
# SSML is supported only for cloned voices of the cosyvoice-v3-flash and cosyvoice-v3-plus models, and for system voices marked as supporting SSML in the voice list (for example, the longanyang voice of cosyvoice-v3-flash).

import websocket
import json
import uuid
import os
import time


class TTSClient:
    def __init__(self, api_key, uri):
        """
        Initialize a TTSClient instance.

        Args:
            api_key (str): API key used for authentication.
            uri (str): WebSocket service address.
        """
        self.api_key = api_key  # Replace with your API key
        self.uri = uri  # Replace with your WebSocket address
        self.task_id = str(uuid.uuid4())  # Generate a unique task ID
        self.output_file = f"output_{int(time.time())}.mp3"  # Output audio file path
        self.ws = None  # WebSocketApp instance
        self.task_started = False  # Whether task-started has been received
        self.task_finished = False  # Whether task-finished/task-failed has been received

    def on_open(self, ws):
        """
        Callback invoked when the WebSocket connection is established.
        Sends the run-task command to start the speech synthesis task.
        """
        print("WebSocket connection established")
        # Build the run-task command
        run_task_cmd = {
            "header": {
                "action": "run-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "task_group": "audio",
                "task": "tts",
                "function": "SpeechSynthesizer",
                "model": "cosyvoice-v3-flash",
                "parameters": {
                    "text_type": "PlainText",
                    "voice": "longanyang",
                    "format": "mp3",
                    "sample_rate": 22050,
                    "volume": 50,
                    "rate": 1,
                    "pitch": 1,
                    # With enable_ssml set to true, send continue-task only once
                    "enable_ssml": True
                },
                "input": {}
            }
        }
        # Send the run-task command
        ws.send(json.dumps(run_task_cmd))
        print("run-task command sent")

    def on_message(self, ws, message):
        """
        Callback invoked when a message is received.
        Handles text messages and binary messages separately.
        """
        if isinstance(message, str):
            # Handle JSON text messages
            try:
                msg_json = json.loads(message)
                print(f"Received JSON message: {msg_json}")
                if "header" in msg_json:
                    header = msg_json["header"]
                    if "event" in header:
                        event = header["event"]
                        if event == "task-started":
                            print("Task started")
                            self.task_started = True
                            # Send the continue-task command. With SSML it can be sent only once.
                            # Special characters must be escaped.
                            self.send_continue_task("<speak rate=\"2\">My speaking rate is faster than a normal person's.</speak>")
                            # Send finish-task after continue-task
                            self.send_finish_task()
                        elif event == "task-finished":
                            print("Task finished")
                            self.task_finished = True
                            self.close(ws)
                        elif event == "task-failed":
                            # error_message is carried in the message header
                            error_msg = header.get("error_message", "Unknown error")
                            print(f"Task failed: {error_msg}")
                            self.task_finished = True
                            self.close(ws)
            except json.JSONDecodeError as e:
                print(f"JSON parsing failed: {e}")
        else:
            # Handle binary messages (audio data)
            print(f"Received binary message, size: {len(message)} bytes")
            with open(self.output_file, "ab") as f:
                f.write(message)
            print(f"Audio data has been written to the local file {self.output_file}")

    def on_error(self, ws, error):
        """Error callback."""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Close callback."""
        print(f"WebSocket closed: {close_msg} ({close_status_code})")

    def send_continue_task(self, text):
        """Send the continue-task command with the text to synthesize."""
        cmd = {
            "header": {
                "action": "continue-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {
                    "text": text
                }
            }
        }
        self.ws.send(json.dumps(cmd))
        print(f"Sent continue-task command, text content: {text}")

    def send_finish_task(self):
        """Send the finish-task command to end the speech synthesis task."""
        cmd = {
            "header": {
                "action": "finish-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {}
            }
        }
        self.ws.send(json.dumps(cmd))
        print("Sent finish-task command")

    def close(self, ws):
        """Actively close the connection."""
        if ws and ws.sock and ws.sock.connected:
            ws.close()
            print("Connection actively closed")

    def run(self):
        """Start the WebSocket client."""
        # Set the request headers (authentication)
        header = {
            "Authorization": f"bearer {self.api_key}",
            "X-DashScope-DataInspection": "enable"
        }
        # Create the WebSocketApp instance
        self.ws = websocket.WebSocketApp(
            self.uri,
            header=header,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        print("Listening for WebSocket messages...")
        self.ws.run_forever()  # Keep the connection open and listen


# Usage example
if __name__ == "__main__":
    # If the environment variable is not configured, replace the next line with: API_KEY = "sk-xxx"
    API_KEY = os.environ.get("DASHSCOPE_API_KEY")
    SERVER_URI = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"
    client = TTSClient(API_KEY, SERVER_URI)
    client.run()

Tags

CosyVoice SSML is based on W3C SSML
1.0,仅支持部分标签。语法规则: 所有 SSML 内容必须包裹在 <speak></speak> 标签中。 可以连续使用多个 <speak> 标签,但不能嵌套。 需要转义 XML 特殊字符:" → ",' → ',& → &,< → <,> → >。 <speak>:根标签 说明 所有 SSML 内容必须包裹在 <speak></speak> 标签中。 语法 复制<speak>需要使用 SSML 功能的文本</speak> 属性 属性类型必填说明voiceString否音色名称。覆盖 API 参数 voice。参见音色列表。rateString否语速。覆盖 API 参数 speech_rate。推荐范围:0.5 ~ 2,默认值 1。大于 1 加速,小于 1 减速。pitchString否音调。覆盖 API 参数 pitch_rate。推荐范围:0.5 ~ 2,默认值 1。大于 1 升高,小于 1 降低。volumeString否音量。覆盖 API 参数 volume。取值范围:0 ~ 100,默认值 50。effectString否音效。可选值:robot、lolita(活泼女声)、lowpass、echo、eq(均衡器,高级)、lpfilter(低通滤波器,高级)、hpfilter(高通滤波器,高级)。eq、lpfilter、hpfilter 需配合 effectValue 使用。每个标签只能设置一种音效。音效会增加延迟。effectValueString否自定义 effect 参数。eq:8 个以空格分隔的整数(-20 ~ 20),分别对应 ["40 Hz", "100 Hz", "200 Hz", "400 Hz", "800 Hz", "1600 Hz", "4000 Hz", "12000 Hz"] 频段的增益,示例:"1 1 1 1 1 1 1 1"。lpfilter:整数频率,范围 (0, sample_rate/2],示例:"800"。hpfilter:整数频率,范围 (0, sample_rate/2],示例:"1200"。bgmString否背景音乐 URL。文件需存放在 OSS 上,权限至少为公共读。URL 中的 XML 特殊字符需转义。要求:16 kHz 采样率、单声道、WAV 格式、16-bit。如果合成音频长于背景音乐,音乐将循环播放。backgroundMusicVolumeString否背景音乐音量。 示例 音色: 复制<speak voice="longcheng_v2"> I am a male voice. </speak> 语速: 复制<speak rate="2"> My speech rate is faster than normal. </speak> 音调: 复制<speak pitch="0.5"> However, my pitch is lower than others. </speak> 音量: 复制<speak volume="80"> My volume is also very high. </speak> 音效: 复制<speak effect="robot"> Do you like the robot WALL-E? </speak> 音效 + effectValue: 复制<speak effect="eq" effectValue="1 -20 1 1 1 1 20 1"> Do you like the robot WALL-E? </speak> <speak effect="lpfilter" effectValue="1200"> Do you like the robot WALL-E? </speak> <speak effect="hpfilter" effectValue="1200"> Do you like the robot WALL-E? 
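</speak>

If the audio is not in WAV format, convert it with ffmpeg:
ffmpeg -i input_audio -acodec pcm_s16le -ac 1 -ar 16000 output.wav

Background music (bgm):
<speak bgm="https://nls.alicdn.com/bgm/2.wav" backgroundMusicVolume="30" rate="-500" volume="40">
  <break time="2s"/>
  The old trees on the shady cliff are shrouded in mist
  <break time="700ms"/>
  The sound of rain is still in the bamboo forest
  <break time="700ms"/>
  I know that cotton contributes to the country's plan
  <break time="700ms"/>
  The scenery of Mianzhou is always pitiable
  <break time="2s"/>
</speak>

You bear legal responsibility for the copyright of any audio you upload.

Combined attributes:
<speak>
  Text that uses SSML tags
</speak>
<speak rate="200" pitch="-100" volume="80">
  So when put together, my voice sounds like this.
</speak>

<break>: pause

Description
Inserts a pause. The duration is given in seconds (s) or milliseconds (ms).

Syntax
# No attributes
<break/>
# With the time attribute
<break time="string"/>

Behavior of the break tag:
Without attributes, <break/> pauses for 1 second by default.
Note: the durations of consecutive <break> tags add up, but the total is capped at 10 seconds.
For example, the three tags below add up to 15 seconds, but only the first 10 seconds take effect:
<speak>
  Please close your eyes and rest.<break time="5s"/><break time="5s"/><break time="5s"/>OK, please open your eyes.
</speak>

Attributes
Attribute | Type | Required | Description
time | String | No | Pause duration, such as "2s" or "50ms". In seconds: 1 to 10. In milliseconds: 50 to 10000.

Example
<speak>
  Please close your eyes and rest.<break time="500ms"/>OK, please open your eyes.
</speak>

<sub>: replace text

Description
Reads the alias text aloud in place of the enclosed text.

Syntax
<sub alias="string"></sub>

Attributes
Attribute | Type | Required | Description
alias | String | Yes | Text to read aloud in place of the enclosed text.

Example
<speak>
  <sub alias="network protocol">W3C</sub>
</speak>

<phoneme>: set pronunciation

Description
Specifies pronunciation using Pinyin (Chinese) or CMU phonetic symbols (English).

Syntax
<phoneme alphabet="string" ph="string">text</phoneme>

Attributes
Attribute | Type | Required | Description
alphabet | String | Yes | Pronunciation type: "py" (Pinyin) or "cmu" (phonetic symbols). See The CMU Pronouncing Dictionary.
ph | String | Yes | Pinyin or phonetic symbols. Separate the Pinyin of each Chinese character with a space; the number of syllables must match the number of characters. Each syllable carries a tone number (1 to 5, where 5 is the neutral tone).

Examples
<speak>
  去<phoneme alphabet="py" ph="dian3 dang4 hang2">典当行</phoneme>把这个玩意<phoneme alphabet="py" ph="dang4 diao4">当掉</phoneme>
</speak>
<speak>
  How to spell <phoneme alphabet="cmu" ph="S AY N">sin</phoneme>?
</speak>

<soundEvent>: insert a sound

Description
Inserts an external audio file (a prompt tone, ambient sound, and so on) into the synthesized speech.

Syntax
<soundEvent src="URL"/>

Attributes
Attribute | Type | Required | Description
src | String | Yes | Audio URL. The file must be stored on OSS with at least public-read permission. XML special characters in the URL must be escaped. Requirements: 16 kHz sample rate, mono, WAV format, 16-bit, at most 2 MB.

If the audio is not in WAV format, convert it with ffmpeg:
ffmpeg -i input_audio -acodec pcm_s16le -ac 1 -ar 16000 output.wav

You bear legal responsibility for the copyright of any audio you upload.

Example
<speak>
  A horse was frightened<soundEvent src="https://nls.alicdn.com/sound-event/horse-neigh.wav"/>and people scattered to avoid it.
</speak>

<say-as>: set how text is read

Description
Specifies how the text is read aloud (as a number, date, telephone number, and so on).

Syntax
<say-as interpret-as="string">text</say-as>

Attributes
Attribute | Type | Required | Description
interpret-as | String | Yes | Text type. Valid values: cardinal (number), digits (digit by digit), telephone (telephone number), name (person name), address (address), id (account name or nickname), characters (character by character), punctuation (punctuation), date (date), time (time), currency (currency), measure (unit of measure).

:
            history_output_str = history_output_str.split('Action:')[1].split('<tool_call>')[0].strip()
        previous_actions.append(f"Step {i + 1}: {history_output_str}")
    # Join the steps with real newlines so each one appears on its own line in the prompt.
    previous_actions_str = "\n".join(previous_actions) if previous_actions else "None"
    instruction_prompt = f"""
Please generate the next move according to the UI screenshot, instruction and previous actions.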

Instruction: {instruction}

Previous actions:
{previous_actions_str}"""

    messages = [{"role": "system", "content": [{"text": system_prompt}]}]
    history_len = min(history_n, len(history_output))
    if history_len > 0:
        # Replay the most recent history_n steps: screenshot, then the model's reply.
        for history_id, history_item in enumerate(history_output[-history_n:], 0):
            if history_id == 0:
                messages.append({
                    "role": "user",
                    "content": [
                        {"text": instruction_prompt},
                        {"image": "file://" + history_item['image']}
                    ]
                })
            else:
                messages.append({
                    "role": "user",
                    "content": [{"image": "file://" + history_item['image']}]
                })
            messages.append({
                "role": "assistant",
                "content": [{"text": history_item['output']}],
            })
        messages.append({
            "role": "user",
            "content": [{"image": "file://" + image}]
        })
    else:
        messages.append({
            "role": "user",
            "content": [
                {"text": instruction_prompt},
                {"image": "file://" + image}
            ]
        })
    return messages


# ===================== Step 3: Parse model output and map coordinates =====================
def extract_tool_calls(text):
    pattern = re.compile(r'<tool_call>(.*?)</tool_call>', re.DOTALL | re.IGNORECASE)
    blocks = pattern.findall(text)
    actions = []
    for blk in blocks:
        blk = blk.strip()
        try:
            actions.append(json.loads(blk))
        except json.JSONDecodeError as e:
            print(f'Failed to parse: {e} | snippet: {blk[:80]}...')
    return actions


def smart_resize(height, width, factor=32, min_pixels=32*32*4, max_pixels=32*32*1280, max_long_side=8192):
    def round_by_factor(number, factor):
        return round(number / factor) * factor

    def ceil_by_factor(number, factor):
        return math.ceil(number / factor) * factor

    def floor_by_factor(number, factor):
        return math.floor(number / factor) * factor

    if height < 2 or width < 2:
        raise ValueError(f"height:{height} and width:{width} must both be at least 2")
    elif max(height, width) / min(height, width) > 200:
        raise ValueError(f"absolute aspect ratio must be smaller than 200, got {height} / {width}")

    # Shrink the image first if its long side exceeds the limit.
    if max(height, width) > max_long_side:
        beta = max(height, width) / max_long_side
        height, width = int(height / beta), int(width / beta)

    h_bar = round_by_factor(height, factor)
    w_bar = round_by_factor(width, factor)
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar


# ===================== Step 4: GUI operation tool class =====================
class ComputerTools:
    def __init__(self):
        self.image_info = None

    def load_image_info(self, path):
        width, height = Image.open(path).size
        self.image_info = (width, height)

    def get_screenshot(self, image_path, retry_times=3):
        if os.path.exists(image_path):
            os.remove(image_path)
        for i in range(retry_times):
            screenshot = pyautogui.screenshot()
            screenshot.save(image_path)
            if os.path.exists(image_path):
                self.load_image_info(image_path)
                return True
            else:
                time.sleep(0.1)
        return False

    def reset(self):
        # Show the desktop (Windows shortcut).
        pyautogui.hotkey('win', 'd')

    def press_key(self, keys):
        if isinstance(keys, list):
            cleaned_keys = []
            for key in keys:
                if isinstance(key, str):
                    # Strip stray wrappers such as keys=[...] or ['...'] from the model output.
                    if key.startswith("keys=["):
                        key = key[6:]
                    if key.endswith("]"):
                        key = key[:-1]
                    if key.startswith("['") or key.startswith('["'):
                        key = key[2:] if len(key) > 2 else key
                    if key.endswith("']") or key.endswith('"]'):
                        key = key[:-2] if len(key) > 2 else key
                    key = key.strip()
                    key_map = {"arrowleft": "left", "arrowright": "right", "arrowup": "up", "arrowdown": "down"}
                    key = key_map.get(key, key)
                    cleaned_keys.append(key)
                else:
                    cleaned_keys.append(key)
            keys = cleaned_keys
        else:
            keys = [keys]
        if len(keys) > 1:
            pyautogui.hotkey(*keys)
        else:
            pyautogui.press(keys[0])

    def type(self, text):
        # Paste through the clipboard so non-ASCII text is entered correctly.
        pyperclip.copy(text)
        pyautogui.keyDown('ctrl')
        pyautogui.keyDown('v')
        pyautogui.keyUp('v')
        pyautogui.keyUp('ctrl')

    def mouse_move(self, x, y):
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.moveTo(x, y)

    def left_click(self, x, y):
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.click()

    def left_click_drag(self, x, y):
        pyautogui.dragTo(x, y, duration=0.5)
        pyautogui.moveTo(x, y)

    def right_click(self, x, y):
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.rightClick()

    def middle_click(self, x, y):
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.middleClick()

    def double_click(self, x, y):
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.doubleClick()

    def triple_click(self, x, y):
        pyautogui.moveTo(x, y)
        time.sleep(0.1)
        pyautogui.tripleClick()

    def scroll(self, pixels):
        pyautogui.scroll(pixels)


# ===================== Step 5: Complete automation flow =====================
def run_gui_automation(instruction, max_step=30):
    dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
    dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
    model_name = 'gui-plus-2026-02-26'

    computer_tools = ComputerTools()
    computer_tools.reset()
    output_dir = os.path.join(os.path.expanduser("~"), "Desktop", "gui_automation")
    os.makedirs(output_dir, exist_ok=True)

    history = []
    stop_flag = False
    print(f"[Task] {instruction}")
    print("=" * 60)

    for step_id in range(max_step):
        if stop_flag:
            break
        print(f"\n[Step {step_id + 1}]")
        screen_shot = os.path.join(output_dir, f'screenshot_{step_id}.png')
        computer_tools.get_screenshot(screen_shot)

        messages = get_messages(screen_shot, instruction, history, system_prompt)
        response = dashscope.MultiModalConversation.call(
            model=model_name,
            messages=messages,
            vl_high_resolution_images=True,
            stream=False
        )
        output_text = response.output.choices[0].message.content[0]['text']
        print(f"[Model output]\n{output_text}\n")

        action_list = extract_tool_calls(output_text)
        if not action_list:
            print("No valid action extracted")
            break

        for action_id, action in enumerate(action_list):
            action_parameter = action['arguments']
            action_type = action_parameter['action']

            dummy_image = Image.open(screen_shot)
            resized_height, resized_width = smart_resize(
                dummy_image.height, dummy_image.width,
                factor=16, min_pixels=3136, max_pixels=1003520 * 200
            )
            # Map coordinates from the 0-1000 range to pixels.
            for key in ['coordinate', 'coordinate1', 'coordinate2']:
                if key in action_parameter:
                    action_parameter[key][0] = int(action_parameter[key][0] / 1000 * resized_width)
                    action_parameter[key][1] = int(action_parameter[key][1] / 1000 * resized_height)

            if action_type in ['click', 'left_click']:
                computer_tools.left_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print(f"[OK] Left click ({action_parameter['coordinate'][0]}, {action_parameter['coordinate'][1]})")
            elif action_type == 'mouse_move':
                computer_tools.mouse_move(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print("[OK] Move mouse")
            elif action_type == 'middle_click':
                computer_tools.middle_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print("[OK] Middle click")
            elif action_type in ['right click', 'right_click']:
                computer_tools.right_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print("[OK] Right click")
            elif action_type in ['key', 'hotkey']:
                computer_tools.press_key(action_parameter['keys'])
                print(f"[OK] Press keys {action_parameter['keys']}")
            elif action_type == 'type':
                computer_tools.type(action_parameter['text'])
                print(f"[OK] Type text: {action_parameter['text']}")
            elif action_type in ['drag', 'left_click_drag']:
                # The system prompt names this action left_click_drag.
                computer_tools.left_click_drag(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print("[OK] Drag")
            elif action_type == 'scroll':
                if 'coordinate' in action_parameter:
                    computer_tools.mouse_move(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                computer_tools.scroll(action_parameter.get("pixels", 1))
                print(f"[OK] Scroll {action_parameter.get('pixels', 1)} pixels")
            elif action_type in ['computer_double_click', 'double_click']:
                computer_tools.double_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print("[OK] Double click")
            elif action_type == 'triple_click':
                computer_tools.triple_click(action_parameter['coordinate'][0], action_parameter['coordinate'][1])
                print("[OK] Triple click")
            elif action_type == 'wait':
                time.sleep(action_parameter.get('time', 2))
                print(f"[OK] Wait {action_parameter.get('time', 2)} seconds")
            elif action_type == 'answer':
                print(f"[OK] Task completed: {action_parameter.get('text', '')}")
                stop_flag = True
                break
            elif action_type in ['stop', 'terminate', 'done']:
                print(f"[OK] Task terminated: {action_parameter.get('status', 'success')}")
                stop_flag = True
                break
            else:
                print(f"Unknown action type: {action_type}")

        history.append({'output': output_text, 'image': screen_shot})
        time.sleep(2)

    print("\n" + "=" * 60)
    print(f"[Done] Executed {len(history)} steps in total")


if __name__ == '__main__':
    run_gui_automation(
        instruction='Open Chrome and search for Alibaba on Baidu',
        max_step=30
    )
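
The parsing and coordinate-mapping steps above can be tried without a screen, an API key, or pyautogui. The following is a minimal standalone sketch, not part of the DashScope SDK: it assumes the model replies with <tool_call> blocks holding JSON with an arguments object, and that coordinates are normalized to a 0-1000 grid, as the 1000x1000 resolution in the system prompt suggests. The helper names parse_tool_calls and to_pixels and the sample reply are hypothetical.

```python
import json
import re

TOOL_CALL_RE = re.compile(r"<tool_call>(.*?)</tool_call>", re.DOTALL | re.IGNORECASE)


def parse_tool_calls(text):
    """Return the JSON objects found inside <tool_call> blocks, skipping invalid ones."""
    calls = []
    for block in TOOL_CALL_RE.findall(text):
        try:
            calls.append(json.loads(block.strip()))
        except json.JSONDecodeError:
            continue  # ignore malformed blocks instead of aborting
    return calls


def to_pixels(coordinate, width, height, grid=1000):
    """Map an (x, y) point on a grid-by-grid canvas to pixels, clamped to the image bounds."""
    x, y = coordinate
    px = min(max(int(x / grid * width), 0), width - 1)
    py = min(max(int(y / grid * height), 0), height - 1)
    return px, py


# Hypothetical model reply, for illustration only.
sample_output = (
    "Action: click the search box\n"
    '<tool_call>{"name": "computer_use", "arguments": '
    '{"action": "left_click", "coordinate": [500, 250]}}</tool_call>'
)

calls = parse_tool_calls(sample_output)
args = calls[0]["arguments"]
print(args["action"], to_pixels(args["coordinate"], 1920, 1080))
```

Clamping keeps a coordinate of exactly 1000 inside the image, which matters when the result is passed to a click function. Whether to scale to the screenshot size or to the smart_resize output depends on how the model was trained, so treat the choice of width and height here as an assumption to verify against your own screenshots.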