add /test.api.upload

This commit is contained in:
bobo.yang 2025-03-11 13:31:50 +08:00
parent 344fbff719
commit 99b7aca0fc
8 changed files with 562 additions and 0 deletions

View File

View File

View File

@ -0,0 +1,28 @@
### test.api.config
配置API测试工作流所需的全局和仓库相关设置。
#### 用途
- 配置服务器连接信息SERVER_URL, USERNAME, PASSWORD
- 配置项目相关信息PROJECT_ID, OPENAPI_URL, VERSION_URL
#### 使用方法
执行命令: `/test.api.config`
#### 操作流程
1. 输入服务器URL例如: http://kagent.merico.cn:8000
2. 输入用户名
3. 输入密码
4. 输入项目ID例如: 37
5. 输入OpenAPI文档URL例如: http://kagent.merico.cn:8080/openapi.json
6. 输入版本信息URL例如: http://kagent.merico.cn:8080/version
7. 保存配置信息
#### 配置信息存储位置
- 全局配置SERVER_URL, USERNAME, PASSWORD保存在 `~/.chat/.workflow_config.json`
- 仓库配置PROJECT_ID, OPENAPI_URL, VERSION_URL保存在当前仓库的 `.chat/.workflow_config.json`
#### 注意事项
- 密码信息应妥善保管,不要泄露
- 配置完成后其他API测试工作流将自动使用这些配置信息
- 如需修改配置,重新运行此命令即可

View File

@ -0,0 +1,149 @@
import json
import os
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from lib.chatmark import Form, TextEditor # 导入 ChatMark 组件
def read_global_config():
"""读取全局配置信息"""
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
server_url = config_data.get("api_testing_server_url", "")
username = config_data.get("api_testing_server_username", "")
password = config_data.get("api_testing_server_password", "")
return server_url, username, password
def save_global_config(server_url, username, password):
"""保存全局配置信息"""
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
# 确保目录存在
os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["api_testing_server_url"] = server_url
config_data["api_testing_server_username"] = username
config_data["api_testing_server_password"] = password
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def read_repo_config():
"""读取仓库相关配置信息"""
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
project_id = config_data.get("test_api_project_id", "")
openapi_url = config_data.get("test_api_openapi_url", "")
version_url = config_data.get("test_api_version_url", "")
return project_id, openapi_url, version_url
def save_repo_config(project_id, openapi_url, version_url):
"""保存仓库相关配置信息"""
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# 确保目录存在
os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["test_api_project_id"] = project_id
config_data["test_api_openapi_url"] = openapi_url
config_data["test_api_version_url"] = version_url
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def main():
print("开始配置 API 测试所需的设置...", end="\n\n", flush=True)
# 读取全局配置
server_url, username, password = read_global_config()
# 读取仓库配置
project_id, openapi_url, version_url = read_repo_config()
# 创建表单组件
server_url_editor = TextEditor(server_url)
username_editor = TextEditor(username)
password_editor = TextEditor(password)
project_id_editor = TextEditor(project_id)
openapi_url_editor = TextEditor(openapi_url)
version_url_editor = TextEditor(version_url)
# 创建表单
form = Form([
"## DevChat API 测试服务器配置",
"请输入服务器 URL (例如: http://kagent.merico.cn:8000):",
server_url_editor,
"请输入用户名:",
username_editor,
"请输入密码:",
password_editor,
"## 仓库配置",
"请输入DevChat API 测试服务器中项目 ID (例如: 37):",
project_id_editor,
"请输入 OpenAPI URL (例如: http://kagent.merico.cn:8080/openapi.json):",
openapi_url_editor,
"请输入版本 URL (例如: http://kagent.merico.cn:8080/version),不输入表示不需要检查服务器测试环境版本:",
version_url_editor,
])
# 渲染表单
form.render()
# 获取用户输入
server_url = server_url_editor.new_text.strip()
username = username_editor.new_text.strip()
password = password_editor.new_text.strip()
project_id = project_id_editor.new_text.strip()
openapi_url = openapi_url_editor.new_text.strip()
version_url = version_url_editor.new_text.strip()
# 保存全局配置
if server_url and username and password:
save_global_config(server_url, username, password)
else:
print("请提供完整的全局配置信息 (SERVER_URL, USERNAME, PASSWORD)。")
sys.exit(1)
# 保存仓库配置
if project_id and openapi_url and version_url:
save_repo_config(project_id, openapi_url, version_url)
else:
print("请提供完整的仓库配置信息 (PROJECT_ID, OPENAPI_URL, VERSION_URL)。")
sys.exit(1)
print("\n配置信息已成功保存!")
print(f"全局配置: SERVER_URL={server_url}, USERNAME={username}, PASSWORD={'*' * len(password)}")
print(f"仓库配置: PROJECT_ID={project_id}, OPENAPI_URL={openapi_url}, VERSION_URL={version_url}")
print("\n您现在可以使用其他 API 测试工作流了。")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: '配置 API 测试所需的全局和仓库相关设置'
help: README.md
steps:
- run: $devchat_python $command_path/command.py

