Compare commits

..

No commits in common. "scripts" and "update_refactor" have entirely different histories.

152 changed files with 917 additions and 1881 deletions

View File

@ -1 +0,0 @@
description: Root of github commands.

View File

@ -1,5 +0,0 @@
description: 'Update issue tasks.'
input: required
help: README.md
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -1 +0,0 @@
description: Root of gitlab commands.

View File

@ -1,5 +0,0 @@
description: 'Update issue tasks.'
input: required
help: README.md
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -1,3 +0,0 @@
# /refactor.api
对选中代码进行API重构

View File

@ -1,132 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
from typing import Dict
from devchat.llm import chat_json
from lib.chatmark import Button, Form, TextEditor
from lib.ide_service import IDEService
from lib.workflow import workflow_call
# 步骤3: 使用AI识别API路径和METHOD的提示词
API_ANALYSIS_PROMPT = """
分析以下代码识别其中的API路径和HTTP方法
代码:
```
{code}
```
请提取代码中定义或使用的API路径和HTTP方法(GET, POST, PUT, DELETE等)
如果代码中有多个API请识别最主要的一个
如果无法确定HTTP方法请使用"GET"作为默认值
返回JSON格式如下:
{{
"api_path": "识别到的API路径例如/api/users",
"method": "识别到的HTTP方法例如GET、POST、PUT、DELETE等"
}}
"""
@chat_json(prompt=API_ANALYSIS_PROMPT)
def analyze_api(code: str) -> Dict[str, str]:
"""使用AI分析代码中的API路径和HTTP方法"""
pass
def main() -> None:
"""API重构工作流主函数"""
try:
# 步骤1: 获取用户输入的重构目标
if len(sys.argv) < 2:
print("错误: 请提供重构目标")
sys.exit(1)
refactor_target = sys.argv[1]
# 步骤2: 获取用户选中的代码
selected_code = IDEService().get_selected_range()
if not selected_code or not selected_code.text.strip():
print("错误: 请先选择需要重构的代码")
sys.exit(1)
# 步骤3: 使用AI识别API路径和METHOD
print("正在分析选中代码中的API信息...")
api_info = analyze_api(code=selected_code.text)
if not api_info or "api_path" not in api_info or "method" not in api_info:
print("错误: 无法识别API信息")
sys.exit(1)
api_path = api_info["api_path"]
method = api_info["method"]
# 步骤4: 显示识别结果并让用户确认
print("识别到的API信息:")
print(f"API路径: {api_path}")
print(f"HTTP方法: {method}")
api_path_editor = TextEditor(api_path)
form = Form(
[
"### 请确认API信息",
"API路径:",
api_path_editor,
f"HTTP方法: {method}",
"请确认或修改API路径然后点击下方按钮继续",
]
)
form.render()
# 获取用户确认后的API路径
confirmed_api_path = api_path_editor.new_text
# 步骤5: 调用重构工作流进行代码重构
print(f"正在重构API: {confirmed_api_path}...")
refactor_result = workflow_call(f"/refactor {refactor_target}")
if refactor_result != 0:
print("错误: API重构失败")
sys.exit(1)
print("API重构成功!")
# 步骤6: 显示按钮让用户确认是否继续
continue_button = Button(["提交修改并测试API", "结束重构"])
continue_button.render()
if continue_button.clicked == 1: # 用户选择结束
print("API重构已完成未提交修改")
return
# 步骤7: 调用GitHub提交工作流提交修改
print("正在提交修改...")
commit_result = workflow_call("/github.commit")
if commit_result != 0:
print("警告: 代码提交失败但将继续进行API测试")
else:
print("代码提交成功!")
# 步骤8: 调用API测试工作流对重构API进行测试
print("正在准备API测试...")
test_command = f"/test.api.upload {confirmed_api_path} {method} {refactor_target}"
test_result = workflow_call(test_command)
if test_result != 0:
print("警告: API测试可能未成功完成")
print("API重构工作流执行完毕!")
except Exception as e:
print(f"错误: 执行过程中发生异常: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,5 +0,0 @@
description: Refactor for selected api.
input: required
help: README.md
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -1,28 +0,0 @@
### test.api.config
配置API测试工作流所需的全局和仓库相关设置。
#### 用途
- 配置服务器连接信息SERVER_URL, USERNAME, PASSWORD
- 配置项目相关信息PROJECT_ID, OPENAPI_URL, VERSION_URL
#### 使用方法
执行命令: `/test.api.config`
#### 操作流程
1. 输入服务器URL例如: http://kagent.merico.cn:8000
2. 输入用户名
3. 输入密码
4. 输入项目ID例如: 37
5. 输入OpenAPI文档URL例如: http://kagent.merico.cn:8080/openapi.json
6. 输入版本信息URL例如: http://kagent.merico.cn:8080/version
7. 保存配置信息
#### 配置信息存储位置
- 全局配置SERVER_URL, USERNAME, PASSWORD保存在 `~/.chat/.workflow_config.json`
- 仓库配置PROJECT_ID, OPENAPI_URL, VERSION_URL保存在当前仓库的 `.chat/.workflow_config.json`
#### 注意事项
- 密码信息应妥善保管,不要泄露
- 配置完成后其他API测试工作流将自动使用这些配置信息
- 如需修改配置,重新运行此命令即可

View File

@ -1,151 +0,0 @@
import json
import os
import sys
from lib.chatmark import Form, TextEditor # 导入 ChatMark 组件
def read_global_config():
"""读取全局配置信息"""
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
server_url = config_data.get("api_testing_server_url", "")
username = config_data.get("api_testing_server_username", "")
password = config_data.get("api_testing_server_password", "")
return server_url, username, password
def save_global_config(server_url, username, password):
"""保存全局配置信息"""
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
# 确保目录存在
os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["api_testing_server_url"] = server_url
config_data["api_testing_server_username"] = username
config_data["api_testing_server_password"] = password
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def read_repo_config():
"""读取仓库相关配置信息"""
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
project_id = config_data.get("test_api_project_id", "")
openapi_url = config_data.get("test_api_openapi_url", "")
version_url = config_data.get("test_api_version_url", "")
return project_id, openapi_url, version_url
def save_repo_config(project_id, openapi_url, version_url):
"""保存仓库相关配置信息"""
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# 确保目录存在
os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["test_api_project_id"] = project_id
config_data["test_api_openapi_url"] = openapi_url
config_data["test_api_version_url"] = version_url
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def main():
print("开始配置 API 测试所需的设置...", end="\n\n", flush=True)
# 读取全局配置
server_url, username, password = read_global_config()
# 读取仓库配置
project_id, openapi_url, version_url = read_repo_config()
# 创建表单组件
server_url_editor = TextEditor(server_url)
username_editor = TextEditor(username)
password_editor = TextEditor(password)
project_id_editor = TextEditor(project_id)
openapi_url_editor = TextEditor(openapi_url)
version_url_editor = TextEditor(version_url)
# 创建表单
form = Form(
[
"## DevChat API 测试服务器配置",
"请输入服务器 URL (例如: http://kagent.merico.cn:8000):",
server_url_editor,
"请输入用户名:",
username_editor,
"请输入密码:",
password_editor,
"## 仓库配置",
"请输入DevChat API 测试服务器中项目 ID (例如: 37):",
project_id_editor,
"请输入 OpenAPI URL (例如: http://kagent.merico.cn:8080/openapi.json):",
openapi_url_editor,
"请输入版本 URL (例如: http://kagent.merico.cn:8080/version),不输入表示不需要检查服务器测试环境版本:",
version_url_editor,
]
)
# 渲染表单
form.render()
# 获取用户输入
server_url = server_url_editor.new_text.strip()
username = username_editor.new_text.strip()
password = password_editor.new_text.strip()
project_id = project_id_editor.new_text.strip()
openapi_url = openapi_url_editor.new_text.strip()
version_url = version_url_editor.new_text.strip()
# 保存全局配置
if server_url and username and password:
save_global_config(server_url, username, password)
else:
print("请提供完整的全局配置信息 (SERVER_URL, USERNAME, PASSWORD)。")
sys.exit(1)
# 保存仓库配置
if project_id and openapi_url and version_url:
save_repo_config(project_id, openapi_url, version_url)
else:
print("请提供完整的仓库配置信息 (PROJECT_ID, OPENAPI_URL, VERSION_URL)。")
sys.exit(1)
print("\n配置信息已成功保存!")
print(f"全局配置: SERVER_URL={server_url}, USERNAME={username}, PASSWORD={'*' * len(password)}")
print(
f"仓库配置: PROJECT_ID={project_id}, OPENAPI_URL={openapi_url}, VERSION_URL={version_url}"
)
print("\n您现在可以使用其他 API 测试工作流了。")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@ -1,4 +0,0 @@
description: 'Configure global and repository-specific settings for API testing.'
help: README.md
steps:
- run: $devchat_python $command_path/command.py

View File

@ -1,244 +0,0 @@
import os
import subprocess
import sys
import time
import requests
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_DIR)
# noqa: I001
from api.utils import ( # noqa: E402
OPENAPI_URL,
PROJECT_ID,
SERVER_URL,
VERSION_URL,
get_path_op_id,
session,
)
# noqa: E402
from lib.chatmark.step import Step # noqa: E402
def get_apidocs():
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
params={"page": 1, "size": 100},
)
return res.json()["docs"]
def delete_old_apidocs(apidocs):
for apidoc in apidocs:
session.delete(f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}")
def get_local_version():
cmd = "git rev-parse HEAD"
res = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE)
return res.stdout.decode("utf-8").strip()
def check_api_version():
# 如果没有配置VERSION_URL则跳过版本检查
if not VERSION_URL:
print("未配置VERSION_URL跳过API版本检查...")
return
local_version = get_local_version()
print("检查被测服务器文档是否已经更新到最新版本...", flush=True)
while True:
try:
res = session.get(VERSION_URL)
version = res.json()["version"]
if version == local_version:
print(f"API 文档已更新,当前版本为 {version},开始上传 OpenAPI 文档...", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查 API 版本失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcase_done(testcase_id):
while True:
try:
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases/{testcase_id}"
)
data = res.json()
status = data["status"]
if status == "content_ready":
print("文本用例生成完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查文本用例状态失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcode_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
if status == "succeeded":
print("自动测试脚本生成完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本生成失败!{e}", flush=True)
time.sleep(5)
def get_testcode(testcase_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes",
params={"testcase_id": testcase_id},
)
return res.json()["testcodes"][0]
def wait_for_task_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
CREATED = "created"
RUNNING = "running"
WAITING = "waiting"
if status not in [CREATED, RUNNING, WAITING]:
print("自动测试脚本执行完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本状态失败!{e}", flush=True)
time.sleep(5)
def get_testcase(api_path_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"page": 1, "size": 100, "pathop_id": api_path_id},
)
return res.json()["testcases"][0]
def main():
error_msg = "请输入要测试的API名称和测试目标/test.api.upload api_path method test_target"
if len(sys.argv) < 2:
print(error_msg)
return
args = sys.argv[1].strip().split(" ")
if len(args) < 3:
print(error_msg)
return
api_path = args[0]
method = args[1]
test_target = " ".join(args[2:])
docs = get_apidocs()
with Step("检查 API 版本是否更新..."):
check_api_version()
delete_old_apidocs(docs)
with Step(f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..."):
# 使用配置的OPENAPI_URL
if not OPENAPI_URL:
print("错误未配置OPENAPI_URL无法获取OpenAPI文档")
return
res = requests.get(
OPENAPI_URL,
)
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
files={"file": ("openapi.json", res.content, "application/json")},
data={"apiauth_id": docs[0]["apiauth_id"]},
)
if res.status_code == 200:
print("上传 OpenAPI 文档成功!\n")
else:
print(f"上传 OpenAPI 文档失败!{res.text}", flush=True)
return
apipathop_id = get_path_op_id(api_path, method)
with Step("开始生成文本用例..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"generate_content": True},
json={"apipathop_id": apipathop_id, "title": test_target},
)
if res.status_code == 200:
print("提交生成文本用例成功!等待生成完成...", flush=True)
testcase_id = res.json()["id"]
wait_for_testcase_done(testcase_id)
else:
print(f"提交生成文本用例失败!{res.text}", flush=True)
return
with Step("开始生成自动测试脚本..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/tasks/testcode",
params={"testcase_id": testcase_id},
)
if res.status_code == 200:
print("提交生成自动测试脚本成功!等待生成完成...", flush=True)
task_id = res.json()["id"]
wait_for_testcode_done(task_id)
else:
print(f"提交生成自动测试脚本失败!{res.text}", flush=True)
return
with Step("开始执行自动测试脚本..."):
testcode = get_testcode(testcase_id)
testcode_id = testcode["id"]
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes/{testcode_id}/exec",
)
if res.status_code == 200:
print("提交执行自动测试脚本成功!", flush=True)
else:
print(f"提交执行自动测试脚本失败!{res.text}")
return
api_path_id = get_path_op_id(api_path, method)
with Step("开始查询测试脚本执行结果..."):
while True:
testcase = get_testcase(api_path_id)
last_testcode_passed = testcase["last_testcode_passed"]
if last_testcode_passed:
print("测试脚本执行成功!", flush=True)
break
else:
print("测试脚本执行失败!", flush=True)
break
if __name__ == "__main__":
main()

