fix lint errors

This commit is contained in:
bobo.yang 2025-03-11 14:05:35 +08:00
parent 4dcdcc65e6
commit 7353fe1df4
12 changed files with 302 additions and 265 deletions

View File

@ -2,14 +2,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys import sys
import json from typing import Dict
from typing import Dict, Any, Tuple
from lib.ide_service import IDEService
from lib.chatmark import Button, Form, TextEditor
from lib.workflow import workflow_call
from devchat.llm import chat_json from devchat.llm import chat_json
from lib.chatmark import Button, Form, TextEditor
from lib.ide_service import IDEService
from lib.workflow import workflow_call
# 步骤3: 使用AI识别API路径和METHOD的提示词 # 步骤3: 使用AI识别API路径和METHOD的提示词
API_ANALYSIS_PROMPT = """ API_ANALYSIS_PROMPT = """
分析以下代码识别其中的API路径和HTTP方法 分析以下代码识别其中的API路径和HTTP方法
@ -30,11 +30,13 @@ API_ANALYSIS_PROMPT = """
}} }}
""" """
@chat_json(prompt=API_ANALYSIS_PROMPT) @chat_json(prompt=API_ANALYSIS_PROMPT)
def analyze_api(code: str) -> Dict[str, str]: def analyze_api(code: str) -> Dict[str, str]:
"""使用AI分析代码中的API路径和HTTP方法""" """使用AI分析代码中的API路径和HTTP方法"""
pass pass
def main() -> None: def main() -> None:
"""API重构工作流主函数""" """API重构工作流主函数"""
try: try:
@ -42,86 +44,89 @@ def main() -> None:
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("错误: 请提供重构目标") print("错误: 请提供重构目标")
sys.exit(1) sys.exit(1)
refactor_target = sys.argv[1] refactor_target = sys.argv[1]
# 步骤2: 获取用户选中的代码 # 步骤2: 获取用户选中的代码
selected_code = IDEService().get_selected_range() selected_code = IDEService().get_selected_range()
if not selected_code or not selected_code.text.strip(): if not selected_code or not selected_code.text.strip():
print("错误: 请先选择需要重构的代码") print("错误: 请先选择需要重构的代码")
sys.exit(1) sys.exit(1)
# 步骤3: 使用AI识别API路径和METHOD # 步骤3: 使用AI识别API路径和METHOD
print("正在分析选中代码中的API信息...") print("正在分析选中代码中的API信息...")
api_info = analyze_api(code=selected_code.text) api_info = analyze_api(code=selected_code.text)
if not api_info or "api_path" not in api_info or "method" not in api_info: if not api_info or "api_path" not in api_info or "method" not in api_info:
print("错误: 无法识别API信息") print("错误: 无法识别API信息")
sys.exit(1) sys.exit(1)
api_path = api_info["api_path"] api_path = api_info["api_path"]
method = api_info["method"] method = api_info["method"]
# 步骤4: 显示识别结果并让用户确认 # 步骤4: 显示识别结果并让用户确认
print(f"识别到的API信息:") print("识别到的API信息:")
print(f"API路径: {api_path}") print(f"API路径: {api_path}")
print(f"HTTP方法: {method}") print(f"HTTP方法: {method}")
api_path_editor = TextEditor(api_path) api_path_editor = TextEditor(api_path)
form = Form([ form = Form(
"### 请确认API信息", [
"API路径:", "### 请确认API信息",
api_path_editor, "API路径:",
f"HTTP方法: {method}", api_path_editor,
"请确认或修改API路径然后点击下方按钮继续" f"HTTP方法: {method}",
]) "请确认或修改API路径然后点击下方按钮继续",
]
)
form.render() form.render()
# 获取用户确认后的API路径 # 获取用户确认后的API路径
confirmed_api_path = api_path_editor.new_text confirmed_api_path = api_path_editor.new_text
# 步骤5: 调用重构工作流进行代码重构 # 步骤5: 调用重构工作流进行代码重构
print(f"正在重构API: {confirmed_api_path}...") print(f"正在重构API: {confirmed_api_path}...")
refactor_result = workflow_call(f"/refactor {refactor_target}") refactor_result = workflow_call(f"/refactor {refactor_target}")
if refactor_result != 0: if refactor_result != 0:
print("错误: API重构失败") print("错误: API重构失败")
sys.exit(1) sys.exit(1)
print("API重构成功!") print("API重构成功!")
# 步骤6: 显示按钮让用户确认是否继续 # 步骤6: 显示按钮让用户确认是否继续
continue_button = Button(["提交修改并测试API", "结束重构"]) continue_button = Button(["提交修改并测试API", "结束重构"])
continue_button.render() continue_button.render()
if continue_button.clicked == 1: # 用户选择结束 if continue_button.clicked == 1: # 用户选择结束
print("API重构已完成未提交修改") print("API重构已完成未提交修改")
return return
# 步骤7: 调用GitHub提交工作流提交修改 # 步骤7: 调用GitHub提交工作流提交修改
print("正在提交修改...") print("正在提交修改...")
commit_result = workflow_call("/github.commit") commit_result = workflow_call("/github.commit")
if commit_result != 0: if commit_result != 0:
print("警告: 代码提交失败但将继续进行API测试") print("警告: 代码提交失败但将继续进行API测试")
else: else:
print("代码提交成功!") print("代码提交成功!")
# 步骤8: 调用API测试工作流对重构API进行测试 # 步骤8: 调用API测试工作流对重构API进行测试
print("正在准备API测试...") print("正在准备API测试...")
test_command = f"/test.api.upload {confirmed_api_path} {method} {refactor_target}" test_command = f"/test.api.upload {confirmed_api_path} {method} {refactor_target}"
test_result = workflow_call(test_command) test_result = workflow_call(test_command)
if test_result != 0: if test_result != 0:
print("警告: API测试可能未成功完成") print("警告: API测试可能未成功完成")
print("API重构工作流执行完毕!") print("API重构工作流执行完毕!")
except Exception as e: except Exception as e:
print(f"错误: 执行过程中发生异常: {str(e)}") print(f"错误: 执行过程中发生异常: {str(e)}")
sys.exit(1) sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -14,30 +14,30 @@ def read_global_config():
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
server_url = config_data.get("api_testing_server_url", "") server_url = config_data.get("api_testing_server_url", "")
username = config_data.get("api_testing_server_username", "") username = config_data.get("api_testing_server_username", "")
password = config_data.get("api_testing_server_password", "") password = config_data.get("api_testing_server_password", "")
return server_url, username, password return server_url, username, password
def save_global_config(server_url, username, password): def save_global_config(server_url, username, password):
"""保存全局配置信息""" """保存全局配置信息"""
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
# 确保目录存在 # 确保目录存在
os.makedirs(os.path.dirname(config_path), exist_ok=True) os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {} config_data = {}
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
config_data["api_testing_server_url"] = server_url config_data["api_testing_server_url"] = server_url
config_data["api_testing_server_username"] = username config_data["api_testing_server_username"] = username
config_data["api_testing_server_password"] = password config_data["api_testing_server_password"] = password
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
@ -49,30 +49,30 @@ def read_repo_config():
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
project_id = config_data.get("test_api_project_id", "") project_id = config_data.get("test_api_project_id", "")
openapi_url = config_data.get("test_api_openapi_url", "") openapi_url = config_data.get("test_api_openapi_url", "")
version_url = config_data.get("test_api_version_url", "") version_url = config_data.get("test_api_version_url", "")
return project_id, openapi_url, version_url return project_id, openapi_url, version_url
def save_repo_config(project_id, openapi_url, version_url): def save_repo_config(project_id, openapi_url, version_url):
"""保存仓库相关配置信息""" """保存仓库相关配置信息"""
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json") config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# 确保目录存在 # 确保目录存在
os.makedirs(os.path.dirname(config_path), exist_ok=True) os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {} config_data = {}
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
config_data["test_api_project_id"] = project_id config_data["test_api_project_id"] = project_id
config_data["test_api_openapi_url"] = openapi_url config_data["test_api_openapi_url"] = openapi_url
config_data["test_api_version_url"] = version_url config_data["test_api_version_url"] = version_url
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
@ -82,7 +82,7 @@ def main():
# 读取全局配置 # 读取全局配置
server_url, username, password = read_global_config() server_url, username, password = read_global_config()
# 读取仓库配置 # 读取仓库配置
project_id, openapi_url, version_url = read_repo_config() project_id, openapi_url, version_url = read_repo_config()
@ -93,28 +93,30 @@ def main():
project_id_editor = TextEditor(project_id) project_id_editor = TextEditor(project_id)
openapi_url_editor = TextEditor(openapi_url) openapi_url_editor = TextEditor(openapi_url)
version_url_editor = TextEditor(version_url) version_url_editor = TextEditor(version_url)
# 创建表单 # 创建表单
form = Form([ form = Form(
"## DevChat API 测试服务器配置", [
"请输入服务器 URL (例如: http://kagent.merico.cn:8000):", "## DevChat API 测试服务器配置",
server_url_editor, "请输入服务器 URL (例如: http://kagent.merico.cn:8000):",
"请输入用户名:", server_url_editor,
username_editor, "请输入用户名:",
"请输入密码:", username_editor,
password_editor, "请输入密码:",
"## 仓库配置", password_editor,
"请输入DevChat API 测试服务器中项目 ID (例如: 37):", "## 仓库配置",
project_id_editor, "请输入DevChat API 测试服务器中项目 ID (例如: 37):",
"请输入 OpenAPI URL (例如: http://kagent.merico.cn:8080/openapi.json):", project_id_editor,
openapi_url_editor, "请输入 OpenAPI URL (例如: http://kagent.merico.cn:8080/openapi.json):",
"请输入版本 URL (例如: http://kagent.merico.cn:8080/version),不输入表示不需要检查服务器测试环境版本:", openapi_url_editor,
version_url_editor, "请输入版本 URL (例如: http://kagent.merico.cn:8080/version),不输入表示不需要检查服务器测试环境版本:",
]) version_url_editor,
]
)
# 渲染表单 # 渲染表单
form.render() form.render()
# 获取用户输入 # 获取用户输入
server_url = server_url_editor.new_text.strip() server_url = server_url_editor.new_text.strip()
username = username_editor.new_text.strip() username = username_editor.new_text.strip()
@ -129,7 +131,7 @@ def main():
else: else:
print("请提供完整的全局配置信息 (SERVER_URL, USERNAME, PASSWORD)。") print("请提供完整的全局配置信息 (SERVER_URL, USERNAME, PASSWORD)。")
sys.exit(1) sys.exit(1)
# 保存仓库配置 # 保存仓库配置
if project_id and openapi_url and version_url: if project_id and openapi_url and version_url:
save_repo_config(project_id, openapi_url, version_url) save_repo_config(project_id, openapi_url, version_url)
@ -139,11 +141,13 @@ def main():
print("\n配置信息已成功保存!") print("\n配置信息已成功保存!")
print(f"全局配置: SERVER_URL={server_url}, USERNAME={username}, PASSWORD={'*' * len(password)}") print(f"全局配置: SERVER_URL={server_url}, USERNAME={username}, PASSWORD={'*' * len(password)}")
print(f"仓库配置: PROJECT_ID={project_id}, OPENAPI_URL={openapi_url}, VERSION_URL={version_url}") print(
f"仓库配置: PROJECT_ID={project_id}, OPENAPI_URL={openapi_url}, VERSION_URL={version_url}"
)
print("\n您现在可以使用其他 API 测试工作流了。") print("\n您现在可以使用其他 API 测试工作流了。")
sys.exit(0) sys.exit(0)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -8,9 +8,17 @@ import requests
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_DIR) sys.path.append(ROOT_DIR)
from api.utils import PROJECT_ID, SERVER_URL, OPENAPI_URL, VERSION_URL, get_path_op_id, session from api.utils import ( # noqa: E402
OPENAPI_URL,
PROJECT_ID,
SERVER_URL,
VERSION_URL,
get_path_op_id,
session,
) # noqa: E402
from lib.chatmark.step import Step # noqa: E402
from lib.chatmark.step import Step
def get_apidocs(): def get_apidocs():
res = session.get( res = session.get(
@ -23,9 +31,7 @@ def get_apidocs():
def delete_old_apidocs(): def delete_old_apidocs():
apidocs = get_apidocs() apidocs = get_apidocs()
for apidoc in apidocs: for apidoc in apidocs:
session.delete( session.delete(f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}")
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}"
)
def get_local_version(): def get_local_version():
@ -39,7 +45,7 @@ def check_api_version():
if not VERSION_URL: if not VERSION_URL:
print("未配置VERSION_URL跳过API版本检查...") print("未配置VERSION_URL跳过API版本检查...")
return return
local_version = get_local_version() local_version = get_local_version()
print("检查被测服务器文档是否已经更新到最新版本...") print("检查被测服务器文档是否已经更新到最新版本...")
while True: while True:
@ -51,7 +57,7 @@ def check_api_version():
break break
else: else:
print( print(
f".", ".",
end="", end="",
flush=True, flush=True,
) )
@ -74,7 +80,7 @@ def wait_for_testcase_done(testcase_id):
break break
else: else:
print( print(
f".", ".",
end="", end="",
flush=True, flush=True,
) )
@ -95,7 +101,7 @@ def wait_for_testcode_done(task_id):
break break
else: else:
print( print(
f".", ".",
end="", end="",
flush=True, flush=True,
) )
@ -124,7 +130,7 @@ def wait_for_task_done(task_id):
break break
else: else:
print( print(
f".", ".",
end="", end="",
flush=True, flush=True,
) )
@ -143,9 +149,7 @@ def get_testcase(api_path_id):
def main(): def main():
error_msg = ( error_msg = "请输入要测试的API名称和测试目标/test.api.upload api_path method test_target"
"请输入要测试的API名称和测试目标/test.api.upload api_path method test_target"
)
if len(sys.argv) < 2: if len(sys.argv) < 2:
print(error_msg) print(error_msg)
return return
@ -160,15 +164,13 @@ def main():
with Step("检查 API 版本是否更新..."): with Step("检查 API 版本是否更新..."):
check_api_version() check_api_version()
delete_old_apidocs() delete_old_apidocs()
with Step( with Step(f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..."):
f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..."
):
# 使用配置的OPENAPI_URL # 使用配置的OPENAPI_URL
if not OPENAPI_URL: if not OPENAPI_URL:
print("错误未配置OPENAPI_URL无法获取OpenAPI文档") print("错误未配置OPENAPI_URL无法获取OpenAPI文档")
return return
res = requests.get( res = requests.get(
OPENAPI_URL, OPENAPI_URL,
) )
@ -219,7 +221,7 @@ def main():
else: else:
print(f"提交执行自动测试脚本失败!{res.text}") print(f"提交执行自动测试脚本失败!{res.text}")
return return
api_path_id = get_path_op_id(api_path, method) api_path_id = get_path_op_id(api_path, method)
with Step("开始查询测试脚本执行结果..."): with Step("开始查询测试脚本执行结果..."):
while True: while True:
@ -242,4 +244,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,7 +1,8 @@
import os
import json import json
import os
import requests import requests
import sys
from lib.workflow.call import workflow_call from lib.workflow.call import workflow_call
# 默认配置,仅在无法读取配置文件时使用 # 默认配置,仅在无法读取配置文件时使用
@ -10,6 +11,7 @@ from lib.workflow.call import workflow_call
session = requests.Session() session = requests.Session()
_is_login = False _is_login = False
def read_config(): def read_config():
"""读取配置文件中的设置""" """读取配置文件中的设置"""
# 读取全局配置 # 读取全局配置
@ -21,7 +23,7 @@ def read_config():
global_config = json.load(f) global_config = json.load(f)
except Exception: except Exception:
pass pass
# 读取仓库配置 # 读取仓库配置
repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json") repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
repo_config = {} repo_config = {}
@ -31,7 +33,7 @@ def read_config():
repo_config = json.load(f) repo_config = json.load(f)
except Exception: except Exception:
pass pass
# 获取配置值 # 获取配置值
server_url = global_config.get("api_testing_server_url", "") server_url = global_config.get("api_testing_server_url", "")
username = global_config.get("api_testing_server_username", "") username = global_config.get("api_testing_server_username", "")
@ -39,18 +41,19 @@ def read_config():
project_id = repo_config.get("test_api_project_id", "") project_id = repo_config.get("test_api_project_id", "")
openapi_url = repo_config.get("test_api_openapi_url", "") openapi_url = repo_config.get("test_api_openapi_url", "")
version_url = repo_config.get("test_api_version_url", "") version_url = repo_config.get("test_api_version_url", "")
return server_url, username, password, project_id, openapi_url, version_url return server_url, username, password, project_id, openapi_url, version_url
def ensure_config(): def ensure_config():
"""确保配置存在,如果不存在则调用配置工作流""" """确保配置存在,如果不存在则调用配置工作流"""
global_config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") global_config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json") repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# 检查全局配置和仓库配置是否存在 # 检查全局配置和仓库配置是否存在
global_config_exists = os.path.exists(global_config_path) global_config_exists = os.path.exists(global_config_path)
repo_config_exists = os.path.exists(repo_config_path) repo_config_exists = os.path.exists(repo_config_path)
# 检查必填配置项是否存在 # 检查必填配置项是否存在
config_valid = True config_valid = True
if global_config_exists and repo_config_exists: if global_config_exists and repo_config_exists:
@ -64,21 +67,23 @@ def ensure_config():
repo_config = json.load(f) repo_config = json.load(f)
except Exception: except Exception:
config_valid = False config_valid = False
# 检查必填项 # 检查必填项
if (not global_config.get("api_testing_server_url") or if (
not global_config.get("api_testing_server_username") or not global_config.get("api_testing_server_url")
not global_config.get("api_testing_server_password") or or not global_config.get("api_testing_server_username")
not repo_config.get("test_api_project_id") or or not global_config.get("api_testing_server_password")
not repo_config.get("test_api_openapi_url")): or not repo_config.get("test_api_project_id")
or not repo_config.get("test_api_openapi_url")
):
config_valid = False config_valid = False
else: else:
config_valid = False config_valid = False
if not config_valid: if not config_valid:
print("缺少API测试所需的配置将启动配置向导...") print("缺少API测试所需的配置将启动配置向导...")
workflow_call("/test.api.config") workflow_call("/test.api.config")
# 重新检查配置是否已创建并包含必要项 # 重新检查配置是否已创建并包含必要项
try: try:
if os.path.exists(global_config_path) and os.path.exists(repo_config_path): if os.path.exists(global_config_path) and os.path.exists(repo_config_path):
@ -86,21 +91,24 @@ def ensure_config():
global_config = json.load(f) global_config = json.load(f)
with open(repo_config_path, "r", encoding="utf-8") as f: with open(repo_config_path, "r", encoding="utf-8") as f:
repo_config = json.load(f) repo_config = json.load(f)
if (global_config.get("api_testing_server_url") and if (
global_config.get("api_testing_server_username") and global_config.get("api_testing_server_url")
global_config.get("api_testing_server_password") and and global_config.get("api_testing_server_username")
repo_config.get("test_api_project_id") and and global_config.get("api_testing_server_password")
repo_config.get("test_api_openapi_url")): and repo_config.get("test_api_project_id")
and repo_config.get("test_api_openapi_url")
):
return True return True
print("配置失败") print("配置失败")
return False return False
except Exception: except Exception:
print("配置失败") print("配置失败")
return False return False
return True return True
# 读取配置 # 读取配置
result = ensure_config() result = ensure_config()
if not result: if not result:
@ -108,6 +116,7 @@ if not result:
exit(0) exit(0)
SERVER_URL, USERNAME, PASSWORD, PROJECT_ID, OPENAPI_URL, VERSION_URL = read_config() SERVER_URL, USERNAME, PASSWORD, PROJECT_ID, OPENAPI_URL, VERSION_URL = read_config()
def login(): def login():
global _is_login global _is_login
if _is_login: if _is_login:
@ -129,4 +138,4 @@ def get_path_op_id(keyword: str, method: str):
return pathop["id"] return pathop["id"]
login() login()

View File

@ -1,3 +1,3 @@
from .call import workflow_call from .call import workflow_call
__all__ = ["workflow_call"] __all__ = ["workflow_call"]

View File

@ -1,16 +1,18 @@
import os import os
import sys
import subprocess
import yaml
import re import re
import subprocess
import sys
import yaml
def find_workflow_script(command_name: str) -> str: def find_workflow_script(command_name: str) -> str:
""" """
根据命令名称查找对应的工作流脚本路径 根据命令名称查找对应的工作流脚本路径
Args: Args:
command_name: 工作流命令名称 "github.commit" command_name: 工作流命令名称 "github.commit"
Returns: Returns:
找到的脚本路径如果未找到则返回空字符串 找到的脚本路径如果未找到则返回空字符串
""" """
@ -18,12 +20,12 @@ def find_workflow_script(command_name: str) -> str:
workflow_base_dirs = [ workflow_base_dirs = [
os.path.expanduser("~/.chat/scripts/custom"), os.path.expanduser("~/.chat/scripts/custom"),
os.path.expanduser("~/.chat/scripts/community"), os.path.expanduser("~/.chat/scripts/community"),
os.path.expanduser("~/.chat/scripts/merico") os.path.expanduser("~/.chat/scripts/merico"),
] ]
# 解析命令名称,处理子命令 # 解析命令名称,处理子命令
parts = command_name.split('.') parts = command_name.split(".")
for base_dir in workflow_base_dirs: for base_dir in workflow_base_dirs:
# 检查custom目录下是否有config.yml定义命名空间 # 检查custom目录下是否有config.yml定义命名空间
if base_dir.endswith("/custom"): if base_dir.endswith("/custom"):
@ -31,12 +33,12 @@ def find_workflow_script(command_name: str) -> str:
namespaces = [] namespaces = []
if os.path.exists(config_path): if os.path.exists(config_path):
try: try:
with open(config_path, 'r', encoding='utf-8') as f: with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
namespaces = config.get('namespaces', []) namespaces = config.get("namespaces", [])
except Exception: except Exception:
pass pass
# 在每个命名空间下查找 # 在每个命名空间下查找
for namespace in namespaces: for namespace in namespaces:
namespace_dir = os.path.join(base_dir, namespace) namespace_dir = os.path.join(base_dir, namespace)
@ -48,98 +50,100 @@ def find_workflow_script(command_name: str) -> str:
script_path = find_script_in_path(base_dir, parts) script_path = find_script_in_path(base_dir, parts)
if script_path: if script_path:
return script_path return script_path
return "" return ""
def find_script_in_path(base_dir: str, command_parts: list) -> str: def find_script_in_path(base_dir: str, command_parts: list) -> str:
""" """
在指定路径下查找工作流脚本 在指定路径下查找工作流脚本
Args: Args:
base_dir: 基础目录 base_dir: 基础目录
command_parts: 命令名称拆分的部分 command_parts: 命令名称拆分的部分
Returns: Returns:
找到的脚本路径如果未找到则返回空字符串 找到的脚本路径如果未找到则返回空字符串
""" """
# 构建工作流目录路径 # 构建工作流目录路径
workflow_dir = os.path.join(base_dir, *command_parts) workflow_dir = os.path.join(base_dir, *command_parts)
# 检查目录是否存在 # 检查目录是否存在
if not os.path.isdir(workflow_dir): if not os.path.isdir(workflow_dir):
return "" return ""
# 查找目录下的Python脚本 # 查找目录下的Python脚本
py_files = [f for f in os.listdir(workflow_dir) if f.endswith('.py')] py_files = [f for f in os.listdir(workflow_dir) if f.endswith(".py")]
# 如果只有一个Python脚本直接返回 # 如果只有一个Python脚本直接返回
if len(py_files) == 1: if len(py_files) == 1:
return os.path.join(workflow_dir, py_files[0]) return os.path.join(workflow_dir, py_files[0])
# 如果有多个Python脚本查找command.yml # 如果有多个Python脚本查找command.yml
yml_path = os.path.join(workflow_dir, "command.yml") yml_path = os.path.join(workflow_dir, "command.yml")
if os.path.exists(yml_path): if os.path.exists(yml_path):
try: try:
with open(yml_path, 'r', encoding='utf-8') as f: with open(yml_path, "r", encoding="utf-8") as f:
yml_content = f.read() yml_content = f.read()
# 查找steps部分中的脚本名称 # 查找steps部分中的脚本名称
for py_file in py_files: for py_file in py_files:
py_file_name = os.path.splitext(py_file)[0] py_file_name = os.path.splitext(py_file)[0]
# 查找类似 $command_path/script_name.py 的模式 # 查找类似 $command_path/script_name.py 的模式
if re.search(rf'\$command_path/{py_file_name}\.py', yml_content): if re.search(rf"\$command_path/{py_file_name}\.py", yml_content):
return os.path.join(workflow_dir, py_file) return os.path.join(workflow_dir, py_file)
except Exception: except Exception:
pass pass
# 尝试查找与最后一个命令部分同名的脚本 # 尝试查找与最后一个命令部分同名的脚本
last_part_script = f"{command_parts[-1]}.py" last_part_script = f"{command_parts[-1]}.py"
if last_part_script in py_files: if last_part_script in py_files:
return os.path.join(workflow_dir, last_part_script) return os.path.join(workflow_dir, last_part_script)
# 尝试查找名为command.py的脚本 # 尝试查找名为command.py的脚本
if "command.py" in py_files: if "command.py" in py_files:
return os.path.join(workflow_dir, "command.py") return os.path.join(workflow_dir, "command.py")
# 没有找到合适的脚本 # 没有找到合适的脚本
return "" return ""
def workflow_call(command: str) -> int: def workflow_call(command: str) -> int:
""" """
调用工作流命令 调用工作流命令
Args: Args:
command: 完整的工作流命令 "/github.commit message" command: 完整的工作流命令 "/github.commit message"
Returns: Returns:
命令执行的返回码 命令执行的返回码
""" """
# 解析命令和参数 # 解析命令和参数
parts = command.strip().split(maxsplit=1) parts = command.strip().split(maxsplit=1)
cmd_name = parts[0].lstrip('/') cmd_name = parts[0].lstrip("/")
cmd_args = parts[1] if len(parts) > 1 else "" cmd_args = parts[1] if len(parts) > 1 else ""
# 查找对应的工作流脚本 # 查找对应的工作流脚本
script_path = find_workflow_script(cmd_name) script_path = find_workflow_script(cmd_name)
if not script_path: if not script_path:
print(f"找不到工作流命令: {cmd_name}") print(f"找不到工作流命令: {cmd_name}")
return 1 return 1
# 使用Popen并将标准输入输出错误流连接到父进程 # 使用Popen并将标准输入输出错误流连接到父进程
process = subprocess.Popen( process = subprocess.Popen(
[sys.executable, script_path, cmd_args], [sys.executable, script_path, cmd_args],
stdin=sys.stdin, # 父进程的标准输入传递给子进程 stdin=sys.stdin, # 父进程的标准输入传递给子进程
stdout=sys.stdout, # 子进程的标准输出传递给父进程 stdout=sys.stdout, # 子进程的标准输出传递给父进程
stderr=sys.stderr, # 子进程的标准错误传递给父进程 stderr=sys.stderr, # 子进程的标准错误传递给父进程
text=True, # 使用文本模式 text=True, # 使用文本模式
bufsize=1 # 行缓冲,确保输出及时显示 bufsize=1, # 行缓冲,确保输出及时显示
) )
# 等待子进程完成并获取返回码 # 等待子进程完成并获取返回码
return_code = process.wait() return_code = process.wait()
if return_code != 0: if return_code != 0:
print(f"命令执行失败,返回码: {return_code}") print(f"命令执行失败,返回码: {return_code}")
return return_code return return_code

View File

@ -3,22 +3,26 @@ import os
import sys import sys
from devchat.llm import chat from devchat.llm import chat
from lib.ide_service import IDEService from lib.ide_service import IDEService
ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_WORKFLOW_DIR) sys.path.append(ROOT_WORKFLOW_DIR)
from chatflow.util.contexts import CONTEXTS from chatflow.util.contexts import CONTEXTS # noqa: E402
PROMPT = (
f"""
PROMPT = f"""
{CONTEXTS.replace("{", "{{").replace("}", "}}")} {CONTEXTS.replace("{", "{{").replace("}", "}}")}
""" + """ """
+ """
当前选中代码{selected_code} 当前选中代码{selected_code}
当前打开文件路径{file_path} 当前打开文件路径{file_path}
用户要求或问题{question} 用户要求或问题{question}
""" """
)
@chat(prompt=PROMPT, stream_out=True) @chat(prompt=PROMPT, stream_out=True)
def ask(question, selected_code, file_path): def ask(question, selected_code, file_path):
pass pass
@ -38,5 +42,6 @@ def main(question):
ask(question=question, selected_code=code_text, file_path=file_path) ask(question=question, selected_code=code_text, file_path=file_path)
sys.exit(0) sys.exit(0)
if __name__ == "__main__": if __name__ == "__main__":
main(sys.argv[1]) main(sys.argv[1])

View File

@ -7,20 +7,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
import sys
import re
import yaml
import subprocess import subprocess
from pathlib import Path import sys
from lib.chatmark import Button, Form, TextEditor, Step, Radio, Checkbox
from lib.ide_service import IDEService import yaml
from devchat.llm import chat, chat_json from devchat.llm import chat, chat_json
from lib.chatmark import Form, Radio, Step, TextEditor
from lib.ide_service import IDEService
ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_WORKFLOW_DIR) sys.path.append(ROOT_WORKFLOW_DIR)
from chatflow.util.contexts import CONTEXTS from chatflow.util.contexts import CONTEXTS # noqa: E402
# 工作流命令定义模板 # 工作流命令定义模板
COMMAND_YML_TEMPLATE = """description: {description} COMMAND_YML_TEMPLATE = """description: {description}
@ -140,80 +139,89 @@ CODE_IMPLEMENTATION_PROMPT = """
``` ```
""" """
@chat_json(prompt=EXTRACT_INFO_PROMPT) @chat_json(prompt=EXTRACT_INFO_PROMPT)
def extract_workflow_info(user_input, contexts): def extract_workflow_info(user_input, contexts):
"""从用户输入中提取工作流信息""" """从用户输入中提取工作流信息"""
pass pass
@chat(prompt=WORKFLOW_STEPS_PROMPT, stream_out=False) @chat(prompt=WORKFLOW_STEPS_PROMPT, stream_out=False)
def generate_workflow_steps(command_name, description, input_requirement, purpose, implementation_ideas, contexts): def generate_workflow_steps(
command_name, description, input_requirement, purpose, implementation_ideas, contexts
):
"""生成工作流实现步骤描述""" """生成工作流实现步骤描述"""
pass pass
@chat(prompt=CODE_IMPLEMENTATION_PROMPT, stream_out=False) @chat(prompt=CODE_IMPLEMENTATION_PROMPT, stream_out=False)
def generate_workflow_code(command_name, description, input_requirement, custom_env, workflow_steps, contexts): def generate_workflow_code(
command_name, description, input_requirement, custom_env, workflow_steps, contexts
):
"""生成工作流实现代码""" """生成工作流实现代码"""
pass pass
def parse_command_path(command_name): def parse_command_path(command_name):
""" """
解析命令路径返回目录结构和最终命令名 解析命令路径返回目录结构和最终命令名
确保工作流创建在custom目录下的有效namespace中 确保工作流创建在custom目录下的有效namespace中
""" """
parts = command_name.strip('/').split('.') parts = command_name.strip("/").split(".")
# 获取custom目录路径 # 获取custom目录路径
custom_dir = os.path.join(os.path.expanduser('~'), '.chat', 'scripts', 'custom') custom_dir = os.path.join(os.path.expanduser("~"), ".chat", "scripts", "custom")
# 获取custom目录下的有效namespace # 获取custom目录下的有效namespace
valid_namespaces = [] valid_namespaces = []
config_path = os.path.join(custom_dir, 'config.yml') config_path = os.path.join(custom_dir, "config.yml")
if os.path.exists(config_path): if os.path.exists(config_path):
try: try:
with open(config_path, 'r') as f: with open(config_path, "r") as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
if config and 'namespaces' in config: if config and "namespaces" in config:
valid_namespaces = config['namespaces'] valid_namespaces = config["namespaces"]
except Exception as e: except Exception as e:
print(f"读取custom配置文件失败: {str(e)}") print(f"读取custom配置文件失败: {str(e)}")
# 如果没有找到有效namespace使用默认namespace # 如果没有找到有效namespace使用默认namespace
if not valid_namespaces: if not valid_namespaces:
print("警告: 未找到有效的custom namespace将使用默认namespace 'default'") print("警告: 未找到有效的custom namespace将使用默认namespace 'default'")
valid_namespaces = ['default'] valid_namespaces = ["default"]
# 确保default namespace存在于config.yml中 # 确保default namespace存在于config.yml中
try: try:
os.makedirs(os.path.dirname(config_path), exist_ok=True) os.makedirs(os.path.dirname(config_path), exist_ok=True)
with open(config_path, 'w') as f: with open(config_path, "w") as f:
yaml.dump({'namespaces': ['default']}, f) yaml.dump({"namespaces": ["default"]}, f)
except Exception as e: except Exception as e:
print(f"创建默认namespace配置失败: {str(e)}") print(f"创建默认namespace配置失败: {str(e)}")
# 使用第一个有效namespace # 使用第一个有效namespace
namespace = valid_namespaces[0] namespace = valid_namespaces[0]
# 创建最终命令目录路径 # 创建最终命令目录路径
command_dir = os.path.join(custom_dir, namespace) command_dir = os.path.join(custom_dir, namespace)
# 创建目录结构 # 创建目录结构
for part in parts[:-1]: for part in parts[:-1]:
command_dir = os.path.join(command_dir, part) command_dir = os.path.join(command_dir, part)
return command_dir, parts[-1] return command_dir, parts[-1]
def parse_markdown_block(response, block_type="steps"): def parse_markdown_block(response, block_type="steps"):
""" """
从AI响应中解析指定类型的Markdown代码块内容支持处理嵌套的代码块 从AI响应中解析指定类型的Markdown代码块内容支持处理嵌套的代码块
Args: Args:
response (str): AI生成的响应文本 response (str): AI生成的响应文本
block_type (str): 要解析的代码块类型默认为"steps" block_type (str): 要解析的代码块类型默认为"steps"
Returns: Returns:
str: 解析出的代码块内容 str: 解析出的代码块内容
Raises: Raises:
Exception: 解析失败时抛出异常 Exception: 解析失败时抛出异常
""" """
@ -221,34 +229,34 @@ def parse_markdown_block(response, block_type="steps"):
# 处理可能存在的思考过程 # 处理可能存在的思考过程
if response.find("</think>") != -1: if response.find("</think>") != -1:
response = response.split("</think>")[-1] response = response.split("</think>")[-1]
# 构建起始标记 # 构建起始标记
start_marker = f"```{block_type}" start_marker = f"```{block_type}"
end_marker = "```" end_marker = "```"
# 查找起始位置 # 查找起始位置
start_pos = response.find(start_marker) start_pos = response.find(start_marker)
if start_pos == -1: if start_pos == -1:
# 如果没有找到指定类型的标记,直接返回原文本 # 如果没有找到指定类型的标记,直接返回原文本
return response.strip() return response.strip()
# 从标记后开始的位置 # 从标记后开始的位置
content_start = start_pos + len(start_marker) content_start = start_pos + len(start_marker)
# 从content_start开始找到第一个未配对的``` # 从content_start开始找到第一个未配对的```
pos = content_start pos = content_start
open_blocks = 1 # 已经有一个开放的块 open_blocks = 1 # 已经有一个开放的块
while True: while True:
# 找到下一个``` # 找到下一个```
next_marker = response.find(end_marker, pos) next_marker = response.find(end_marker, pos)
if next_marker == -1: if next_marker == -1:
# 如果没有找到结束标记,返回剩余所有内容 # 如果没有找到结束标记,返回剩余所有内容
break break
# 检查这是开始还是结束标记 # 检查这是开始还是结束标记
# 向后看是否跟着语言标识符 # 向后看是否跟着语言标识符
after_marker = response[next_marker + 3:] after_marker = response[next_marker + 3 :]
# 检查是否是新的代码块开始 - 只要```后面跟着非空白字符,就认为是新代码块开始 # 检查是否是新的代码块开始 - 只要```后面跟着非空白字符,就认为是新代码块开始
if after_marker.strip() and not after_marker.startswith("\n"): if after_marker.strip() and not after_marker.startswith("\n"):
first_word = after_marker.split()[0] first_word = after_marker.split()[0]
@ -258,66 +266,67 @@ def parse_markdown_block(response, block_type="steps"):
open_blocks -= 1 open_blocks -= 1
else: else:
open_blocks -= 1 open_blocks -= 1
if open_blocks == 0: if open_blocks == 0:
# 找到匹配的结束标记 # 找到匹配的结束标记
return response[content_start:next_marker].strip() return response[content_start:next_marker].strip()
pos = next_marker + 3 pos = next_marker + 3
# 如果没有找到匹配的结束标记返回从content_start到末尾的内容 # 如果没有找到匹配的结束标记返回从content_start到末尾的内容
return response[content_start:].strip() return response[content_start:].strip()
except Exception as e: except Exception as e:
import logging import logging
logging.info(f"Response: {response}") logging.info(f"Response: {response}")
logging.error(f"Exception in parse_markdown_block: {str(e)}") logging.error(f"Exception in parse_markdown_block: {str(e)}")
raise Exception(f"解析{block_type}内容失败: {str(e)}") from e raise Exception(f"解析{block_type}内容失败: {str(e)}") from e
def create_workflow_files(command_dir, command_name, description, input_required,
code): def create_workflow_files(command_dir, command_name, description, input_required, code):
"""创建工作流命令文件""" """创建工作流命令文件"""
# 创建命令目录 # 创建命令目录
os.makedirs(command_dir, exist_ok=True) os.makedirs(command_dir, exist_ok=True)
# 创建command.yml # 创建command.yml
input_section = f"input: {'required' if input_required else 'optional'}" input_section = f"input: {'required' if input_required else 'optional'}"
help_section = "help: README.md" help_section = "help: README.md"
input_param = ' "$input"' if input_required else '' input_param = ' "$input"' if input_required else ""
# 添加自定义环境配置 # 添加自定义环境配置
workflow_python_section = "" workflow_python_section = ""
python_cmd = "devchat_python" python_cmd = "devchat_python"
yml_content = COMMAND_YML_TEMPLATE.format( yml_content = COMMAND_YML_TEMPLATE.format(
description=description, description=description,
input_section=input_section, input_section=input_section,
help_section=help_section, help_section=help_section,
workflow_python_section=workflow_python_section, workflow_python_section=workflow_python_section,
python_cmd=python_cmd, python_cmd=python_cmd,
input_param=input_param input_param=input_param,
) )
with open(os.path.join(command_dir, 'command.yml'), 'w') as f: with open(os.path.join(command_dir, "command.yml"), "w") as f:
f.write(yml_content) f.write(yml_content)
# 创建command.py # 创建command.py
with open(os.path.join(command_dir, 'command.py'), 'w') as f: with open(os.path.join(command_dir, "command.py"), "w") as f:
f.write(code) f.write(code)
# 设置执行权限 # 设置执行权限
os.chmod(os.path.join(command_dir, 'command.py'), 0o755) os.chmod(os.path.join(command_dir, "command.py"), 0o755)
# 创建README.md # 创建README.md
readme_content = f"# {command_name}\n\n{description}\n" readme_content = f"# {command_name}\n\n{description}\n"
with open(os.path.join(command_dir, 'README.md'), 'w') as f: with open(os.path.join(command_dir, "README.md"), "w") as f:
f.write(readme_content) f.write(readme_content)
def main(): def main():
# 获取用户输入 # 获取用户输入
user_input = sys.argv[1] if len(sys.argv) > 1 else "" user_input = sys.argv[1] if len(sys.argv) > 1 else ""
# 步骤1: 通过AI分析用户输入提取必要信息 # 步骤1: 通过AI分析用户输入提取必要信息
with Step("分析用户输入,提取工作流信息..."): with Step("分析用户输入,提取工作流信息..."):
workflow_info = extract_workflow_info(user_input=user_input, contexts=CONTEXTS) workflow_info = extract_workflow_info(user_input=user_input, contexts=CONTEXTS)
@ -330,14 +339,14 @@ def main():
input_requirement="可选", input_requirement="可选",
purpose=workflow_info.get("purpose", ""), purpose=workflow_info.get("purpose", ""),
implementation_ideas=workflow_info.get("implementation_ideas", ""), implementation_ideas=workflow_info.get("implementation_ideas", ""),
contexts=CONTEXTS contexts=CONTEXTS,
) )
workflow_steps = parse_markdown_block(workflow_steps, block_type="steps") workflow_steps = parse_markdown_block(workflow_steps, block_type="steps")
# 步骤2: 使用Form组件一次性展示所有信息让用户编辑 # 步骤2: 使用Form组件一次性展示所有信息让用户编辑
print("\n## 工作流信息\n") print("\n## 工作流信息\n")
# 创建所有编辑组件 # 创建所有编辑组件
command_name_editor = TextEditor(workflow_info.get("command_name", "/example.command")) command_name_editor = TextEditor(workflow_info.get("command_name", "/example.command"))
description_editor = TextEditor(workflow_info.get("description", "工作流命令描述")) description_editor = TextEditor(workflow_info.get("description", "工作流命令描述"))
@ -345,33 +354,32 @@ def main():
purpose_editor = TextEditor(workflow_info.get("purpose", "请描述工作流的主要目的和功能")) purpose_editor = TextEditor(workflow_info.get("purpose", "请描述工作流的主要目的和功能"))
# ideas_editor = TextEditor(workflow_info.get("implementation_ideas", "请描述实现思路")) # ideas_editor = TextEditor(workflow_info.get("implementation_ideas", "请描述实现思路"))
steps_editor = TextEditor(workflow_steps) steps_editor = TextEditor(workflow_steps)
# 使用Form组件一次性展示所有编辑组件 # 使用Form组件一次性展示所有编辑组件
form = Form([ form = Form(
"命令名称:", [
command_name_editor, "命令名称:",
"输入要求:", command_name_editor,
input_radio, "输入要求:",
"描述:", input_radio,
description_editor, "描述:",
"工作流目的:", description_editor,
purpose_editor, "工作流目的:",
"实现步骤:", purpose_editor,
steps_editor "实现步骤:",
]) steps_editor,
]
)
form.render() form.render()
# 获取用户编辑后的值 # 获取用户编辑后的值
command_name = command_name_editor.new_text command_name = command_name_editor.new_text
description = description_editor.new_text description = description_editor.new_text
input_required = input_radio.selection == 0 input_required = input_radio.selection == 0
purpose = purpose_editor.new_text
# implementation_ideas = ideas_editor.new_text # implementation_ideas = ideas_editor.new_text
workflow_steps = steps_editor.new_text workflow_steps = steps_editor.new_text
# 步骤4: 生成工作流实现代码 # 步骤4: 生成工作流实现代码
with Step("生成工作流实现代码..."): with Step("生成工作流实现代码..."):
code = generate_workflow_code( code = generate_workflow_code(
@ -380,42 +388,37 @@ def main():
input_requirement="必选" if input_required else "可选", input_requirement="必选" if input_required else "可选",
custom_env=False, custom_env=False,
workflow_steps=workflow_steps, workflow_steps=workflow_steps,
contexts=CONTEXTS contexts=CONTEXTS,
) )
code = code.strip() code = code.strip()
start_index = code.find("```python") start_index = code.find("```python")
end_index = code.rfind("```") end_index = code.rfind("```")
code = code[start_index + len("```python"):end_index].strip() code = code[start_index + len("```python") : end_index].strip()
# code = parse_markdown_block(code, block_type="python") # code = parse_markdown_block(code, block_type="python")
# 解析命令路径 # 解析命令路径
command_dir, final_command = parse_command_path(command_name) command_dir, final_command = parse_command_path(command_name)
full_command_dir = os.path.join(command_dir, final_command) full_command_dir = os.path.join(command_dir, final_command)
# 步骤5: 创建工作流文件 # 步骤5: 创建工作流文件
with Step(f"创建工作流文件到 {full_command_dir}"): with Step(f"创建工作流文件到 {full_command_dir}"):
create_workflow_files( create_workflow_files(full_command_dir, command_name, description, input_required, code)
full_command_dir,
command_name,
description,
input_required,
code
)
# 更新斜杠命令 # 更新斜杠命令
with Step("更新斜杠命令..."): with Step("更新斜杠命令..."):
IDEService().update_slash_commands() IDEService().update_slash_commands()
# 在新窗口中打开工作流目录 # 在新窗口中打开工作流目录
try: try:
subprocess.run(["code", full_command_dir], check=True) subprocess.run(["code", full_command_dir], check=True)
except subprocess.SubprocessError: except subprocess.SubprocessError:
print(f"无法自动打开编辑器,请手动打开工作流目录: {full_command_dir}") print(f"无法自动打开编辑器,请手动打开工作流目录: {full_command_dir}")
print(f"\n✅ 工作流命令 {command_name} 已成功创建!") print(f"\n✅ 工作流命令 {command_name} 已成功创建!")
print(f"命令文件位置: {full_command_dir}") print(f"命令文件位置: {full_command_dir}")
print("你现在可以在DevChat中使用这个命令了。") print("你现在可以在DevChat中使用这个命令了。")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -10,7 +10,7 @@
""" """
import os import os
from typing import List
def load_file_in_user_scripts(filename: str) -> str: def load_file_in_user_scripts(filename: str) -> str:
""" """
@ -20,7 +20,8 @@ def load_file_in_user_scripts(filename: str) -> str:
file_path = os.path.join(user_path, filename) file_path = os.path.join(user_path, filename)
with open(file_path, "r") as f: with open(file_path, "r") as f:
return f.read() return f.read()
def load_local_file(filename: str) -> str: def load_local_file(filename: str) -> str:
""" """
从当前脚本所在目录的相对目录加载文件内容 从当前脚本所在目录的相对目录加载文件内容
@ -30,8 +31,9 @@ def load_local_file(filename: str) -> str:
with open(file_path, "r") as f: with open(file_path, "r") as f:
return f.read() return f.read()
def load_existing_workflow_defines() -> str: def load_existing_workflow_defines() -> str:
""" 从user scripts目录遍历找到所有command.yml文件并加载其内容 """ """从user scripts目录遍历找到所有command.yml文件并加载其内容"""
merico_path = os.path.expanduser("~/.chat/scripts/merico") merico_path = os.path.expanduser("~/.chat/scripts/merico")
community_path = os.path.expanduser("~/.chat/scripts/community") community_path = os.path.expanduser("~/.chat/scripts/community")
custom_path = os.path.expanduser("~/.chat/scripts/custom") custom_path = os.path.expanduser("~/.chat/scripts/custom")
@ -43,7 +45,7 @@ def load_existing_workflow_defines() -> str:
if root == path: if root == path:
root_paths.extend([os.path.join(root, d) for d in dirs]) root_paths.extend([os.path.join(root, d) for d in dirs])
break break
wrkflow_defines = [] wrkflow_defines = []
# 遍历所有根目录对每个根目录进行递归遍历找到所有command.yml文件并加载其内容 # 遍历所有根目录对每个根目录进行递归遍历找到所有command.yml文件并加载其内容
# 将目录名称与command.yml内容拼接成一个字符串添加到wrkflow_defines列表中 # 将目录名称与command.yml内容拼接成一个字符串添加到wrkflow_defines列表中
@ -53,7 +55,12 @@ def load_existing_workflow_defines() -> str:
for root, dirs, files in os.walk(root_path): for root, dirs, files in os.walk(root_path):
if "command.yml" in files: if "command.yml" in files:
with open(os.path.join(root, "command.yml"), "r") as f: with open(os.path.join(root, "command.yml"), "r") as f:
wrkflow_defines.append(f"工作流命令/{root[len(root_path)+1:].replace(os.sep, '.')}的定义:\n{f.read()}\n\n") wrkflow_defines.append(
(
f"工作流命令/{root[len(root_path)+1:].replace(os.sep, '.')}的定义:"
f"\n{f.read()}\n\n"
)
)
return "\n".join(wrkflow_defines) return "\n".join(wrkflow_defines)