View File

@ -0,0 +1,245 @@
import os
import subprocess
import sys
import time
import requests
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_DIR)
from api.utils import PROJECT_ID, SERVER_URL, OPENAPI_URL, VERSION_URL, get_path_op_id, session
from lib.chatmark.step import Step
def get_apidocs():
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
params={"page": 1, "size": 100},
)
return res.json()["docs"]
def delete_old_apidocs():
apidocs = get_apidocs()
for apidoc in apidocs:
session.delete(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}"
)
def get_local_version():
cmd = "git rev-parse HEAD"
res = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE)
return res.stdout.decode("utf-8").strip()
def check_api_version():
# 如果没有配置VERSION_URL则跳过版本检查
if not VERSION_URL:
print("未配置VERSION_URL跳过API版本检查...")
return
local_version = get_local_version()
print("检查被测服务器文档是否已经更新到最新版本...")
while True:
try:
res = session.get(VERSION_URL)
version = res.json()["version"]
if version == local_version:
print(f"API 文档已更新,当前版本为 {version},开始上传 OpenAPI 文档...")
break
else:
print(
f".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查 API 版本失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcase_done(testcase_id):
while True:
try:
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases/{testcase_id}"
)
data = res.json()
status = data["status"]
if status == "content_ready":
print("文本用例生成完成!", flush=True)
break
else:
print(
f".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查文本用例状态失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcode_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
if status == "succeeded":
print("自动测试脚本生成完成!", flush=True)
break
else:
print(
f".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本生成失败!{e}", flush=True)
time.sleep(5)
def get_testcode(testcase_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes",
params={"testcase_id": testcase_id},
)
return res.json()["testcodes"][0]
def wait_for_task_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
if status == "succeeded":
print("自动测试脚本执行完成!", flush=True)
break
else:
print(
f".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本状态失败!{e}", flush=True)
time.sleep(5)
def get_testcase(api_path_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"page": 1, "size": 100, "pathop_id": api_path_id},
)
return res.json()["testcases"][0]
def main():
error_msg = (
"请输入要测试的API名称和测试目标/test.api.upload api_path method test_target"
)
if len(sys.argv) < 2:
print(error_msg)
return
args = sys.argv[1].strip().split(" ")
if len(args) < 3:
print(error_msg)
return
api_path = args[0]
method = args[1]
test_target = " ".join(args[2:])
with Step("检查 API 版本是否更新..."):
check_api_version()
delete_old_apidocs()
with Step(
f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..."
):
# 使用配置的OPENAPI_URL
if not OPENAPI_URL:
print("错误未配置OPENAPI_URL无法获取OpenAPI文档")
return
res = requests.get(
OPENAPI_URL,
)
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
files={"file": ("openapi.json", res.content, "application/json")},
data={"apiauth_id": 46},
)
if res.status_code == 200:
print("上传 OpenAPI 文档成功!\n")
else:
print(f"上传 OpenAPI 文档失败!{res.text}", flush=True)
return
apipathop_id = get_path_op_id(api_path, method)
with Step("开始生成文本用例..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"generate_content": True},
json={"apipathop_id": apipathop_id, "title": test_target},
)
if res.status_code == 200:
print("提交生成文本用例成功!等待生成完成...", flush=True)
testcase_id = res.json()["id"]
wait_for_testcase_done(testcase_id)
else:
print(f"提交生成文本用例失败!{res.text}", flush=True)
return
with Step("开始生成自动测试脚本..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/tasks/testcode",
params={"testcase_id": testcase_id},
)
if res.status_code == 200:
print("提交生成自动测试脚本成功!等待生成完成...", flush=True)
task_id = res.json()["id"]
wait_for_testcode_done(task_id)
else:
print(f"提交生成自动测试脚本失败!{res.text}", flush=True)
return
with Step("开始执行自动测试脚本..."):
testcode = get_testcode(testcase_id)
testcode_id = testcode["id"]
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes/{testcode_id}/exec",
)
if res.status_code == 200:
print("提交执行自动测试脚本成功!", flush=True)
else:
print(f"提交执行自动测试脚本失败!{res.text}")
return
api_path_id = get_path_op_id(api_path, method)
with Step("开始查询测试脚本执行结果..."):
while True:
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/tasks",
params={"page": 1, "size": 1},
)
ret = res.json()
task = ret["tasks"][0]
task_id = task["id"]
wait_for_task_done(task_id)
testcase = get_testcase(api_path_id)
last_testcode_passed = testcase["last_testcode_passed"]
if last_testcode_passed:
print("测试脚本执行成功!", flush=True)
break
else:
print("测试脚本执行失败!", flush=True)
break
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 上传 API 文档,生成目标 API 的测试用例和测试脚本并执行测试代码。输入形式为APIPATH METHOD API重构描述
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

