From 7353fe1df48b3a529e1c192ce76998c955223f23 Mon Sep 17 00:00:00 2001 From: "bobo.yang" Date: Tue, 11 Mar 2025 14:05:35 +0800 Subject: [PATCH] fix lint errors --- community/refactor/api/command.py | 75 +++++---- community/test/api/config/command.py | 76 ++++----- community/test/api/upload/command.py | 42 ++--- community/test/api/utils.py | 55 ++++--- lib/workflow/__init__.py | 2 +- lib/workflow/call.py | 82 +++++----- merico/chatflow/ask/command.py | 15 +- merico/chatflow/gen/command.py | 195 ++++++++++++----------- merico/chatflow/util/contexts.py | 17 +- merico/chatflow/util/ide_service_demo.py | 4 +- merico/unit_tests/main.py | 2 +- merico/unit_tests/ut_workflow.py | 2 +- 12 files changed, 302 insertions(+), 265 deletions(-) diff --git a/community/refactor/api/command.py b/community/refactor/api/command.py index 1e285b9..3568113 100755 --- a/community/refactor/api/command.py +++ b/community/refactor/api/command.py @@ -2,14 +2,14 @@ # -*- coding: utf-8 -*- import sys -import json -from typing import Dict, Any, Tuple +from typing import Dict -from lib.ide_service import IDEService -from lib.chatmark import Button, Form, TextEditor -from lib.workflow import workflow_call from devchat.llm import chat_json +from lib.chatmark import Button, Form, TextEditor +from lib.ide_service import IDEService +from lib.workflow import workflow_call + # 步骤3: 使用AI识别API路径和METHOD的提示词 API_ANALYSIS_PROMPT = """ 分析以下代码,识别其中的API路径和HTTP方法。 @@ -30,11 +30,13 @@ API_ANALYSIS_PROMPT = """ }} """ + @chat_json(prompt=API_ANALYSIS_PROMPT) def analyze_api(code: str) -> Dict[str, str]: """使用AI分析代码中的API路径和HTTP方法""" pass + def main() -> None: """API重构工作流主函数""" try: @@ -42,86 +44,89 @@ def main() -> None: if len(sys.argv) < 2: print("错误: 请提供重构目标") sys.exit(1) - + refactor_target = sys.argv[1] - + # 步骤2: 获取用户选中的代码 selected_code = IDEService().get_selected_range() if not selected_code or not selected_code.text.strip(): print("错误: 请先选择需要重构的代码") sys.exit(1) - + # 步骤3: 使用AI识别API路径和METHOD print("正在分析选中代码中的API信息...") api_info = analyze_api(code=selected_code.text) - + if not api_info or "api_path" not in api_info or "method" not in api_info: print("错误: 无法识别API信息") sys.exit(1) - + api_path = api_info["api_path"] method = api_info["method"] - + # 步骤4: 显示识别结果并让用户确认 - print(f"识别到的API信息:") + print("识别到的API信息:") print(f"API路径: {api_path}") print(f"HTTP方法: {method}") - + api_path_editor = TextEditor(api_path) - - form = Form([ - "### 请确认API信息", - "API路径:", - api_path_editor, - f"HTTP方法: {method}", - "请确认或修改API路径,然后点击下方按钮继续" - ]) - + + form = Form( + [ + "### 请确认API信息", + "API路径:", + api_path_editor, + f"HTTP方法: {method}", + "请确认或修改API路径,然后点击下方按钮继续", + ] + ) + form.render() - + # 获取用户确认后的API路径 confirmed_api_path = api_path_editor.new_text - + # 步骤5: 调用重构工作流进行代码重构 print(f"正在重构API: {confirmed_api_path}...") refactor_result = workflow_call(f"/refactor {refactor_target}") - + if refactor_result != 0: print("错误: API重构失败") sys.exit(1) - + print("API重构成功!") - + # 步骤6: 显示按钮让用户确认是否继续 continue_button = Button(["提交修改并测试API", "结束重构"]) continue_button.render() - + if continue_button.clicked == 1: # 用户选择结束 print("API重构已完成,未提交修改") return - + # 步骤7: 调用GitHub提交工作流提交修改 print("正在提交修改...") commit_result = workflow_call("/github.commit") - + if commit_result != 0: print("警告: 代码提交失败,但将继续进行API测试") else: print("代码提交成功!") - + # 步骤8: 调用API测试工作流对重构API进行测试 print("正在准备API测试...") test_command = f"/test.api.upload {confirmed_api_path} {method} {refactor_target}" test_result = workflow_call(test_command) - + if test_result != 0: print("警告: API测试可能未成功完成") - + print("API重构工作流执行完毕!") - + except Exception as e: print(f"错误: 执行过程中发生异常: {str(e)}") sys.exit(1) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/community/test/api/config/command.py b/community/test/api/config/command.py index 9c19be1..c2761bb 100644 --- a/community/test/api/config/command.py +++ b/community/test/api/config/command.py @@ -14,30 +14,30 @@ def read_global_config(): if os.path.exists(config_path): with open(config_path, "r", encoding="utf-8") as f: config_data = json.load(f) - + server_url = config_data.get("api_testing_server_url", "") username = config_data.get("api_testing_server_username", "") password = config_data.get("api_testing_server_password", "") - + return server_url, username, password def save_global_config(server_url, username, password): """保存全局配置信息""" config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") - + # 确保目录存在 os.makedirs(os.path.dirname(config_path), exist_ok=True) - + config_data = {} if os.path.exists(config_path): with open(config_path, "r", encoding="utf-8") as f: config_data = json.load(f) - + config_data["api_testing_server_url"] = server_url config_data["api_testing_server_username"] = username config_data["api_testing_server_password"] = password - + with open(config_path, "w+", encoding="utf-8") as f: json.dump(config_data, f, indent=4) @@ -49,30 +49,30 @@ def read_repo_config(): if os.path.exists(config_path): with open(config_path, "r", encoding="utf-8") as f: config_data = json.load(f) - + project_id = config_data.get("test_api_project_id", "") openapi_url = config_data.get("test_api_openapi_url", "") version_url = config_data.get("test_api_version_url", "") - + return project_id, openapi_url, version_url def save_repo_config(project_id, openapi_url, version_url): """保存仓库相关配置信息""" config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json") - + # 确保目录存在 os.makedirs(os.path.dirname(config_path), exist_ok=True) - + config_data = {} if os.path.exists(config_path): with open(config_path, "r", encoding="utf-8") as f: config_data = json.load(f) - + config_data["test_api_project_id"] = project_id config_data["test_api_openapi_url"] = openapi_url config_data["test_api_version_url"] = version_url - + with open(config_path, "w+", encoding="utf-8") as f: json.dump(config_data, f, indent=4) @@ -82,7 +82,7 @@ def main(): # 读取全局配置 server_url, username, password = read_global_config() - + # 读取仓库配置 project_id, openapi_url, version_url = read_repo_config() @@ -93,28 +93,30 @@ def main(): project_id_editor = TextEditor(project_id) openapi_url_editor = TextEditor(openapi_url) version_url_editor = TextEditor(version_url) - + # 创建表单 - form = Form([ - "## DevChat API 测试服务器配置", - "请输入服务器 URL (例如: http://kagent.merico.cn:8000):", - server_url_editor, - "请输入用户名:", - username_editor, - "请输入密码:", - password_editor, - "## 仓库配置", - "请输入DevChat API 测试服务器中项目 ID (例如: 37):", - project_id_editor, - "请输入 OpenAPI URL (例如: http://kagent.merico.cn:8080/openapi.json):", - openapi_url_editor, - "请输入版本 URL (例如: http://kagent.merico.cn:8080/version),不输入表示不需要检查服务器测试环境版本:", - version_url_editor, - ]) - + form = Form( + [ + "## DevChat API 测试服务器配置", + "请输入服务器 URL (例如: http://kagent.merico.cn:8000):", + server_url_editor, + "请输入用户名:", + username_editor, + "请输入密码:", + password_editor, + "## 仓库配置", + "请输入DevChat API 测试服务器中项目 ID (例如: 37):", + project_id_editor, + "请输入 OpenAPI URL (例如: http://kagent.merico.cn:8080/openapi.json):", + openapi_url_editor, + "请输入版本 URL (例如: http://kagent.merico.cn:8080/version),不输入表示不需要检查服务器测试环境版本:", + version_url_editor, + ] + ) + # 渲染表单 form.render() - + # 获取用户输入 server_url = server_url_editor.new_text.strip() username = username_editor.new_text.strip() @@ -129,7 +131,7 @@ def main(): else: print("请提供完整的全局配置信息 (SERVER_URL, USERNAME, PASSWORD)。") sys.exit(1) - + # 保存仓库配置 if project_id and openapi_url and version_url: save_repo_config(project_id, openapi_url, version_url) @@ -139,11 +141,13 @@ def main(): print("\n配置信息已成功保存!") print(f"全局配置: SERVER_URL={server_url}, USERNAME={username}, PASSWORD={'*' * len(password)}") - print(f"仓库配置: PROJECT_ID={project_id}, OPENAPI_URL={openapi_url}, VERSION_URL={version_url}") - + print( + f"仓库配置: PROJECT_ID={project_id}, OPENAPI_URL={openapi_url}, VERSION_URL={version_url}" + ) + print("\n您现在可以使用其他 API 测试工作流了。") sys.exit(0) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/community/test/api/upload/command.py b/community/test/api/upload/command.py index e854b2d..b5aa69d 100755 --- a/community/test/api/upload/command.py +++ b/community/test/api/upload/command.py @@ -8,9 +8,17 @@ import requests ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(ROOT_DIR) -from api.utils import PROJECT_ID, SERVER_URL, OPENAPI_URL, VERSION_URL, get_path_op_id, session +from api.utils import ( # noqa: E402 + OPENAPI_URL, + PROJECT_ID, + SERVER_URL, + VERSION_URL, + get_path_op_id, + session, +) # noqa: E402 + +from lib.chatmark.step import Step # noqa: E402 -from lib.chatmark.step import Step def get_apidocs(): res = session.get( @@ -23,9 +31,7 @@ def get_apidocs(): def delete_old_apidocs(): apidocs = get_apidocs() for apidoc in apidocs: - session.delete( - f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}" - ) + session.delete(f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}") def get_local_version(): @@ -39,7 +45,7 @@ def check_api_version(): if not VERSION_URL: print("未配置VERSION_URL,跳过API版本检查...") return - + local_version = get_local_version() print("检查被测服务器文档是否已经更新到最新版本...") while True: @@ -51,7 +57,7 @@ def check_api_version(): break else: print( - f".", + ".", end="", flush=True, ) @@ -74,7 +80,7 @@ def wait_for_testcase_done(testcase_id): break else: print( - f".", + ".", end="", flush=True, ) @@ -95,7 +101,7 @@ def wait_for_testcode_done(task_id): break else: print( - f".", + ".", end="", flush=True, ) @@ -124,7 +130,7 @@ def wait_for_task_done(task_id): break else: print( - f".", + ".", end="", flush=True, ) @@ -143,9 +149,7 @@ def get_testcase(api_path_id): def main(): - error_msg = ( - "请输入要测试的API名称和测试目标!如:/test.api.upload api_path method test_target" - ) + error_msg = "请输入要测试的API名称和测试目标!如:/test.api.upload api_path method test_target" if len(sys.argv) < 2: print(error_msg) return @@ -160,15 +164,13 @@ def main(): with Step("检查 API 版本是否更新..."): check_api_version() delete_old_apidocs() - - with Step( - f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..." - ): + + with Step(f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..."): # 使用配置的OPENAPI_URL if not OPENAPI_URL: print("错误:未配置OPENAPI_URL,无法获取OpenAPI文档") return - + res = requests.get( OPENAPI_URL, ) @@ -219,7 +221,7 @@ def main(): else: print(f"提交执行自动测试脚本失败!{res.text}") return - + api_path_id = get_path_op_id(api_path, method) with Step("开始查询测试脚本执行结果..."): while True: @@ -242,4 +244,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/community/test/api/utils.py b/community/test/api/utils.py index 2ca4807..14b7019 100644 --- a/community/test/api/utils.py +++ b/community/test/api/utils.py @@ -1,7 +1,8 @@ -import os import json +import os + import requests -import sys + from lib.workflow.call import workflow_call # 默认配置,仅在无法读取配置文件时使用 @@ -10,6 +11,7 @@ from lib.workflow.call import workflow_call session = requests.Session() _is_login = False + def read_config(): """读取配置文件中的设置""" # 读取全局配置 @@ -21,7 +23,7 @@ def read_config(): global_config = json.load(f) except Exception: pass - + # 读取仓库配置 repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json") repo_config = {} @@ -31,7 +33,7 @@ def read_config(): repo_config = json.load(f) except Exception: pass - + # 获取配置值 server_url = global_config.get("api_testing_server_url", "") username = global_config.get("api_testing_server_username", "") @@ -39,18 +41,19 @@ def read_config(): project_id = repo_config.get("test_api_project_id", "") openapi_url = repo_config.get("test_api_openapi_url", "") version_url = repo_config.get("test_api_version_url", "") - + return server_url, username, password, project_id, openapi_url, version_url + def ensure_config(): """确保配置存在,如果不存在则调用配置工作流""" global_config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json") - + # 检查全局配置和仓库配置是否存在 global_config_exists = os.path.exists(global_config_path) repo_config_exists = os.path.exists(repo_config_path) - + # 检查必填配置项是否存在 config_valid = True if global_config_exists and repo_config_exists: @@ -64,21 +67,23 @@ def ensure_config(): repo_config = json.load(f) except Exception: config_valid = False - + # 检查必填项 - if (not global_config.get("api_testing_server_url") or - not global_config.get("api_testing_server_username") or - not global_config.get("api_testing_server_password") or - not repo_config.get("test_api_project_id") or - not repo_config.get("test_api_openapi_url")): + if ( + not global_config.get("api_testing_server_url") + or not global_config.get("api_testing_server_username") + or not global_config.get("api_testing_server_password") + or not repo_config.get("test_api_project_id") + or not repo_config.get("test_api_openapi_url") + ): config_valid = False else: config_valid = False - + if not config_valid: print("缺少API测试所需的配置,将启动配置向导...") workflow_call("/test.api.config") - + # 重新检查配置是否已创建并包含必要项 try: if os.path.exists(global_config_path) and os.path.exists(repo_config_path): @@ -86,21 +91,24 @@ def ensure_config(): global_config = json.load(f) with open(repo_config_path, "r", encoding="utf-8") as f: repo_config = json.load(f) - - if (global_config.get("api_testing_server_url") and - global_config.get("api_testing_server_username") and - global_config.get("api_testing_server_password") and - repo_config.get("test_api_project_id") and - repo_config.get("test_api_openapi_url")): + + if ( + global_config.get("api_testing_server_url") + and global_config.get("api_testing_server_username") + and global_config.get("api_testing_server_password") + and repo_config.get("test_api_project_id") + and repo_config.get("test_api_openapi_url") + ): return True print("配置失败") return False except Exception: print("配置失败") return False - + return True + # 读取配置 result = ensure_config() if not result: @@ -108,6 +116,7 @@ if not result: exit(0) SERVER_URL, USERNAME, PASSWORD, PROJECT_ID, OPENAPI_URL, VERSION_URL = read_config() + def login(): global _is_login if _is_login: @@ -129,4 +138,4 @@ def get_path_op_id(keyword: str, method: str): return pathop["id"] -login() \ No newline at end of file +login() diff --git a/lib/workflow/__init__.py b/lib/workflow/__init__.py index 6e1b845..9865e8b 100644 --- a/lib/workflow/__init__.py +++ b/lib/workflow/__init__.py @@ -1,3 +1,3 @@ from .call import workflow_call -__all__ = ["workflow_call"] \ No newline at end of file +__all__ = ["workflow_call"] diff --git a/lib/workflow/call.py b/lib/workflow/call.py index e8b8198..c7cb787 100644 --- a/lib/workflow/call.py +++ b/lib/workflow/call.py @@ -1,16 +1,18 @@ import os -import sys -import subprocess -import yaml import re +import subprocess +import sys + +import yaml + def find_workflow_script(command_name: str) -> str: """ 根据命令名称查找对应的工作流脚本路径 - + Args: command_name: 工作流命令名称,如 "github.commit" - + Returns: 找到的脚本路径,如果未找到则返回空字符串 """ @@ -18,12 +20,12 @@ def find_workflow_script(command_name: str) -> str: workflow_base_dirs = [ os.path.expanduser("~/.chat/scripts/custom"), os.path.expanduser("~/.chat/scripts/community"), - os.path.expanduser("~/.chat/scripts/merico") + os.path.expanduser("~/.chat/scripts/merico"), ] - + # 解析命令名称,处理子命令 - parts = command_name.split('.') - + parts = command_name.split(".") + for base_dir in workflow_base_dirs: # 检查custom目录下是否有config.yml定义命名空间 if base_dir.endswith("/custom"): @@ -31,12 +33,12 @@ def find_workflow_script(command_name: str) -> str: namespaces = [] if os.path.exists(config_path): try: - with open(config_path, 'r', encoding='utf-8') as f: + with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) - namespaces = config.get('namespaces', []) + namespaces = config.get("namespaces", []) except Exception: pass - + # 在每个命名空间下查找 for namespace in namespaces: namespace_dir = os.path.join(base_dir, namespace) @@ -48,98 +50,100 @@ def find_workflow_script(command_name: str) -> str: script_path = find_script_in_path(base_dir, parts) if script_path: return script_path - + return "" + def find_script_in_path(base_dir: str, command_parts: list) -> str: """ 在指定路径下查找工作流脚本 - + Args: base_dir: 基础目录 command_parts: 命令名称拆分的部分 - + Returns: 找到的脚本路径,如果未找到则返回空字符串 """ # 构建工作流目录路径 workflow_dir = os.path.join(base_dir, *command_parts) - + # 检查目录是否存在 if not os.path.isdir(workflow_dir): return "" - + # 查找目录下的Python脚本 - py_files = [f for f in os.listdir(workflow_dir) if f.endswith('.py')] - + py_files = [f for f in os.listdir(workflow_dir) if f.endswith(".py")] + # 如果只有一个Python脚本,直接返回 if len(py_files) == 1: return os.path.join(workflow_dir, py_files[0]) - + # 如果有多个Python脚本,查找command.yml yml_path = os.path.join(workflow_dir, "command.yml") if os.path.exists(yml_path): try: - with open(yml_path, 'r', encoding='utf-8') as f: + with open(yml_path, "r", encoding="utf-8") as f: yml_content = f.read() - + # 查找steps部分中的脚本名称 for py_file in py_files: py_file_name = os.path.splitext(py_file)[0] # 查找类似 $command_path/script_name.py 的模式 - if re.search(rf'\$command_path/{py_file_name}\.py', yml_content): + if re.search(rf"\$command_path/{py_file_name}\.py", yml_content): return os.path.join(workflow_dir, py_file) except Exception: pass - + # 尝试查找与最后一个命令部分同名的脚本 last_part_script = f"{command_parts[-1]}.py" if last_part_script in py_files: return os.path.join(workflow_dir, last_part_script) - + # 尝试查找名为command.py的脚本 if "command.py" in py_files: return os.path.join(workflow_dir, "command.py") - + # 没有找到合适的脚本 return "" + def workflow_call(command: str) -> int: - """ + """ 调用工作流命令 - + Args: command: 完整的工作流命令,如 "/github.commit message" - + Returns: 命令执行的返回码 """ # 解析命令和参数 parts = command.strip().split(maxsplit=1) - cmd_name = parts[0].lstrip('/') + cmd_name = parts[0].lstrip("/") cmd_args = parts[1] if len(parts) > 1 else "" - + # 查找对应的工作流脚本 script_path = find_workflow_script(cmd_name) - + if not script_path: print(f"找不到工作流命令: {cmd_name}") return 1 - + # 使用Popen并将标准输入输出错误流连接到父进程 process = subprocess.Popen( [sys.executable, script_path, cmd_args], - stdin=sys.stdin, # 父进程的标准输入传递给子进程 + stdin=sys.stdin, # 父进程的标准输入传递给子进程 stdout=sys.stdout, # 子进程的标准输出传递给父进程 stderr=sys.stderr, # 子进程的标准错误传递给父进程 - text=True, # 使用文本模式 - bufsize=1 # 行缓冲,确保输出及时显示 + text=True, # 使用文本模式 + bufsize=1, # 行缓冲,确保输出及时显示 ) - + # 等待子进程完成并获取返回码 return_code = process.wait() - + if return_code != 0: print(f"命令执行失败,返回码: {return_code}") - + return return_code diff --git a/merico/chatflow/ask/command.py b/merico/chatflow/ask/command.py index 8ca9d02..651f10e 100644 --- a/merico/chatflow/ask/command.py +++ b/merico/chatflow/ask/command.py @@ -3,22 +3,26 @@ import os import sys from devchat.llm import chat + from lib.ide_service import IDEService ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(ROOT_WORKFLOW_DIR) -from chatflow.util.contexts import CONTEXTS +from chatflow.util.contexts import CONTEXTS # noqa: E402 - - -PROMPT = f""" +PROMPT = ( + f""" {CONTEXTS.replace("{", "{{").replace("}", "}}")} -""" + """ +""" + + """ 当前选中代码:{selected_code} 当前打开文件路径:{file_path} 用户要求或问题:{question} """ +) + + @chat(prompt=PROMPT, stream_out=True) def ask(question, selected_code, file_path): pass @@ -38,5 +42,6 @@ def main(question): ask(question=question, selected_code=code_text, file_path=file_path) sys.exit(0) + if __name__ == "__main__": main(sys.argv[1]) diff --git a/merico/chatflow/gen/command.py b/merico/chatflow/gen/command.py index 0c0dddc..9309190 100644 --- a/merico/chatflow/gen/command.py +++ b/merico/chatflow/gen/command.py @@ -7,20 +7,19 @@ #!/usr/bin/env python3 import os -import sys -import re -import yaml import subprocess -from pathlib import Path -from lib.chatmark import Button, Form, TextEditor, Step, Radio, Checkbox -from lib.ide_service import IDEService +import sys + +import yaml from devchat.llm import chat, chat_json +from lib.chatmark import Form, Radio, Step, TextEditor +from lib.ide_service import IDEService + ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(ROOT_WORKFLOW_DIR) -from chatflow.util.contexts import CONTEXTS - +from chatflow.util.contexts import CONTEXTS # noqa: E402 # 工作流命令定义模板 COMMAND_YML_TEMPLATE = """description: {description} @@ -140,80 +139,89 @@ CODE_IMPLEMENTATION_PROMPT = """ ``` """ + @chat_json(prompt=EXTRACT_INFO_PROMPT) def extract_workflow_info(user_input, contexts): """从用户输入中提取工作流信息""" pass + @chat(prompt=WORKFLOW_STEPS_PROMPT, stream_out=False) -def generate_workflow_steps(command_name, description, input_requirement, purpose, implementation_ideas, contexts): +def generate_workflow_steps( + command_name, description, input_requirement, purpose, implementation_ideas, contexts +): """生成工作流实现步骤描述""" pass + @chat(prompt=CODE_IMPLEMENTATION_PROMPT, stream_out=False) -def generate_workflow_code(command_name, description, input_requirement, custom_env, workflow_steps, contexts): +def generate_workflow_code( + command_name, description, input_requirement, custom_env, workflow_steps, contexts +): """生成工作流实现代码""" pass + def parse_command_path(command_name): """ 解析命令路径,返回目录结构和最终命令名 确保工作流创建在custom目录下的有效namespace中 """ - parts = command_name.strip('/').split('.') - + parts = command_name.strip("/").split(".") + # 获取custom目录路径 - custom_dir = os.path.join(os.path.expanduser('~'), '.chat', 'scripts', 'custom') - + custom_dir = os.path.join(os.path.expanduser("~"), ".chat", "scripts", "custom") + # 获取custom目录下的有效namespace valid_namespaces = [] - config_path = os.path.join(custom_dir, 'config.yml') - + config_path = os.path.join(custom_dir, "config.yml") + if os.path.exists(config_path): try: - with open(config_path, 'r') as f: + with open(config_path, "r") as f: config = yaml.safe_load(f) - if config and 'namespaces' in config: - valid_namespaces = config['namespaces'] + if config and "namespaces" in config: + valid_namespaces = config["namespaces"] except Exception as e: print(f"读取custom配置文件失败: {str(e)}") - + # 如果没有找到有效namespace,使用默认namespace if not valid_namespaces: print("警告: 未找到有效的custom namespace,将使用默认namespace 'default'") - valid_namespaces = ['default'] - + valid_namespaces = ["default"] + # 确保default namespace存在于config.yml中 try: os.makedirs(os.path.dirname(config_path), exist_ok=True) - with open(config_path, 'w') as f: - yaml.dump({'namespaces': ['default']}, f) + with open(config_path, "w") as f: + yaml.dump({"namespaces": ["default"]}, f) except Exception as e: print(f"创建默认namespace配置失败: {str(e)}") - + # 使用第一个有效namespace namespace = valid_namespaces[0] - + # 创建最终命令目录路径 command_dir = os.path.join(custom_dir, namespace) - + # 创建目录结构 for part in parts[:-1]: command_dir = os.path.join(command_dir, part) - + return command_dir, parts[-1] + def parse_markdown_block(response, block_type="steps"): """ 从AI响应中解析指定类型的Markdown代码块内容,支持处理嵌套的代码块。 - + Args: response (str): AI生成的响应文本 block_type (str): 要解析的代码块类型,默认为"steps" - + Returns: str: 解析出的代码块内容 - + Raises: Exception: 解析失败时抛出异常 """ @@ -221,34 +229,34 @@ def parse_markdown_block(response, block_type="steps"): # 处理可能存在的思考过程 if response.find("") != -1: response = response.split("")[-1] - + # 构建起始标记 start_marker = f"```{block_type}" end_marker = "```" - + # 查找起始位置 start_pos = response.find(start_marker) if start_pos == -1: # 如果没有找到指定类型的标记,直接返回原文本 return response.strip() - + # 从标记后开始的位置 content_start = start_pos + len(start_marker) - + # 从content_start开始找到第一个未配对的``` pos = content_start open_blocks = 1 # 已经有一个开放的块 - + while True: # 找到下一个``` next_marker = response.find(end_marker, pos) if next_marker == -1: # 如果没有找到结束标记,返回剩余所有内容 break - + # 检查这是开始还是结束标记 # 向后看是否跟着语言标识符 - after_marker = response[next_marker + 3:] + after_marker = response[next_marker + 3 :] # 检查是否是新的代码块开始 - 只要```后面跟着非空白字符,就认为是新代码块开始 if after_marker.strip() and not after_marker.startswith("\n"): first_word = after_marker.split()[0] @@ -258,66 +266,67 @@ def parse_markdown_block(response, block_type="steps"): open_blocks -= 1 else: open_blocks -= 1 - + if open_blocks == 0: # 找到匹配的结束标记 return response[content_start:next_marker].strip() - + pos = next_marker + 3 - + # 如果没有找到匹配的结束标记,返回从content_start到末尾的内容 return response[content_start:].strip() - + except Exception as e: import logging + logging.info(f"Response: {response}") logging.error(f"Exception in parse_markdown_block: {str(e)}") raise Exception(f"解析{block_type}内容失败: {str(e)}") from e -def create_workflow_files(command_dir, command_name, description, input_required, - code): + +def create_workflow_files(command_dir, command_name, description, input_required, code): """创建工作流命令文件""" # 创建命令目录 os.makedirs(command_dir, exist_ok=True) - + # 创建command.yml input_section = f"input: {'required' if input_required else 'optional'}" help_section = "help: README.md" - input_param = ' "$input"' if input_required else '' - + input_param = ' "$input"' if input_required else "" + # 添加自定义环境配置 workflow_python_section = "" python_cmd = "devchat_python" - + yml_content = COMMAND_YML_TEMPLATE.format( description=description, input_section=input_section, help_section=help_section, workflow_python_section=workflow_python_section, python_cmd=python_cmd, - input_param=input_param + input_param=input_param, ) - - with open(os.path.join(command_dir, 'command.yml'), 'w') as f: + + with open(os.path.join(command_dir, "command.yml"), "w") as f: f.write(yml_content) - + # 创建command.py - with open(os.path.join(command_dir, 'command.py'), 'w') as f: + with open(os.path.join(command_dir, "command.py"), "w") as f: f.write(code) - + # 设置执行权限 - os.chmod(os.path.join(command_dir, 'command.py'), 0o755) - + os.chmod(os.path.join(command_dir, "command.py"), 0o755) + # 创建README.md readme_content = f"# {command_name}\n\n{description}\n" - with open(os.path.join(command_dir, 'README.md'), 'w') as f: + with open(os.path.join(command_dir, "README.md"), "w") as f: f.write(readme_content) - + def main(): # 获取用户输入 user_input = sys.argv[1] if len(sys.argv) > 1 else "" - + # 步骤1: 通过AI分析用户输入,提取必要信息 with Step("分析用户输入,提取工作流信息..."): workflow_info = extract_workflow_info(user_input=user_input, contexts=CONTEXTS) @@ -330,14 +339,14 @@ def main(): input_requirement="可选", purpose=workflow_info.get("purpose", ""), implementation_ideas=workflow_info.get("implementation_ideas", ""), - contexts=CONTEXTS + contexts=CONTEXTS, ) - + workflow_steps = parse_markdown_block(workflow_steps, block_type="steps") - + # 步骤2: 使用Form组件一次性展示所有信息,让用户编辑 print("\n## 工作流信息\n") - + # 创建所有编辑组件 command_name_editor = TextEditor(workflow_info.get("command_name", "/example.command")) description_editor = TextEditor(workflow_info.get("description", "工作流命令描述")) @@ -345,33 +354,32 @@ def main(): purpose_editor = TextEditor(workflow_info.get("purpose", "请描述工作流的主要目的和功能")) # ideas_editor = TextEditor(workflow_info.get("implementation_ideas", "请描述实现思路")) steps_editor = TextEditor(workflow_steps) - # 使用Form组件一次性展示所有编辑组件 - form = Form([ - "命令名称:", - command_name_editor, - "输入要求:", - input_radio, - "描述:", - description_editor, - "工作流目的:", - purpose_editor, - "实现步骤:", - steps_editor - ]) - + form = Form( + [ + "命令名称:", + command_name_editor, + "输入要求:", + input_radio, + "描述:", + description_editor, + "工作流目的:", + purpose_editor, + "实现步骤:", + steps_editor, + ] + ) + form.render() - + # 获取用户编辑后的值 command_name = command_name_editor.new_text description = description_editor.new_text input_required = input_radio.selection == 0 - purpose = purpose_editor.new_text # implementation_ideas = ideas_editor.new_text workflow_steps = steps_editor.new_text - - + # 步骤4: 生成工作流实现代码 with Step("生成工作流实现代码..."): code = generate_workflow_code( @@ -380,42 +388,37 @@ def main(): input_requirement="必选" if input_required else "可选", custom_env=False, workflow_steps=workflow_steps, - contexts=CONTEXTS + contexts=CONTEXTS, ) code = code.strip() start_index = code.find("```python") end_index = code.rfind("```") - code = code[start_index + len("```python"):end_index].strip() + code = code[start_index + len("```python") : end_index].strip() # code = parse_markdown_block(code, block_type="python") - + # 解析命令路径 command_dir, final_command = parse_command_path(command_name) full_command_dir = os.path.join(command_dir, final_command) - + # 步骤5: 创建工作流文件 with Step(f"创建工作流文件到 {full_command_dir}"): - create_workflow_files( - full_command_dir, - command_name, - description, - input_required, - code - ) - + create_workflow_files(full_command_dir, command_name, description, input_required, code) + # 更新斜杠命令 with Step("更新斜杠命令..."): IDEService().update_slash_commands() - + # 在新窗口中打开工作流目录 try: subprocess.run(["code", full_command_dir], check=True) except subprocess.SubprocessError: print(f"无法自动打开编辑器,请手动打开工作流目录: {full_command_dir}") - + print(f"\n✅ 工作流命令 {command_name} 已成功创建!") print(f"命令文件位置: {full_command_dir}") print("你现在可以在DevChat中使用这个命令了。") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/merico/chatflow/util/contexts.py b/merico/chatflow/util/contexts.py index eaa9230..76c947c 100644 --- a/merico/chatflow/util/contexts.py +++ b/merico/chatflow/util/contexts.py @@ -10,7 +10,7 @@ """ import os -from typing import List + def load_file_in_user_scripts(filename: str) -> str: """ @@ -20,7 +20,8 @@ def load_file_in_user_scripts(filename: str) -> str: file_path = os.path.join(user_path, filename) with open(file_path, "r") as f: return f.read() - + + def load_local_file(filename: str) -> str: """ 从当前脚本所在目录的相对目录加载文件内容 @@ -30,8 +31,9 @@ def load_local_file(filename: str) -> str: with open(file_path, "r") as f: return f.read() + def load_existing_workflow_defines() -> str: - """ 从user scripts目录遍历找到所有command.yml文件,并加载其内容 """ + """从user scripts目录遍历找到所有command.yml文件,并加载其内容""" merico_path = os.path.expanduser("~/.chat/scripts/merico") community_path = os.path.expanduser("~/.chat/scripts/community") custom_path = os.path.expanduser("~/.chat/scripts/custom") @@ -43,7 +45,7 @@ def load_existing_workflow_defines() -> str: if root == path: root_paths.extend([os.path.join(root, d) for d in dirs]) break - + wrkflow_defines = [] # 遍历所有根目录,对每个根目录进行递归遍历,找到所有command.yml文件,并加载其内容 # 将目录名称与command.yml内容拼接成一个字符串,添加到wrkflow_defines列表中 @@ -53,7 +55,12 @@ def load_existing_workflow_defines() -> str: for root, dirs, files in os.walk(root_path): if "command.yml" in files: with open(os.path.join(root, "command.yml"), "r") as f: - wrkflow_defines.append(f"工作流命令/{root[len(root_path)+1:].replace(os.sep, '.')}的定义:\n{f.read()}\n\n") + wrkflow_defines.append( + ( + f"工作流命令/{root[len(root_path)+1:].replace(os.sep, '.')}的定义:" + f"\n{f.read()}\n\n" + ) + ) return "\n".join(wrkflow_defines) diff --git a/merico/chatflow/util/ide_service_demo.py b/merico/chatflow/util/ide_service_demo.py index 1caef0f..0188a16 100644 --- a/merico/chatflow/util/ide_service_demo.py +++ b/merico/chatflow/util/ide_service_demo.py @@ -1,5 +1,3 @@ from lib.ide_service import IDEService -IDEService().ide_logging( - "debug", "Hello IDE Service!" -) \ No newline at end of file +IDEService().ide_logging("debug", "Hello IDE Service!") diff --git a/merico/unit_tests/main.py b/merico/unit_tests/main.py index e3a402e..a4a306d 100644 --- a/merico/unit_tests/main.py +++ b/merico/unit_tests/main.py @@ -7,7 +7,6 @@ import openai sys.path.append(os.path.dirname(__file__)) -from cache import LocalCache from i18n import TUILanguage, get_translation from model import ( FuncToTest, @@ -16,6 +15,7 @@ from model import ( ) from ut_workflow import UnitTestsWorkflow +from cache import LocalCache from lib.chatmark import Step from lib.ide_service import IDEService diff --git a/merico/unit_tests/ut_workflow.py b/merico/unit_tests/ut_workflow.py index b4c650b..423ede1 100644 --- a/merico/unit_tests/ut_workflow.py +++ b/merico/unit_tests/ut_workflow.py @@ -1,6 +1,5 @@ from typing import Dict, List, Tuple -from cache import LocalCache from find_context import ( Context, Position, @@ -18,6 +17,7 @@ from propose_test import propose_test from tools.file_util import retrieve_file_content from write_tests import write_and_print_tests +from cache import LocalCache from lib.chatmark import Checkbox, Form, Step, TextEditor