View File

@ -1,4 +0,0 @@
description: 'Upload API documentation, generate test cases and test scripts for the target API, and execute the test code. Input format: APIPATH METHOD API_REFACTOR_DESCRIPTION'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -1,141 +0,0 @@
import json
import os
import requests
from lib.workflow.call import workflow_call
# 默认配置,仅在无法读取配置文件时使用
session = requests.Session()
_is_login = False
def read_config():
"""读取配置文件中的设置"""
# 读取全局配置
global_config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
global_config = {}
if os.path.exists(global_config_path):
try:
with open(global_config_path, "r", encoding="utf-8") as f:
global_config = json.load(f)
except Exception:
pass
# 读取仓库配置
repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
repo_config = {}
if os.path.exists(repo_config_path):
try:
with open(repo_config_path, "r", encoding="utf-8") as f:
repo_config = json.load(f)
except Exception:
pass
# 获取配置值
server_url = global_config.get("api_testing_server_url", "")
username = global_config.get("api_testing_server_username", "")
password = global_config.get("api_testing_server_password", "")
project_id = repo_config.get("test_api_project_id", "")
openapi_url = repo_config.get("test_api_openapi_url", "")
version_url = repo_config.get("test_api_version_url", "")
return server_url, username, password, project_id, openapi_url, version_url
def ensure_config():
"""确保配置存在,如果不存在则调用配置工作流"""
global_config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# 检查全局配置和仓库配置是否存在
global_config_exists = os.path.exists(global_config_path)
repo_config_exists = os.path.exists(repo_config_path)
# 检查必填配置项是否存在
config_valid = True
if global_config_exists and repo_config_exists:
# 读取配置
global_config = {}
repo_config = {}
try:
with open(global_config_path, "r", encoding="utf-8") as f:
global_config = json.load(f)
with open(repo_config_path, "r", encoding="utf-8") as f:
repo_config = json.load(f)
except Exception:
config_valid = False
# 检查必填项
if (
not global_config.get("api_testing_server_url")
or not global_config.get("api_testing_server_username")
or not global_config.get("api_testing_server_password")
or not repo_config.get("test_api_project_id")
or not repo_config.get("test_api_openapi_url")
):
config_valid = False
else:
config_valid = False
if not config_valid:
print("缺少API测试所需的配置将启动配置向导...")
workflow_call("/test.api.config")
# 重新检查配置是否已创建并包含必要项
try:
if os.path.exists(global_config_path) and os.path.exists(repo_config_path):
with open(global_config_path, "r", encoding="utf-8") as f:
global_config = json.load(f)
with open(repo_config_path, "r", encoding="utf-8") as f:
repo_config = json.load(f)
if (
global_config.get("api_testing_server_url")
and global_config.get("api_testing_server_username")
and global_config.get("api_testing_server_password")
and repo_config.get("test_api_project_id")
and repo_config.get("test_api_openapi_url")
):
return True
print("配置失败")
return False
except Exception:
print("配置失败")
return False
return True
# 读取配置
result = ensure_config()
if not result:
print("配置失败,工作流不能继续执行")
exit(0)
SERVER_URL, USERNAME, PASSWORD, PROJECT_ID, OPENAPI_URL, VERSION_URL = read_config()
def login():
global _is_login
if _is_login:
return
session.post(
f"{SERVER_URL}/user/auth/login",
data={"username": USERNAME, "password": PASSWORD, "grant_type": "password"},
)
_is_login = True
def get_path_op_id(keyword: str, method: str):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apipathops",
params={"keyword": keyword, "page": 1, "size": 20},
)
for pathop in res.json()["pathops"]:
if pathop["method"].lower().strip() == method.lower().strip():
return pathop["id"]
login()

View File

@ -26,7 +26,6 @@ class Step(AbstractContextManager):
def __enter__(self):
print(f"\n```Step\n# {self.title}", flush=True)
print("\n```", flush=True)
def __exit__(self, exc_type, exc_val, exc_tb):
# close the step
@ -34,3 +33,4 @@ class Step(AbstractContextManager):
IDEService().ide_logging(
"debug", f"Step {self.title} took {end_time - self.enter_time:.2f} seconds"
)
print("\n```", flush=True)

View File

@ -1,3 +0,0 @@
from .call import workflow_call
__all__ = ["workflow_call"]

View File

@ -1,149 +0,0 @@
import os
import re
import subprocess
import sys
import yaml
def find_workflow_script(command_name: str) -> str:
"""
根据命令名称查找对应的工作流脚本路径
Args:
command_name: 工作流命令名称 "github.commit"
Returns:
找到的脚本路径如果未找到则返回空字符串
"""
# 工作流目录优先级: custom > community > merico
workflow_base_dirs = [
os.path.expanduser("~/.chat/scripts/custom"),
os.path.expanduser("~/.chat/scripts/community"),
os.path.expanduser("~/.chat/scripts/merico"),
]
# 解析命令名称,处理子命令
parts = command_name.split(".")
for base_dir in workflow_base_dirs:
# 检查custom目录下是否有config.yml定义命名空间
if base_dir.endswith("/custom"):
config_path = os.path.join(base_dir, "config.yml")
namespaces = []
if os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
namespaces = config.get("namespaces", [])
except Exception:
pass
# 在每个命名空间下查找
for namespace in namespaces:
namespace_dir = os.path.join(base_dir, namespace)
script_path = find_script_in_path(namespace_dir, parts)
if script_path:
return script_path
else:
# 直接在base_dir下查找
script_path = find_script_in_path(base_dir, parts)
if script_path:
return script_path
return ""
def find_script_in_path(base_dir: str, command_parts: list) -> str:
"""
在指定路径下查找工作流脚本
Args:
base_dir: 基础目录
command_parts: 命令名称拆分的部分
Returns:
找到的脚本路径如果未找到则返回空字符串
"""
# 构建工作流目录路径
workflow_dir = os.path.join(base_dir, *command_parts)
# 检查目录是否存在
if not os.path.isdir(workflow_dir):
return ""
# 查找目录下的Python脚本
py_files = [f for f in os.listdir(workflow_dir) if f.endswith(".py")]
# 如果只有一个Python脚本直接返回
if len(py_files) == 1:
return os.path.join(workflow_dir, py_files[0])
# 如果有多个Python脚本查找command.yml
yml_path = os.path.join(workflow_dir, "command.yml")
if os.path.exists(yml_path):
try:
with open(yml_path, "r", encoding="utf-8") as f:
yml_content = f.read()
# 查找steps部分中的脚本名称
for py_file in py_files:
py_file_name = os.path.splitext(py_file)[0]
# 查找类似 $command_path/script_name.py 的模式
if re.search(rf"\$command_path/{py_file_name}\.py", yml_content):
return os.path.join(workflow_dir, py_file)
except Exception:
pass
# 尝试查找与最后一个命令部分同名的脚本
last_part_script = f"{command_parts[-1]}.py"
if last_part_script in py_files:
return os.path.join(workflow_dir, last_part_script)
# 尝试查找名为command.py的脚本
if "command.py" in py_files:
return os.path.join(workflow_dir, "command.py")
# 没有找到合适的脚本
return ""
def workflow_call(command: str) -> int:
"""
调用工作流命令
Args:
command: 完整的工作流命令 "/github.commit message"
Returns:
命令执行的返回码
"""
# 解析命令和参数
parts = command.strip().split(maxsplit=1)
cmd_name = parts[0].lstrip("/")
cmd_args = parts[1] if len(parts) > 1 else ""
# 查找对应的工作流脚本
script_path = find_workflow_script(cmd_name)
if not script_path:
print(f"找不到工作流命令: {cmd_name}")
return 1
# 使用Popen并将标准输入输出错误流连接到父进程
process = subprocess.Popen(
[sys.executable, script_path, cmd_args],
stdin=sys.stdin, # 父进程的标准输入传递给子进程
stdout=sys.stdout, # 子进程的标准输出传递给父进程
stderr=sys.stderr, # 子进程的标准错误传递给父进程
text=True, # 使用文本模式
bufsize=1, # 行缓冲,确保输出及时显示
)
# 等待子进程完成并获取返回码
return_code = process.wait()
if return_code != 0:
print(f"命令执行失败,返回码: {return_code}")
return return_code