132
community/test/api/utils.py Normal file
View File

@ -0,0 +1,132 @@
import os
import json
import requests
import sys
from lib.workflow.call import workflow_call
# 默认配置,仅在无法读取配置文件时使用
session = requests.Session()
_is_login = False
def read_config():
"""读取配置文件中的设置"""
# 读取全局配置
global_config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
global_config = {}
if os.path.exists(global_config_path):
try:
with open(global_config_path, "r", encoding="utf-8") as f:
global_config = json.load(f)
except Exception:
pass
# 读取仓库配置
repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
repo_config = {}
if os.path.exists(repo_config_path):
try:
with open(repo_config_path, "r", encoding="utf-8") as f:
repo_config = json.load(f)
except Exception:
pass
# 获取配置值
server_url = global_config.get("api_testing_server_url", "")
username = global_config.get("api_testing_server_username", "")
password = global_config.get("api_testing_server_password", "")
project_id = repo_config.get("test_api_project_id", "")
openapi_url = repo_config.get("test_api_openapi_url", "")
version_url = repo_config.get("test_api_version_url", "")
return server_url, username, password, project_id, openapi_url, version_url
def ensure_config():
"""确保配置存在,如果不存在则调用配置工作流"""
global_config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
repo_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# 检查全局配置和仓库配置是否存在
global_config_exists = os.path.exists(global_config_path)
repo_config_exists = os.path.exists(repo_config_path)
# 检查必填配置项是否存在
config_valid = True
if global_config_exists and repo_config_exists:
# 读取配置
global_config = {}
repo_config = {}
try:
with open(global_config_path, "r", encoding="utf-8") as f:
global_config = json.load(f)
with open(repo_config_path, "r", encoding="utf-8") as f:
repo_config = json.load(f)
except Exception:
config_valid = False
# 检查必填项
if (not global_config.get("api_testing_server_url") or
not global_config.get("api_testing_server_username") or
not global_config.get("api_testing_server_password") or
not repo_config.get("test_api_project_id") or
not repo_config.get("test_api_openapi_url")):
config_valid = False
else:
config_valid = False
if not config_valid:
print("缺少API测试所需的配置将启动配置向导...")
workflow_call("/test.api.config")
# 重新检查配置是否已创建并包含必要项
try:
if os.path.exists(global_config_path) and os.path.exists(repo_config_path):
with open(global_config_path, "r", encoding="utf-8") as f:
global_config = json.load(f)
with open(repo_config_path, "r", encoding="utf-8") as f:
repo_config = json.load(f)
if (global_config.get("api_testing_server_url") and
global_config.get("api_testing_server_username") and
global_config.get("api_testing_server_password") and
repo_config.get("test_api_project_id") and
repo_config.get("test_api_openapi_url")):
return True
print("配置失败")
return False
except Exception:
print("配置失败")
return False
return True
# 读取配置
result = ensure_config()
if not result:
print("配置失败,工作流不能继续执行")
exit(0)
SERVER_URL, USERNAME, PASSWORD, PROJECT_ID, OPENAPI_URL, VERSION_URL = read_config()
def login():
global _is_login
if _is_login:
return
session.post(
f"{SERVER_URL}/user/auth/login",
data={"username": USERNAME, "password": PASSWORD, "grant_type": "password"},
)
_is_login = True
def get_path_op_id(keyword: str, method: str):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apipathops",
params={"keyword": keyword, "page": 1, "size": 20},
)
for pathop in res.json()["pathops"]:
if pathop["method"].lower().strip() == method.lower().strip():
return pathop["id"]
login()