long2ice 21fe8dd34f refactor: Update delete_old_apidocs function to accept apidocs as a parameter
This commit modifies the delete_old_apidocs function to take a list of apidocs as an argument, ensuring it operates on the current set of API documentation. Additionally, it updates the main function to pass the retrieved apidocs when calling delete_old_apidocs.
2025-03-21 18:10:21 +08:00

249 lines
7.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import subprocess
import sys
import time
import requests
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_DIR)
# noqa: I001
from api.utils import ( # noqa: E402
OPENAPI_URL,
PROJECT_ID,
SERVER_URL,
VERSION_URL,
get_path_op_id,
session,
)
# noqa: E402
from lib.chatmark.step import Step # noqa: E402
def get_apidocs():
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
params={"page": 1, "size": 100},
)
return res.json()["docs"]
def delete_old_apidocs(apidocs):
for apidoc in apidocs:
session.delete(f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}")
def get_local_version():
cmd = "git rev-parse HEAD"
res = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE)
return res.stdout.decode("utf-8").strip()
def check_api_version():
# 如果没有配置VERSION_URL则跳过版本检查
if not VERSION_URL:
print("未配置VERSION_URL跳过API版本检查...")
return
local_version = get_local_version()
print("检查被测服务器文档是否已经更新到最新版本...")
while True:
try:
res = session.get(VERSION_URL)
version = res.json()["version"]
if version == local_version:
print(f"API 文档已更新,当前版本为 {version},开始上传 OpenAPI 文档...")
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查 API 版本失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcase_done(testcase_id):
while True:
try:
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases/{testcase_id}"
)
data = res.json()
status = data["status"]
if status == "content_ready":
print("文本用例生成完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查文本用例状态失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcode_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
if status == "succeeded":
print("自动测试脚本生成完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本生成失败!{e}", flush=True)
time.sleep(5)
def get_testcode(testcase_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes",
params={"testcase_id": testcase_id},
)
return res.json()["testcodes"][0]
def wait_for_task_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
if status == "succeeded":
print("自动测试脚本执行完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本状态失败!{e}", flush=True)
time.sleep(5)
def get_testcase(api_path_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"page": 1, "size": 100, "pathop_id": api_path_id},
)
return res.json()["testcases"][0]
def main():
error_msg = "请输入要测试的API名称和测试目标/test.api.upload api_path method test_target"
if len(sys.argv) < 2:
print(error_msg)
return
args = sys.argv[1].strip().split(" ")
if len(args) < 3:
print(error_msg)
return
api_path = args[0]
method = args[1]
test_target = " ".join(args[2:])
docs = get_apidocs()
with Step("检查 API 版本是否更新..."):
check_api_version()
delete_old_apidocs(docs)
with Step(f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..."):
# 使用配置的OPENAPI_URL
if not OPENAPI_URL:
print("错误未配置OPENAPI_URL无法获取OpenAPI文档")
return
res = requests.get(
OPENAPI_URL,
)
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
files={"file": ("openapi.json", res.content, "application/json")},
data={"apiauth_id": docs[0]["apiauth_id"]},
)
if res.status_code == 200:
print("上传 OpenAPI 文档成功!\n")
else:
print(f"上传 OpenAPI 文档失败!{res.text}", flush=True)
return
apipathop_id = get_path_op_id(api_path, method)
with Step("开始生成文本用例..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"generate_content": True},
json={"apipathop_id": apipathop_id, "title": test_target},
)
if res.status_code == 200:
print("提交生成文本用例成功!等待生成完成...", flush=True)
testcase_id = res.json()["id"]
wait_for_testcase_done(testcase_id)
else:
print(f"提交生成文本用例失败!{res.text}", flush=True)
return
with Step("开始生成自动测试脚本..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/tasks/testcode",
params={"testcase_id": testcase_id},
)
if res.status_code == 200:
print("提交生成自动测试脚本成功!等待生成完成...", flush=True)
task_id = res.json()["id"]
wait_for_testcode_done(task_id)
else:
print(f"提交生成自动测试脚本失败!{res.text}", flush=True)
return
with Step("开始执行自动测试脚本..."):
testcode = get_testcode(testcase_id)
testcode_id = testcode["id"]
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes/{testcode_id}/exec",
)
if res.status_code == 200:
print("提交执行自动测试脚本成功!", flush=True)
else:
print(f"提交执行自动测试脚本失败!{res.text}")
return
api_path_id = get_path_op_id(api_path, method)
with Step("开始查询测试脚本执行结果..."):
while True:
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/tasks",
params={"page": 1, "size": 1},
)
ret = res.json()
task = ret["tasks"][0]
task_id = task["id"]
wait_for_task_done(task_id)
testcase = get_testcase(api_path_id)
last_testcode_passed = testcase["last_testcode_passed"]
if last_testcode_passed:
print("测试脚本执行成功!", flush=True)
break
else:
print("测试脚本执行失败!", flush=True)
break
if __name__ == "__main__":
main()