View File

@ -1,47 +0,0 @@
# 在 ask/command.py 中
import os
import sys
from devchat.llm import chat
from lib.ide_service import IDEService
ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_WORKFLOW_DIR)
from chatflow.util.contexts import CONTEXTS # noqa: E402
PROMPT = (
f"""
{CONTEXTS.replace("{", "{{").replace("}", "}}")}
"""
+ """
当前选中代码{selected_code}
当前打开文件路径{file_path}
用户要求或问题{question}
"""
)
@chat(prompt=PROMPT, stream_out=True)
def ask(question, selected_code, file_path):
pass
def get_selected_code():
"""Retrieves the selected lines of code from the user's selection."""
selected_data = IDEService().get_selected_range().dict()
return selected_data
def main(question):
selected_text = get_selected_code()
file_path = selected_text.get("abspath", "")
code_text = selected_text.get("text", "")
ask(question=question, selected_code=code_text, file_path=file_path)
sys.exit(0)
if __name__ == "__main__":
main(sys.argv[1])

View File

@ -1,4 +0,0 @@
description: 'Generate workflow command, input command information.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -1,424 +0,0 @@
"""
生成工作流命令实现
步骤1: 根据用户输入的工作流命令定义信息生成相关工作流实现步骤描述展示相关信息等待用户确认
步骤2: 根据用户确认的工作流实现步骤描述生成工作流命令实现代码并保存到指定文件中
"""
#!/usr/bin/env python3
import os
import subprocess
import sys
import yaml
from devchat.llm import chat, chat_json
from lib.chatmark import Form, Radio, Step, TextEditor
from lib.ide_service import IDEService
ROOT_WORKFLOW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_WORKFLOW_DIR)
from chatflow.util.contexts import CONTEXTS # noqa: E402
# 工作流命令定义模板
COMMAND_YML_TEMPLATE = """description: {description}
{input_section}
{help_section}{workflow_python_section}
steps:
- run: ${python_cmd} $command_path/command.py{input_param}
"""
# 工作流命令实现模板
COMMAND_PY_TEMPLATE = """#!/usr/bin/env python3
import os
import sys
from pathlib import Path
{imports}
def main():
{main_code}
if __name__ == "__main__":
main()
"""
# 从用户输入提取工作流信息的提示
EXTRACT_INFO_PROMPT = """
请从以下用户输入中提取创建工作流命令所需的关键信息
用户输入:
{user_input}
工作流上下文信息:
{contexts}
请提取以下信息并以JSON格式返回
1. command_name: 工作流命令名称应以/开头"/example""/category.command"
2. description: 工作流命令的简短描述
3. input_required: 工作流是否需要输入参数(true/false)
4. custom_env: 是否需要自定义Python环境(true/false)
5. env_name: 如果需要自定义环境环境名称是什么
6. dependencies: 如果需要自定义环境依赖文件名是什么(如requirements.txt)
7. purpose: 工作流的主要目的和功能
8. implementation_ideas: 实现思路的简要描述
如果某项信息在用户输入中未明确指定请根据上下文合理推断
返回格式示例:
{{
"command_name": "/example.command",
"description": "这是一个示例命令",
"input_required": true,
"custom_env": false,
"env_name": "",
"dependencies": "",
"purpose": "这个命令的主要目的是...",
"implementation_ideas": "可以通过以下步骤实现..."
}}
"""
# 工作流步骤生成提示
WORKFLOW_STEPS_PROMPT = """
请为以下工作流命令对应脚本文件生成详细的实现步骤描述
工作流命令名称: {command_name}
工作流命令描述: {description}
输入要求: {input_requirement}
工作流目的: {purpose}
实现思路: {implementation_ideas}
工作流上下文信息:
{contexts}
请提供清晰的步骤描述包括
1. 每个步骤需要完成的具体任务
2. 每个步骤可能需要的输入和输出
3. 每个步骤可能需要使用的IDE Service或ChatMark组件
4. 任何其他实现细节
返回格式应为markdown块包裹的步骤列表每个步骤都有详细描述输出示例如下
```steps
步骤1: 获取用户输入的重构任务要求
步骤2: 调用IDE Service获取选中代码
步骤3: 根据用户重构任务要求调用大模型生成选中代码的重构代码
步骤4: 调用IDE Service将生成的重构代码通过DIFF VIEW方式展示给用户
```
不要输出工作流命令的其他构建步骤只需要清洗描述工作流命令对应command.py中对应的步骤实现即可
"""
# 代码实现生成提示
CODE_IMPLEMENTATION_PROMPT = """
请为以下工作流命令生成Python实现代码
工作流命令名称: {command_name}
工作流命令描述: {description}
输入要求: {input_requirement}
自定义环境: {custom_env}
工作流实现步骤:
{workflow_steps}
工作流上下文信息:
{contexts}
请生成完整的Python代码实现包括
1. 必要的导入语句
2. 主函数实现
3. 按照工作流步骤实现具体功能
4. 适当的错误处理
5. 必要的注释说明
6. 所有markdown代码块都要有明确的语言标识如pythonjsonyamlcode等
代码应该使用IDE Service接口与IDE交互使用ChatMark组件与用户交互
输出格式应为markdown代码块语言标识为python仅输出工作流实现对应的脚本代码块不需要输出其他逻辑信息
只需要输出最终PYTHON代码块不需要其他信息例如
```python
....
```
"""
@chat_json(prompt=EXTRACT_INFO_PROMPT)
def extract_workflow_info(user_input, contexts):
"""从用户输入中提取工作流信息"""
pass
@chat(prompt=WORKFLOW_STEPS_PROMPT, stream_out=False)
def generate_workflow_steps(
command_name, description, input_requirement, purpose, implementation_ideas, contexts
):
"""生成工作流实现步骤描述"""
pass
@chat(prompt=CODE_IMPLEMENTATION_PROMPT, stream_out=False)
def generate_workflow_code(
command_name, description, input_requirement, custom_env, workflow_steps, contexts
):
"""生成工作流实现代码"""
pass
def parse_command_path(command_name):
"""
解析命令路径返回目录结构和最终命令名
确保工作流创建在custom目录下的有效namespace中
"""
parts = command_name.strip("/").split(".")
# 获取custom目录路径
custom_dir = os.path.join(os.path.expanduser("~"), ".chat", "scripts", "custom")
# 获取custom目录下的有效namespace
valid_namespaces = []
config_path = os.path.join(custom_dir, "config.yml")
if os.path.exists(config_path):
try:
with open(config_path, "r") as f:
config = yaml.safe_load(f)
if config and "namespaces" in config:
valid_namespaces = config["namespaces"]
except Exception as e:
print(f"读取custom配置文件失败: {str(e)}")
# 如果没有找到有效namespace使用默认namespace
if not valid_namespaces:
print("警告: 未找到有效的custom namespace将使用默认namespace 'default'")
valid_namespaces = ["default"]
# 确保default namespace存在于config.yml中
try:
os.makedirs(os.path.dirname(config_path), exist_ok=True)
with open(config_path, "w") as f:
yaml.dump({"namespaces": ["default"]}, f)
except Exception as e:
print(f"创建默认namespace配置失败: {str(e)}")
# 使用第一个有效namespace
namespace = valid_namespaces[0]
# 创建最终命令目录路径
command_dir = os.path.join(custom_dir, namespace)
# 创建目录结构
for part in parts[:-1]:
command_dir = os.path.join(command_dir, part)
return command_dir, parts[-1]
def parse_markdown_block(response, block_type="steps"):
"""
从AI响应中解析指定类型的Markdown代码块内容支持处理嵌套的代码块
Args:
response (str): AI生成的响应文本
block_type (str): 要解析的代码块类型默认为"steps"
Returns:
str: 解析出的代码块内容
Raises:
Exception: 解析失败时抛出异常
"""
try:
# 处理可能存在的思考过程
if response.find("</think>") != -1:
response = response.split("</think>")[-1]
# 构建起始标记
start_marker = f"```{block_type}"
end_marker = "```"
# 查找起始位置
start_pos = response.find(start_marker)
if start_pos == -1:
# 如果没有找到指定类型的标记,直接返回原文本
return response.strip()
# 从标记后开始的位置
content_start = start_pos + len(start_marker)
# 从content_start开始找到第一个未配对的```
pos = content_start
open_blocks = 1 # 已经有一个开放的块
while True:
# 找到下一个```
next_marker = response.find(end_marker, pos)
if next_marker == -1:
# 如果没有找到结束标记,返回剩余所有内容
break
# 检查这是开始还是结束标记
# 向后看是否跟着语言标识符
after_marker = response[next_marker + 3 :]
# 检查是否是新的代码块开始 - 只要```后面跟着非空白字符,就认为是新代码块开始
if after_marker.strip() and not after_marker.startswith("\n"):
first_word = after_marker.split()[0]
if not any(c in first_word for c in ",.;:!?()[]{}"):
open_blocks += 1
else:
open_blocks -= 1
else:
open_blocks -= 1
if open_blocks == 0:
# 找到匹配的结束标记
return response[content_start:next_marker].strip()
pos = next_marker + 3
# 如果没有找到匹配的结束标记返回从content_start到末尾的内容
return response[content_start:].strip()
except Exception as e:
import logging
logging.info(f"Response: {response}")
logging.error(f"Exception in parse_markdown_block: {str(e)}")
raise Exception(f"解析{block_type}内容失败: {str(e)}") from e
def create_workflow_files(command_dir, command_name, description, input_required, code):
"""创建工作流命令文件"""
# 创建命令目录
os.makedirs(command_dir, exist_ok=True)
# 创建command.yml
input_section = f"input: {'required' if input_required else 'optional'}"
help_section = "help: README.md"
input_param = ' "$input"' if input_required else ""
# 添加自定义环境配置
workflow_python_section = ""
python_cmd = "devchat_python"
yml_content = COMMAND_YML_TEMPLATE.format(
description=description,
input_section=input_section,
help_section=help_section,
workflow_python_section=workflow_python_section,
python_cmd=python_cmd,
input_param=input_param,
)
with open(os.path.join(command_dir, "command.yml"), "w") as f:
f.write(yml_content)
# 创建command.py
with open(os.path.join(command_dir, "command.py"), "w") as f:
f.write(code)
# 设置执行权限
os.chmod(os.path.join(command_dir, "command.py"), 0o755)
# 创建README.md
readme_content = f"# {command_name}\n\n{description}\n"
with open(os.path.join(command_dir, "README.md"), "w") as f:
f.write(readme_content)
def main():
# 获取用户输入
user_input = sys.argv[1] if len(sys.argv) > 1 else ""
# 步骤1: 通过AI分析用户输入提取必要信息
with Step("分析用户输入,提取工作流信息..."):
workflow_info = extract_workflow_info(user_input=user_input, contexts=CONTEXTS)
# 步骤3: 生成工作流实现步骤描述
with Step("生成工作流实现步骤描述..."):
workflow_steps = generate_workflow_steps(
command_name=workflow_info.get("command_name", ""),
description=workflow_info.get("description", ""),
input_requirement="可选",
purpose=workflow_info.get("purpose", ""),
implementation_ideas=workflow_info.get("implementation_ideas", ""),
contexts=CONTEXTS,
)
workflow_steps = parse_markdown_block(workflow_steps, block_type="steps")
# 步骤2: 使用Form组件一次性展示所有信息让用户编辑
print("\n## 工作流信息\n")
# 创建所有编辑组件
command_name_editor = TextEditor(workflow_info.get("command_name", "/example.command"))
description_editor = TextEditor(workflow_info.get("description", "工作流命令描述"))
input_radio = Radio(["必选 (required)", "可选 (optional)"])
purpose_editor = TextEditor(workflow_info.get("purpose", "请描述工作流的主要目的和功能"))
# ideas_editor = TextEditor(workflow_info.get("implementation_ideas", "请描述实现思路"))
steps_editor = TextEditor(workflow_steps)
# 使用Form组件一次性展示所有编辑组件
form = Form(
[
"命令名称:",
command_name_editor,
"输入要求:",
input_radio,
"描述:",
description_editor,
"工作流目的:",
purpose_editor,
"实现步骤:",
steps_editor,
]
)
form.render()
# 获取用户编辑后的值
command_name = command_name_editor.new_text
description = description_editor.new_text
input_required = input_radio.selection == 0
# implementation_ideas = ideas_editor.new_text
workflow_steps = steps_editor.new_text
# 步骤4: 生成工作流实现代码
with Step("生成工作流实现代码..."):
code = generate_workflow_code(
command_name=command_name,
description=description,
input_requirement="必选" if input_required else "可选",
custom_env=False,
workflow_steps=workflow_steps,
contexts=CONTEXTS,
)
code = code.strip()
start_index = code.find("```python")
end_index = code.rfind("```")
code = code[start_index + len("```python") : end_index].strip()
# code = parse_markdown_block(code, block_type="python")
# 解析命令路径
command_dir, final_command = parse_command_path(command_name)
full_command_dir = os.path.join(command_dir, final_command)
# 步骤5: 创建工作流文件
with Step(f"创建工作流文件到 {full_command_dir}"):
create_workflow_files(full_command_dir, command_name, description, input_required, code)
# 更新斜杠命令
with Step("更新斜杠命令..."):
IDEService().update_slash_commands()
# 在新窗口中打开工作流目录
try:
subprocess.run(["code", full_command_dir], check=True)
except subprocess.SubprocessError:
print(f"无法自动打开编辑器,请手动打开工作流目录: {full_command_dir}")
print(f"\n✅ 工作流命令 {command_name} 已成功创建!")
print(f"命令文件位置: {full_command_dir}")
print("你现在可以在DevChat中使用这个命令了。")
if __name__ == "__main__":
main()