View File

@ -1,5 +1,3 @@
from lib.ide_service import IDEService from lib.ide_service import IDEService
IDEService().ide_logging( IDEService().ide_logging("debug", "Hello IDE Service!")
"debug", "Hello IDE Service!"
)

View File

@ -7,7 +7,6 @@ import openai
sys.path.append(os.path.dirname(__file__)) sys.path.append(os.path.dirname(__file__))
from cache import LocalCache
from i18n import TUILanguage, get_translation from i18n import TUILanguage, get_translation
from model import ( from model import (
FuncToTest, FuncToTest,
@ -16,6 +15,7 @@ from model import (
) )
from ut_workflow import UnitTestsWorkflow from ut_workflow import UnitTestsWorkflow
from cache import LocalCache
from lib.chatmark import Step from lib.chatmark import Step
from lib.ide_service import IDEService from lib.ide_service import IDEService

View File

@ -1,6 +1,5 @@
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from cache import LocalCache
from find_context import ( from find_context import (
Context, Context,
Position, Position,
@ -18,6 +17,7 @@ from propose_test import propose_test
from tools.file_util import retrieve_file_content from tools.file_util import retrieve_file_content
from write_tests import write_and_print_tests from write_tests import write_and_print_tests
from cache import LocalCache
from lib.chatmark import Checkbox, Form, Step, TextEditor from lib.chatmark import Checkbox, Form, Step, TextEditor