View File

@ -1,4 +0,0 @@
description: 'Generate workflow command, input command information.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -1,64 +0,0 @@
工作流开发中封装了部分基础函数,方便开发者在工作流中使用。
**大模型调用**
针对大模型调用封装了几个装饰器函数分别用于调用大模型生成文本和生成json格式的文本。
- chat装饰器
用于调用大模型生成文本,并将生成的文本返回给调用者。具体使用示例如下:
```python
from devchat.llm import chat
PROMPT = """
对以下代码段进行解释:
{code}
"""
@chat(prompt=PROMPT, stream_out=True)
# pylint: disable=unused-argument
def explain(code):
"""
call ai to explain selected code
"""
pass
ai_explanation = explain(code="def foo(): pass")
```
调用explain函数时需要使用参数名称=参数值的方式传递参数参数名称必须与PROMPT中使用的参数名称一致。
- chat_json装饰器
用于调用大模型生成json对象。具体使用示例如下
```python
from devchat.llm import chat_json
PROMPT = (
"Give me 5 different git branch names, "
"mainly hoping to express: {task}, "
"Good branch name should looks like: <type>/<main content>,"
"the final result is output in JSON format, "
'as follows: {{"names":["name1", "name2", .. "name5"]}}\n'
)
@chat_json(prompt=PROMPT)
def generate_branch_name(task):
pass
task = "fix bug"
branch_names = generate_branch_name(task=task)
print(branch_names["names"])
```
调用generate_branch_name函数时需要使用参数名称=参数值的方式传递参数参数名称必须与PROMPT中使用的参数名称一致。
使用chat, chat_json的注意
1. 使用chat_json装饰器时返回的结果是一个字典对象在PROMPT中描述这个字典对象的结构时需要使用{{}}来表示{}。因为后续操作中会使用类似f-string的方式替代PROMPT中的参数所以在PROMPT中使用{}时需要使用{{}}来表示{}。
2. 使用这两个装饰器时PROMPT中不要使用markdown的代码块语法。
**工作流调用**
目的是对已有工作流进行复用在A工作流中调用B工作流。
- workflow_call函数
调用指定的工作流,并将指定的参数传递给被调用的工作流。具体使用示例如下:
```python
from lib.workflow import workflow_call
ret_code = workflow_call("/helloworld some name")
if ret_code == 0:
print("workflow call success")
else:
print("workflow call failed")
```

View File

@ -1,100 +0,0 @@
"""
为workflow命令提供上下文信息
编写工作流实现代码需要的上下文
1. IDE Service接口定义
2. ChatMark接口定义
3. 工作流命令列表
4. 工作流命令组织实现规范
5. 基础可用的函数信息
"""
import os
def load_file_in_user_scripts(filename: str) -> str:
"""
从用户脚本目录中加载文件内容
"""
user_path = os.path.expanduser("~/.chat/scripts")
file_path = os.path.join(user_path, filename)
with open(file_path, "r") as f:
return f.read()
def load_local_file(filename: str) -> str:
"""
从当前脚本所在目录的相对目录加载文件内容
"""
script_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(script_dir, filename)
with open(file_path, "r") as f:
return f.read()
def load_existing_workflow_defines() -> str:
"""从user scripts目录遍历找到所有command.yml文件并加载其内容"""
merico_path = os.path.expanduser("~/.chat/scripts/merico")
community_path = os.path.expanduser("~/.chat/scripts/community")
custom_path = os.path.expanduser("~/.chat/scripts/custom")
root_paths = [merico_path]
# 遍历community_path、custom_path下直接子目录将其添加到root_paths作为下一步的根目录
for path in [community_path, custom_path]:
for root, dirs, files in os.walk(path):
if root == path:
root_paths.extend([os.path.join(root, d) for d in dirs])
break
wrkflow_defines = []
# 遍历所有根目录对每个根目录进行递归遍历找到所有command.yml文件并加载其内容
# 将目录名称与command.yml内容拼接成一个字符串添加到wrkflow_defines列表中
# 例如:~/.chat/scripts/merico/github/commit/command.yml被找到那么拼接的字符串为
# 工作流命令/github.commit的定义\n<command.yml内容>\n\n
for root_path in root_paths:
for root, dirs, files in os.walk(root_path):
if "command.yml" in files:
with open(os.path.join(root, "command.yml"), "r") as f:
wrkflow_defines.append(
(
f"工作流命令/{root[len(root_path) + 1 :].replace(os.sep, '.')}的定义:"
f"\n{f.read()}\n\n"
)
)
return "\n".join(wrkflow_defines)
CONTEXTS = f"""
工作流开发需要的上下文信息
# IDE Service接口定义及使用示例
IDE Service用于在工作流命令中访问与IDE相关的数据以及调用IDE提供的功能
## IDEService接口定义
接口定义
{load_file_in_user_scripts("lib/ide_service/service.py")}
涉及类型定义
{load_file_in_user_scripts("lib/ide_service/types.py")}
## IDEService接口示例
{load_local_file("ide_service_demo.py")}
# ChatMark接口使用示例
ChatMark用于在工作流命令中与用户交互展示信息获取用户输入
{load_file_in_user_scripts("lib/chatmark/chatmark_example/main.py")}
ChatMark Form组件类似于HTML表单用于组合多个组件获取相关设置结果
# 工作流命令规范
{load_local_file("workflow_guide.md")}
# 已有工作流命令定义
{load_existing_workflow_defines()}
# 工作流内部函数定义
{load_local_file("base_functions_guide.md")}
"""

View File

@ -1,3 +0,0 @@
from lib.ide_service import IDEService
IDEService().ide_logging("debug", "Hello IDE Service!")

View File

@ -1,52 +0,0 @@
一个工作流对应一个目录目录可以进行嵌套每个工作流在UI视图中对应了一个斜杠命令例如/github.commit。
github工作流目录结构简化后示意如下
github
-- commit
|-- command.yml
|-- command.py
|-- README.md
-- new_pr
|-- command.yml
|-- command.py
......
"command.yml"文件定义了工作流命令的元信息,例如命令的名称、描述、参数等。
"command.py"文件定义了工作流命令的实现逻辑。
拿一个hello world工作流命令为例目录结构如下
helloworld
-- command.yml
-- command.py
-- README.md
"command.yml"文件内容如下:
```yaml
description: Hello Workflow, use as /helloworld <name>
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"
````
"command.py"文件内容如下:
```python
import sys
print(sys.argv[1])
```
使用一个工作流时在DevChat AI智能问答视图中输入斜杠命令例如/helloworld <name>,即可执行该工作流命令。/helloworld表示对应系统可用的工作流。<name>表示工作流的输入是可选的如果工作流定义中input为required则必须输入否则可以不输入。
工作流命令有重载优先级merico目录下工作流 < community目录下工作流 < custom目录下工作流
例如如果merico与community目录下都有github工作流那么在DevChat AI智能问答视图中输入/github命令时会优先执行community目录下的github工作流命令。
在custom目录下通过config.yml文件定义custom目录下的工作流目录。例如
namespaces:
- bobo
那么custom/bobo就是一个工作流存储目录可以在该目录下定义工作流命令。
例如:
custom/bobo
| ---- hello
|------ command.yml
|------ command.py
那么custom/bobo/hello对应的工作流命令就是/hello.

30
merico/commit/README.md Normal file
View File

@ -0,0 +1,30 @@
# Commit 命令
该命令用于为选中的代码更改编写格式良好的提交信息,并通过Git提交这些更改。
## 用法
/commit [to close #issue_number]
## 说明
- 该命令会引导你完成以下步骤:
1. 选择要包含在本次提交中的修改文件
2. 查看并编辑AI生成的提交信息
3. 确认并执行Git提交
- 你可以选择性地在命令中包含要关闭的issue编号,例如:
/commit to close #12
- 如果不指定issue编号,命令仍会正常执行,只是不会在提交信息中包含关闭issue的引用。
- 该命令会调用Git来执行实际的提交操作,因此请确保你的系统中已正确安装并配置了Git。
## 提示
- 在执行该命令之前,请确保你已经对要提交的文件进行了必要的修改。
- 仔细检查AI生成的提交信息,并根据需要进行编辑,以确保信息准确反映了你的更改。
- 养成经常小批量提交的好习惯,这有助于更好地跟踪项目历史。

View File

@ -0,0 +1,6 @@
description: 'Writes a well-formatted commit message for selected code changes and commits them via Git. Include an issue number if desired (e.g., input "/commit to close #12").'
hint: to close Issue #issue_number
input: optional
help: README.md
steps:
- run: $devchat_python $command_path/commit.py "$input"

436
merico/commit/commit.py Normal file
View File

@ -0,0 +1,436 @@
# flake8: noqa: E402
import os
import re
import subprocess
import sys
# from llm_api import chat_completion_stream # noqa: E402
from devchat.llm import chat_completion_stream
from lib.chatmark import Checkbox, Form, TextEditor
from lib.ide_service import IDEService
diff_too_large_message_en = (
"Commit failed. The modified content is too long "
"and exceeds the model's length limit. "
"You can try to make partial changes to the file and submit multiple times. "
"Making small changes and submitting them multiple times is a better practice."
)
diff_too_large_message_zh = (
"提交失败。修改内容太长,超出模型限制长度,"
"可以尝试选择部分修改文件多次提交,小修改多提交是更好的做法。"
)
COMMIT_PROMPT_LIMIT_SIZE = 20000
def _T(en_text, zh_text):
"""
Returns a text in the current language.
:param en_text: The English version of the text
:param zh_text: The Chinese version of the text
:return: The text in the current language
"""
if IDEService().ide_language() == "zh":
return zh_text
else:
return en_text
def extract_markdown_block(text):
"""
Extracts the first Markdown code block from the given text without the language specifier.
:param text: A string containing Markdown text
:return: The content of the first Markdown code block, or None if not found
"""
# 正则表达式匹配Markdown代码块忽略可选的语言类型标记
pattern = r"```(?:\w+)?\s*\n(.*?)\n```"
match = re.search(pattern, text, re.DOTALL)
if match:
# 返回第一个匹配的代码块内容,去除首尾的反引号和语言类型标记
# 去除块结束标记前的一个换行符,但保留其他内容
block_content = match.group(1)
return block_content
else:
return text
# Read the prompt from the diffCommitMessagePrompt.txt file
def read_prompt_from_file(filename):
"""
Reads the content of a file and returns it as a string.
This function is designed to read a prompt message from a text file.
It expects the file to be encoded in UTF-8 and will strip any leading
or trailing whitespace from the content of the file. If the file does
not exist or an error occurs during reading, the function logs an error
message and exits the script.
Parameters:
- filename (str): The path to the file that contains the prompt message.
Returns:
- str: The content of the file as a string.
Raises:
- FileNotFoundError: If the file does not exist.
- Exception: If any other error occurs during file reading.
"""
s = IDEService()
try:
with open(filename, "r", encoding="utf-8") as file:
return file.read().strip()
except FileNotFoundError:
s.ide_logging(
"info",
f"File {filename} not found. "
"Please make sure it exists in the same directory as the script.",
)
sys.exit(1)
except Exception as e:
s.ide_logging("info", f"An error occurred while reading the file {filename}: {e}")
sys.exit(1)
# Read the prompt content from the file
script_path = os.path.dirname(__file__)
PROMPT_FILENAME = os.path.join(script_path, "diffCommitMessagePrompt.txt")
PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT = read_prompt_from_file(PROMPT_FILENAME)
prompt_commit_message_by_diff_user_input_llm_config = {
"model": os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")
}
language = ""
def assert_value(value, message):
"""
判断给定的value是否为True如果是则输出指定的message并终止程序
Args:
value: 用于判断的值
message: 如果value为True时需要输出的信息
Returns:
无返回值
"""
if value:
print(message, file=sys.stderr, flush=True)
sys.exit(-1)
def decode_path(encoded_path):
octal_pattern = re.compile(r"\\[0-7]{3}")
if octal_pattern.search(encoded_path):
bytes_path = encoded_path.encode("utf-8").decode("unicode_escape").encode("latin1")
decoded_path = bytes_path.decode("utf-8")
return decoded_path
else:
return encoded_path
def get_modified_files():
"""
获取当前修改文件列表以及已经staged的文件列表
Args:
Returns:
tuple: 包含两个list的元组第一个list包含当前修改过的文件第二个list包含已经staged的文件
"""
""" 获取当前修改文件列表以及已经staged的文件列表"""
output = subprocess.check_output(["git", "status", "-s", "-u"], text=True, encoding="utf-8")
lines = output.split("\n")
modified_files = []
staged_files = []
def strip_file_name(file_name):
file = file_name.strip()
if file.startswith('"'):
file = file[1:-1]
return file
for line in lines:
if len(line) > 2:
status, filename = line[:2], decode_path(line[3:])
# check wether filename is a directory
if os.path.isdir(filename):
continue
modified_files.append((os.path.normpath(strip_file_name(filename)), status[1:2]))
if status[0:1] == "M" or status[0:1] == "A" or status[0:1] == "D":
staged_files.append((os.path.normpath(strip_file_name(filename)), status[0:1]))
return modified_files, staged_files
def get_marked_files(modified_files, staged_files):
"""
根据给定的参数获取用户选中以供提交的文件
Args:
modified_files (List[str]): 用户已修改文件列表
staged_files (List[str]): 用户已staged文件列表
Returns:
List[str]: 用户选中的文件列表
"""
# Create two Checkbox instances for staged and unstaged files
staged_files_show = [f'{file[1] if file[1]!="?" else "U"} {file[0]}' for file in staged_files]
staged_checkbox = Checkbox(staged_files_show, [True] * len(staged_files_show))
unstaged_files = [file for file in modified_files if file[1].strip() != ""]
unstaged_files_show = [
f'{file[1] if file[1]!="?" else "U"} {file[0]}' for file in unstaged_files
]
unstaged_checkbox = Checkbox(unstaged_files_show, [False] * len(unstaged_files_show))
# Create a Form with both Checkbox instances
form_list = []
if len(staged_files) > 0:
form_list.append("Staged:\n\n")
form_list.append(staged_checkbox)
if len(unstaged_files) > 0:
form_list.append("Unstaged:\n\n")
form_list.append(unstaged_checkbox)
form = Form(form_list, submit_button_name="Continue")
# Render the Form and get user input
form.render()
# Retrieve the selected files from both Checkbox instances
staged_checkbox_selections = staged_checkbox.selections if staged_checkbox.selections else []
unstaged_selections = unstaged_checkbox.selections if unstaged_checkbox.selections else []
selected_staged_files = [staged_files[idx][0] for idx in staged_checkbox_selections]
selected_unstaged_files = [unstaged_files[idx][0] for idx in unstaged_selections]
return selected_staged_files, selected_unstaged_files
def rebuild_stage_list(staged_select_files, unstaged_select_files):
"""
根据用户选中文件重新构建stage列表
Args:
staged_select_files: 当前选中的已staged文件列表
unstaged_select_files: 当前选中的未staged文件列表
Returns:
None
"""
# 获取当前所有staged文件
current_staged_files = subprocess.check_output(
["git", "diff", "--name-only", "--cached"], text=True
).splitlines()
# 添加unstaged_select_files中的文件到staged
for file in unstaged_select_files:
subprocess.check_output(["git", "add", file])
# 将不在staged_select_files中的文件从staged移除
user_selected_files = staged_select_files + unstaged_select_files
files_to_unstage = [file for file in current_staged_files if file not in user_selected_files]
for file in files_to_unstage:
subprocess.check_output(["git", "reset", file])
def get_diff():
"""
获取暂存区文件的Diff信息
Args:
Returns:
bytes: 返回bytes类型是git diff --cached命令的输出结果
"""
return subprocess.check_output(["git", "diff", "--cached"])
def get_current_branch():
try:
# 使用git命令获取当前分支名称
result = subprocess.check_output(
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
current_branch = result.decode("utf-8")
return current_branch
except subprocess.CalledProcessError:
# 如果发生错误,打印错误信息
return None
except FileNotFoundError:
# 如果未找到git命令可能是没有安装git或者不在PATH中
return None
def generate_commit_message_base_diff(user_input, diff):
"""
根据diff信息通过AI生成一个commit消息
Args:
user_input (str): 用户输入的commit信息
diff (str): 提交的diff信息
Returns:
str: 生成的commit消息
"""
global language
language_prompt = "You must response commit message in chinese。\n" if language == "zh" else ""
prompt = PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT.replace("{__DIFF__}", f"{diff}").replace(
"{__USER_INPUT__}", f"{user_input + language_prompt}"
)
model_token_limit_error = (
diff_too_large_message_en if language == "en" else diff_too_large_message_zh
)
if len(str(prompt)) > COMMIT_PROMPT_LIMIT_SIZE:
print(model_token_limit_error, flush=True)
sys.exit(0)
messages = [{"role": "user", "content": prompt}]
response = chat_completion_stream(messages, prompt_commit_message_by_diff_user_input_llm_config)
if (
not response["content"]
and response.get("error", None)
and f'{response["error"]}'.find("This model's maximum context length is") > 0
):
print(model_token_limit_error)
sys.exit(0)
assert_value(not response["content"], response.get("error", ""))
response["content"] = extract_markdown_block(response["content"])
return response
def display_commit_message_and_commit(commit_message):
"""
展示提交信息并提交
Args:
commit_message: 提交信息
Returns:
None
"""
text_editor = TextEditor(commit_message, submit_button_name="Commit")
text_editor.render()
new_commit_message = text_editor.new_text
if not new_commit_message:
return None
return subprocess.check_output(["git", "commit", "-m", new_commit_message])
def check_git_installed():
try:
subprocess.run(
["git", "--version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
)
return True
except subprocess.CalledProcessError:
print("Git is not installed on your system.", file=sys.stderr, flush=True)
except FileNotFoundError:
print("Git is not installed on your system.", file=sys.stderr, flush=True)
except Exception:
print("Git is not installed on your system.", file=sys.stderr, flush=True)
return False
def main():
global language
try:
start_msg = _T("Let's follow the steps below.\n\n", "开始按步骤操作。\n\n")
print(start_msg)
# Ensure enough command line arguments are provided
if len(sys.argv) < 2:
print("Usage: python script.py <user_input>", file=sys.stderr, flush=True)
sys.exit(-1)
user_input = sys.argv[1]
language = IDEService().ide_language()
if not check_git_installed():
sys.exit(-1)
step1_msg = _T(
"Step 1/2: Select the files you've changed that you wish to include in this commit, "
"then click 'Submit'.",
"第一步/2选择您希望包含在这次提交中的文件然后点击“提交”。",
)
print(step1_msg, end="\n\n", flush=True)
modified_files, staged_files = get_modified_files()
if len(modified_files) == 0:
print("There are no files to commit.", flush=True)
sys.exit(0)
staged_select_files, unstaged_select_files = get_marked_files(modified_files, staged_files)
if not staged_select_files and not unstaged_select_files:
no_files_msg = _T(
"No files selected, the commit has been aborted.",
"没有选择任何文件,提交已中止。",
)
print(no_files_msg)
return
rebuild_stage_list(staged_select_files, unstaged_select_files)
step2_msg = _T(
"Step 2/2: Review the commit message I've drafted for you. "
"Edit it below if needed. Then click 'Commit' to proceed with "
"the commit using this message.",
"第二步/2查看我为您起草的提交消息。如果需要请在下面编辑它。然后单击“提交”以使用此消息进行提交。",
)
print(step2_msg, end="\n\n", flush=True)
diff = get_diff()
branch_name = get_current_branch()
if branch_name:
user_input += "\ncurrent repo branch name is:" + branch_name
commit_message = generate_commit_message_base_diff(user_input, diff)
if not commit_message:
sys.exit(1)
# TODO
# remove Closes #IssueNumber in commit message
commit_message["content"] = (
commit_message["content"]
.replace("Closes #IssueNumber", "")
.replace("No specific issue to close", "")
.replace("No specific issue mentioned.", "")
)
commit_result = display_commit_message_and_commit(commit_message["content"])
if not commit_result:
commit_abort_msg = _T(
"Commit aborted.",
"提交已中止。",
)
print(commit_abort_msg, flush=True)
else:
commit_completed_msg = _T(
"Commit completed.",
"提交已完成。",
)
print(commit_completed_msg, flush=True)
sys.exit(0)
except Exception as err:
print("Exception:", err, file=sys.stderr, flush=True)
sys.exit(-1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,37 @@
Objective:** Generate a commit message that succinctly describes the codebase changes reflected in the provided diff, while incorporating any extra context or guidance from the user.
**Commit Message Structure:**
1. **Title Line:** Choose a type such as `feat`, `fix`, `docs`, `style`, `refactor`, `perf`, `test`, `build`, `ci`, `chore`, and so on, and couple it with a succinct title. Use the format: `type: Title`. Only one title line is permissible.
2. **Summary:** Summarize all adjustments concisely within a maximum of three detailed message lines. Prefix each line with a \"-\".
3. **Closing Reference (Conditional):** Include the line `Closes #IssueNumber` only if a specific, relevant issue number has been mentioned in the user input.
**Response Format:**
```
type: Title
- Detail message line 1
- Detail message line 2
- Detail message line 3
Closes #IssueNumber
```
Only append the \"Closes #IssueNumber\" if the user input explicitly references an issue to close.
**Constraints:**
- Exclude markdown code block indicators (```) and the placeholder \"commit_message\" from your response.
- Follow commit message best practices:
- Limit the title length to 50 characters.
- Limit each summary line to 72 characters.
- If the precise issue number is not known or not stated by the user, do not include the closing reference.
**User Input:** `{__USER_INPUT__}`
Determine if `{__USER_INPUT__}` contains a reference to closing an issue. If so, include the closing reference in the commit message. Otherwise, exclude it.
**Code Changes:**
```
{__DIFF__}
```
Utilize the provided format to craft a commit message that adheres to the stipulated criteria.

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,9 @@
prompts:
[
'../../diffCommitMessagePrompt.txt'
]
providers:
- id: "exec:python commit_message_run.py"
tests:
- commit_message_tests.yaml

View File

@ -0,0 +1,86 @@
import json
import os
import sys
import requests
from devchat.llm import chat_completion_stream
def get_script_path():
"""Return the directory path of the current script."""
return os.path.dirname(__file__)
def load_commit_cache():
"""Load or initialize the commit cache."""
try:
cache_filepath = os.path.join(get_script_path(), "commit_cache.json")
if not os.path.exists(cache_filepath):
return {}
with open(cache_filepath, "r", encoding="utf-8") as cache_file:
return json.load(cache_file)
except (FileNotFoundError, json.JSONDecodeError):
return None
def save_commit_cache(commit_cache):
"""Save the commit cache to a JSON file."""
try:
cache_filepath = os.path.join(get_script_path(), "commit_cache.json")
with open(cache_filepath, "w", encoding="utf-8") as cache_file:
json.dump(commit_cache, cache_file)
except IOError as e:
print(f"Error saving commit cache: {e}")
def get_commit_diff(url, commit_cache):
"""Extract commit diff from the given URL."""
parts = url.split("/")
user = parts[3]
repo = parts[4]
commit_hash = parts[6]
api_url = f"https://api.github.com/repos/{user}/{repo}/commits/{commit_hash}"
for _1 in range(5):
try:
if commit_hash in commit_cache and "diff" in commit_cache[commit_hash]:
return commit_cache[commit_hash]["diff"]
response = requests.get(
api_url,
headers={"Accept": "application/vnd.github.v3.diff"},
timeout=20,
)
if response.status_code == 200:
if commit_hash not in commit_cache:
commit_cache[commit_hash] = {}
commit_cache[commit_hash]["diff"] = response.text
return response.text
except Exception:
continue
raise Exception("Error: Unable to fetch the commit diff.")
def get_commit_messages():
"""Compose commit messages based on the provided commit URL."""
commit_cache = load_commit_cache()
if commit_cache is None:
sys.exit(-1)
prompt = sys.argv[1]
context = json.loads(sys.argv[3])
commit_url = context["vars"]["commit_url"]
diff = get_commit_diff(commit_url, commit_cache)
save_commit_cache(commit_cache)
prompt = prompt.replace("{__USER_INPUT__}", "").replace("{__DIFF__}", diff)
messages = [{"role": "user", "content": prompt}]
response = chat_completion_stream(messages, {"model": "gpt-4-1106-preview"})
print(response.get("content", "")) if response.get("content", "") else print(response)
if __name__ == "__main__":
get_commit_messages()

View File

@ -0,0 +1,126 @@
- vars:
commit_message: The commit fixes a use-after-free error in the AFS volume tree
due to a race condition between volume lookup and removal. The patch prevents
looking up volumes with zero reference count and ensures that once a volume's
refcount reaches zero, it is flagged and not looked up again. This resolves
crashes and warnings caused by the race.
commit_url: https://github.com/torvalds/linux/commit/9a6b294ab496650e9f270123730df37030911b55
- vars:
commit_message: Fix implemented in IDA to prevent crashes during `ida_free` when
the bitmap is empty, addressing overlooked double-free detection for NULL bitmaps.
New tests added, albeit with noisy output, to avoid future regressions. Reported
by Zhenghan Wang and signed by Matthew Wilcox and Linus Torvalds.
commit_url: https://github.com/torvalds/linux/commit/af73483f4e8b6f5c68c9aa63257bdd929a9c194a
- vars:
commit_message: This update allows DNS-related keys to be immediately reclaimed
upon expiry without delay, facilitating faster DNS lookups. Previously, expired
keys had a default 5-minute buffer period. Now, DNS keys expire instantly and
negative DNS results have a 1-second default expiry if not otherwise specified.
commit_url: https://github.com/torvalds/linux/commit/39299bdd2546688d92ed9db4948f6219ca1b9542
- vars:
commit_message: The commit updates the names of constants for USB PIDs in the
ftdi_sio driver to match the new products assigned to those IDs, ensuring accuracy
in the identification of Actisense products.
commit_url: https://github.com/torvalds/linux/commit/513d88a88e0203188a38f4647dd08170aebd85df
- vars:
commit_message: The IPv6 network code is reverting a previous change due to race
conditions and use-after-free errors related to managing expired routes. The
problematic commit aimed to handle expired routes with a separate list but introduced
issues. It will be revisited in a future release. Fixes and reviews are noted.
commit_url: https://github.com/torvalds/linux/commit/dade3f6a1e4e35a5ae916d5e78b3229ec34c78ec
- vars:
commit_message: Confirmed successful generation of events2table.c.
commit_url: https://github.com/ruby/ruby/commit/7016ab873eaa68d1dfe1af50198c157e451c784b
- vars:
commit_message: Introduced a new `errno_ptr` property to the Universal Parser
to facilitate error tracking, allowing the parser to communicate and store error
codes more efficiently during parsing operations.
commit_url: https://github.com/ruby/ruby/commit/4374236e959c1e585611acfc7a2e3d2142265ab0
- vars:
commit_message: The commit introduces an `ary_modify` property to the Universal
Parser, enabling flexible array modifications during data parsing.
commit_url: https://github.com/ruby/ruby/commit/73fa3224975c42e1c4e2231212a64ac325054130
- vars:
commit_message: 'Fixed the ARCH_FLAG issue to enable correct cross-compilation
as mandated by bug report #20088.'
commit_url: https://github.com/ruby/ruby/commit/2a4a84664a1972c48c4365c29e73be83f8004139
- vars:
commit_message: Added basic documentation for constants in the Ruby 'etc' library,
enhancing code understandability and usability by providing information on their
purpose and usage.
commit_url: https://github.com/ruby/ruby/commit/c027dcfde2bf40c45dfb0fe1b79f97b8827d89f3
- vars:
commit_message: "Implemented automatic import feature for the `imports` field\
\ in `package.json`, enhancing the module resolution process for projects, with\
\ contributions from Mateusz Burzy\u0144ski and Andrew Branch."
commit_url: https://github.com/microsoft/TypeScript/commit/fbcdb8cf4fbbbea0111a9adeb9d0d2983c088b7c
- vars:
commit_message: 'Implemented checks to ensure enums are consistent across different
versions to maintain compatibility in project issue #55924.'
commit_url: https://github.com/microsoft/TypeScript/commit/93e6b9da0c4cb164ca90a5a1b07415e81e97f2b1
- vars:
commit_message: This commit provides type suggestions for `@types/bun` when the
`Bun` global is not detected, aiding in proper type hinting and code completion.
(#56795)
commit_url: https://github.com/microsoft/TypeScript/commit/0e5927d5d38399bac1818ad4160f2ff91c33c174
- vars:
commit_message: Added functionality to support the 'const' modifier on type parameters
within JSDoc, enhancing TypeScript compatibility and type-checking features.
(#56649)
commit_url: https://github.com/microsoft/TypeScript/commit/a36d04fc63c83b6ec5a8099942b653a5caa29eb1
- vars:
commit_message: 'Corrected a formatting issue where a space was missing after
the ''implements'' or ''extends'' keywords when followed by generics in typescript
files, resolving issue #56699.'
commit_url: https://github.com/microsoft/TypeScript/commit/2c134db31df48ba5f158f490168dea733a11ae44
- vars:
commit_message: The commit updates the configuration to ensure that functions
major(), makedev(), and minor() are enabled on HP-UX systems, by enforcing the
inclusion order of <sys/types.h> before <sys/sysmacros.h>. Co-authored by Serhiy
Storchaka.
commit_url: https://github.com/python/cpython/commit/f108468970bf4e70910862476900f924fb701399
- vars:
commit_message: Fixed the `--with-openssl-rpath` option for macOS to correctly
use the platform-specific `-rpath` linker flag, resolving compatibility issues
on that OS. (#113441)
commit_url: https://github.com/python/cpython/commit/cc13eabc7ce08accf49656e258ba500f74a1dae8
- vars:
commit_message: The commit deprecates the `_enablelegacywindowsfsencoding` method
(#107729) which is tracked in issue gh-73427.
commit_url: https://github.com/python/cpython/commit/bfee2f77e16f01a718c1044564ee624f1f2bc328
- vars:
commit_message: 'Moved the `cpp.py` script to the `libclinic` directory for better
organization as part of Pull Request #113526, which addresses issue gh-113299.'
commit_url: https://github.com/python/cpython/commit/7ab9efdd6a2fb21cddca1ccd70175f1ac6bd9168
- vars:
commit_message: Refactored error handling in Argument Clinic's C preprocessor
helper by creating a subclass of ClinicError for improved exception management,
and relocated ClinicError into a separate errors module, streamlining preparations
to integrate cpp.py into libclinic.
commit_url: https://github.com/python/cpython/commit/87295b4068762f9cbdfcae5fed5ff54aadd3cb62
- vars:
commit_message: 'Updated the ''lib/time'' package to the 2023d version. This update
was part of addressing issue #22487 and included a code review process with
multiple reviewers before auto-submission.'
commit_url: https://github.com/golang/go/commit/36a2463e7c01151b05fff9a1f1c6fb08d764c82e
- vars:
commit_message: 'This commit fixes issue #64826 by ensuring the Go compiler correctly
handles the constant-folding of jump table indices that are out of range, even
for statically unreachable code.'
commit_url: https://github.com/golang/go/commit/f6509cf5cdbb5787061b784973782933c47f1782
- vars:
commit_message: 'The `go mod init` command no longer imports configurations from
other vendoring tools as of Go 1.22, with the change documented and support
removed. This addresses issues #61422 and fixes #53327.'
commit_url: https://github.com/golang/go/commit/9dd1cde9ac0f1e935ed44d33f6b4668be538c1ed
- vars:
commit_message: 'The commit reverts a change in the ''go'' command that tried
to improve flag parsing for ''go run'' and ''go install''. It reintroduces Go
1.21 behavior and defers the decision for a fix to a later date due to new problems.
It addresses issue #64738 and rolls back to a previous commit.'
commit_url: https://github.com/golang/go/commit/52dbffeac86863e1e0c9455b5b216ec50c828946
- vars:
commit_message: Removed the 'interfacecycles' debug flag from Go compiler since
no related issues have emerged since Go 1.20, eliminating checks for anonymous
interface cycles. The relevant tests have been updated to reflect this change.
commit_url: https://github.com/golang/go/commit/6fe0d3758b35afcc342832e376d8d985a5a29070

View File

@ -0,0 +1,89 @@
## Overview
This README provides information about the workflow used for generating commit messages based on commit differences. The workflow involves recording prompts, defining test cases, and running scripts to process these cases.
## Dependency Installation
Before you can run the tests for commit message generation, you need to install the necessary dependencies. This involves setting up both Node.js and Python environments. Follow the steps below to install the dependencies.
### Node.js Dependency
To install the `promptfoo` tool using `npx`, run the following command:
```bash
npx promptfoo@latest
```
This command will ensure that you are using the latest version of `promptfoo`.
### Python Dependencies
The test script requires certain Python packages to be installed. To install these dependencies, navigate to the directory containing the `requirements-test.txt` file and run:
```bash
pip install -r commit/test/prompt/requirements-test.txt
```
This will install all the Python packages listed in the `requirements-test.txt` file, which are necessary for the test script to function correctly.
After completing these steps, you should have all the required dependencies installed and be ready to proceed with the test execution as described in the **Getting Started** section of this README.
## Getting Started
To get started with the commit message generation workflow:
1. Ensure you have `npx` and `promptfoo` installed and configured on your system.
2. Navigate to the directory containing the workflow files.
3. Review and update the test cases in `commit_message_tests.yaml` as needed.
4. Run the test execution command to generate commit messages.
5. Use the test results viewing command to analyze the results.
For a quick way to execute the current test cases, you can use the provided script file `./run.sh`. This script will automate the process of installation and test execution for you. Simply run the following command in your terminal:
```bash
./run.sh
```
This will ensure that all necessary dependencies are installed, and then it will proceed to run the tests as defined in your test cases file.
For any additional help or to report issues, please refer to the project's issue tracker or contact the repository administrator.
## File Structure
- **Prompt Record File**: `./commit/diffCommitMessagePrompt.txt`
- This file contains the prompts that are used to generate commit messages.
- **Test Cases File**: `./commit/test/prompt/commit_message_tests.yaml`
- This YAML file holds the test cases for the commit message generation tests. Each test case includes the `commit_url` which points to the specific commit differences to be used for generating the commit message.
- **Test Script**: `./commit/test/prompt/commit_message_run.py`
- This is the Python script that processes the test cases. It fetches the commit differences from the provided `commit_url` and generates the commit messages accordingly.
- **Test Configuration File**: `./commit/test/prompt/commit_message_prompt_config.yaml`
- This YAML file provides the overall description of the tests and configurations needed for the test execution.
## Test Execution
To execute the tests for commit message generation, use the following command:
```bash
npx promptfoo eval -c commit/test/prompt/commit_message_prompt_config.yaml
```
This command will run the tests as defined in the `commit_message_prompt_config.yaml` file.
## Viewing Test Results
After running the tests, you can display the results using the command below:
```bash
npx promptfoo view
```
This command will present the outcomes of the tests, allowing you to review the generated commit messages and their respective test cases.

View File

@ -0,0 +1,3 @@
requests>=2.31.0
openai>=1.6.1
pyyaml>=6.0.1

View File

@ -0,0 +1,33 @@
#!/bin/bash
# 获取脚本所在目录
SCRIPT_DIR=$(dirname "$0")
TEST_DIR="${SCRIPT_DIR}"
REQUIREMENTS_FILE="${TEST_DIR}/requirements-test.txt"
CONFIG_FILE="${TEST_DIR}/commit_message_prompt_config.yaml"
# 安装Node.js依赖
echo "Installing promptfoo using npx..."
npx promptfoo@latest
# 检查Python是否安装
if ! command -v python3 &> /dev/null
then
echo "Python could not be found, please install Python 3."
exit 1
fi
# 安装Python依赖
echo "Installing Python dependencies..."
pip3 install -r ${REQUIREMENTS_FILE}
# 执行测试用例
echo "Running test cases..."
npx promptfoo eval -c ${CONFIG_FILE}
# 打开测试结果视图
echo "Opening test results view..."
npx promptfoo view
# 执行结束
echo "Test execution and result viewing completed."

View File

@ -1,4 +1,4 @@
description: 'Generate code task summary.'
description: 'Create new branch based current branch, and checkout new branch.'
input: optional
help: README.md
steps:

View File

@ -0,0 +1 @@
description: Root of git commands.

View File

@ -6,20 +6,13 @@ import sys
from devchat.llm import chat_completion_stream
from lib.chatmark import Button, Checkbox, Form, TextEditor
from lib.chatmark import Checkbox, Form, TextEditor
from lib.ide_service import IDEService
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import assert_exit # noqa: E402
from git_api import (
get_github_repo,
get_github_repo_issues,
get_github_username,
get_issue_info,
subprocess_check_output,
subprocess_run,
)
from git_api import get_issue_info, subprocess_check_output, subprocess_run
diff_too_large_message_en = (
"Commit failed. The modified content is too long "
@ -293,7 +286,7 @@ def generate_commit_message_base_diff(user_input, diff, issue):
if (
not response["content"]
and response.get("error", None)
and f"{response['error']}".find("This model's maximum context length is") > 0
and f'{response["error"]}'.find("This model's maximum context length is") > 0
):
print(model_token_limit_error)
sys.exit(0)
@ -362,112 +355,23 @@ def check_git_installed():
return False
def ask_for_push():
"""
询问用户是否要推送(push)更改到远程仓库
Returns:
bool: 用户是否选择推送
"""
print(
"Step 3/3: Would you like to push your commit to the remote repository?",
end="\n\n",
flush=True,
)
button = Button(["Yes, push now", "No, I'll push later"])
button.render()
return button.clicked == 0 # 如果用户点击第一个按钮(Yes)则返回True
def push_changes():
"""
推送更改到远程仓库
Returns:
bool: 推送是否成功
"""
try:
current_branch = get_current_branch()
if not current_branch:
print(
"Could not determine current branch. Push failed.",
end="\n\n",
file=sys.stderr,
flush=True,
)
return False
print(f"Pushing changes to origin/{current_branch}...", end="\n\n", flush=True)
result = subprocess_run(
["git", "push", "origin", current_branch],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
print(f"Push failed: {result.stderr}", end="\n\n", flush=True)
return False
print("Push completed successfully.", end="\n\n", flush=True)
return True
except subprocess.CalledProcessError as e:
print(f"Push failed: {str(e)}", end="\n\n", file=sys.stderr, flush=True)
return False
except Exception as e:
print(
f"An unexpected error occurred: {str(e)}",
end="\n\n",
file=sys.stderr,
flush=True,
)
return False
def get_selected_issue_ids():
"""
获取用户选中的issue id
Returns:
list: 用户选中的issue id列表
"""
name = get_github_username()
issue_repo = get_github_repo(True)
issues = get_github_repo_issues(issue_repo, assignee=name, state="open")
if issues:
checkbox = Checkbox(
[f"#{issue['number']}: {issue['title']}" for issue in issues],
title="Select the issues you want to close",
)
checkbox.render()
return [issues[idx]["number"] for idx in checkbox.selections]
def main():
global language
try:
print("Let's follow the steps below.\n\n")
# Ensure enough command line arguments are provided
if len(sys.argv) < 2:
print(
"Usage: python script.py <user_input> <language>",
file=sys.stderr,
flush=True,
)
if len(sys.argv) < 3:
print("Usage: python script.py <user_input> <language>", file=sys.stderr, flush=True)
sys.exit(-1)
user_input = sys.argv[1]
language = "english"
if len(sys.argv) > 2:
language = sys.argv[2]
language = sys.argv[2]
if not check_git_installed():
sys.exit(-1)
print(
"Step 1/3: Select the files you've changed that you wish to include in this commit, "
"Step 1/2: Select the files you've changed that you wish to include in this commit, "
"then click 'Submit'.",
end="\n\n",
flush=True,
@ -485,7 +389,7 @@ def main():
rebuild_stage_list(selected_files)
print(
"Step 2/3: Review the commit message I've drafted for you. "
"Step 2/2: Review the commit message I've drafted for you. "
"Edit it below if needed. Then click 'Commit' to proceed with "
"the commit using this message.",
end="\n\n",
@ -507,27 +411,11 @@ def main():
.replace("No specific issue to close", "")
.replace("No specific issue mentioned.", "")
)
# add closes #IssueNumber in commit message from issues from user selected
issue_ids = get_selected_issue_ids()
if issue_ids:
issue_repo = get_github_repo(True)
owner_repo = get_github_repo()
closes_issue_contents = []
for issue_id in issue_ids:
closes_issue_contents.append(
f"#{issue_id}" if owner_repo == issue_repo else f"{issue_repo}#{issue_id}"
)
commit_message["content"] += f"\n\nCloses {', '.join(closes_issue_contents)}"
commit_result = display_commit_message_and_commit(commit_message["content"])
if not commit_result:
print("Commit aborted.", flush=True)
else:
# 添加推送步骤
if ask_for_push():
if not push_changes():
print("Push failed.", flush=True)
sys.exit(-1)
print("Commit completed.", flush=True)
sys.exit(0)
except Exception as err:

View File

@ -6,18 +6,16 @@ Objective:** Generate a commit message that succinctly describes the codebase ch
3. **Closing Reference (Conditional):** Include the line `Closes #IssueNumber` only if a specific, relevant issue number has been mentioned in the user input.
**Response Format:**
Response should be in the following markdown codeblock format:
```commit
```
type: Title
- Detail message line 1
- Detail message line 2
- Detail message line 3
Closes <#IssueNumber>
Closes #IssueNumber
```
Only append the \"Closes #IssueNumber\" if the user input explicitly references an issue to close.
Only output the commit message codeblock, don't include any other text.
**Constraints:**
- Exclude markdown code block indicators (```) and the placeholder \"commit_message\" from your response.
@ -39,11 +37,3 @@ Related issue:
{__ISSUE__}
Utilize the provided format to craft a commit message that adheres to the stipulated criteria.
example output:
```commit
feature: add update user info API
- add post method api /user/update
- implement update user info logic
```

View File

@ -203,10 +203,7 @@ def check_git_installed():
"""
try:
subprocess_run(
["git", "--version"],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
["git", "--version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
return True
except subprocess.CalledProcessError:
@ -307,8 +304,7 @@ def get_parent_branch():
try:
# 使用git命令获取当前分支的父分支引用
result = subprocess_check_output(
["git", "rev-parse", "--abbrev-ref", f"{current_branch}@{1}"],
stderr=subprocess.STDOUT,
["git", "rev-parse", "--abbrev-ref", f"{current_branch}@{1}"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
parent_branch_ref = result.decode("utf-8")
@ -398,10 +394,7 @@ def get_commit_messages(base_branch):
def create_pull_request(title, body, head, base, repo_name):
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls"
print("url:", url, end="\n\n")
headers = {
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
"Content-Type": "application/json",
}
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Content-Type": "application/json"}
payload = {"title": title, "body": body, "head": head, "base": base}
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 201:
@ -486,10 +479,7 @@ def get_recently_pr(repo):
def update_pr(pr_number, title, body, repo_name):
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls/{pr_number}"
headers = {
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
"Content-Type": "application/json",
}
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Content-Type": "application/json"}
payload = {"title": title, "body": body}
response = requests.patch(url, headers=headers, data=json.dumps(payload))
@ -536,52 +526,3 @@ def save_last_base_branch(base_branch=None):
base_branch = get_current_branch()
project_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
save_config_item(project_config_path, "last_base_branch", base_branch)
def get_github_username():
url = f"{GITHUB_API_URL}/user"
headers = {
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(url, headers=headers)
return response.json()["login"]
def get_github_repo_issues(
owner_repo,
milestone=None,
state=None,
assignee=None,
creator=None,
mentioned=None,
labels=None,
sort=None,
direction=None,
since=None,
per_page=None,
page=None,
):
url = f"{GITHUB_API_URL}/repos/{owner_repo}/issues"
headers = {
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
params = {
"milestone": milestone,
"state": state,
"assignee": assignee,
"creator": creator,
"mentioned": mentioned,
"labels": labels,
"sort": sort,
"direction": direction,
"since": since,
"per_page": per_page,
"page": page,
}
response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
return response.json()
else:
return None

View File

@ -1,4 +1,4 @@
description: 'Update PR.'
description: 'Create new PR.'
input: required
help: README.md
steps:

View File

@ -1,4 +1,4 @@
description: 'Generate code task summary.'
description: 'Create new branch based current branch, and checkout new branch.'
input: optional
help: README.md
steps:

View File

@ -0,0 +1 @@
description: Root of git commands.

View File

@ -6,7 +6,7 @@ import sys
from devchat.llm import chat_completion_stream
from lib.chatmark import Button, Checkbox, Form, TextEditor
from lib.chatmark import Checkbox, Form, TextEditor
from lib.ide_service import IDEService
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
@ -286,7 +286,7 @@ def generate_commit_message_base_diff(user_input, diff, issue):
if (
not response["content"]
and response.get("error", None)
and f"{response['error']}".find("This model's maximum context length is") > 0
and f'{response["error"]}'.find("This model's maximum context length is") > 0
):
print(model_token_limit_error)
sys.exit(0)
@ -355,84 +355,23 @@ def check_git_installed():
return False
def ask_for_push():
"""
询问用户是否要推送(push)更改到远程仓库
Returns:
bool: 用户是否选择推送
"""
print(
"Step 3/3: Would you like to push your commit to the remote repository?",
end="\n\n",
flush=True,
)
button = Button(["Yes, push now", "No, I'll push later"])
button.render()
return button.clicked == 0 # 如果用户点击第一个按钮(Yes)则返回True
def push_changes():
"""
推送更改到远程仓库
Returns:
bool: 推送是否成功
"""
try:
current_branch = get_current_branch()
if not current_branch:
print(
"Could not determine current branch. Push failed.",
end="\n\n",
file=sys.stderr,
flush=True,
)
return False
print(f"Pushing changes to origin/{current_branch}...", end="\n\n", flush=True)
result = subprocess_run(
["git", "push", "origin", current_branch],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
print(f"Push failed: {result.stderr}", end="\n\n", flush=True)
return False
print("Push completed successfully.", end="\n\n", flush=True)
return True
except subprocess.CalledProcessError as e:
print(f"Push failed: {str(e)}", end="\n\n", file=sys.stderr, flush=True)
return False
except Exception as e:
print(f"An unexpected error occurred: {str(e)}", end="\n\n", file=sys.stderr, flush=True)
return False
def main():
global language
try:
print("Let's follow the steps below.\n\n")
# Ensure enough command line arguments are provided
if len(sys.argv) < 2:
if len(sys.argv) < 3:
print("Usage: python script.py <user_input> <language>", file=sys.stderr, flush=True)
sys.exit(-1)
user_input = sys.argv[1]
language = "english"
if len(sys.argv) > 2:
language = sys.argv[2]
language = sys.argv[2]
if not check_git_installed():
sys.exit(-1)
print(
"Step 1/3: Select the files you've changed that you wish to include in this commit, "
"Step 1/2: Select the files you've changed that you wish to include in this commit, "
"then click 'Submit'.",
end="\n\n",
flush=True,
@ -450,7 +389,7 @@ def main():
rebuild_stage_list(selected_files)
print(
"Step 2/3: Review the commit message I've drafted for you. "
"Step 2/2: Review the commit message I've drafted for you. "
"Edit it below if needed. Then click 'Commit' to proceed with "
"the commit using this message.",
end="\n\n",
@ -477,12 +416,6 @@ def main():
if not commit_result:
print("Commit aborted.", flush=True)
else:
# 添加推送步骤
if ask_for_push():
if not push_changes():
print("Push failed.", flush=True)
sys.exit(-1)
print("Commit completed.", flush=True)
sys.exit(0)
except Exception as err:

View File

@ -6,21 +6,23 @@ Objective:** Generate a commit message that succinctly describes the codebase ch
3. **Closing Reference (Conditional):** Include the line `Closes #IssueNumber` only if a specific, relevant issue number has been mentioned in the user input.
**Response Format:**
Response should be in the following markdown codeblock format:
```commit
```
type: Title
- Detail message line 1
- Detail message line 2
- Detail message line 3
Closes #IssueNumber
```
Only output the commit message codeblock, don't include any other text.
Only append the \"Closes #IssueNumber\" if the user input explicitly references an issue to close.
**Constraints:**
- Exclude markdown code block indicators (```) and the placeholder \"commit_message\" from your response.
- Follow commit message best practices:
- Limit the title length to 50 characters.
- Limit each summary line to 72 characters.
- If the precise issue number is not known or not stated by the user, do not include the closing reference.
**User Input:** `{__USER_INPUT__}`
@ -35,11 +37,3 @@ Related issue:
{__ISSUE__}
Utilize the provided format to craft a commit message that adheres to the stipulated criteria.
example output:
```commit
feature: add update user info API
- add post method api /user/update
- implement update user info logic
```

Some files were not shown because too many files have changed in this diff Show More