605 lines
19 KiB
Python
605 lines
19 KiB
Python
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
|
||
import requests
|
||
|
||
from lib.chatmark import TextEditor
|
||
from lib.ide_service import IDEService
|
||
|
||
|
||
def read_gitlab_token():
    """Return the GitLab access token from the user config or an interactive prompt.

    Looks for a ``gitlab_token`` entry in ``~/.chat/.workflow_config.json``.
    When the file or the entry is missing, the user is asked for the token
    through a ``TextEditor`` widget; an empty answer prints a hint and
    terminates the process with exit status -1.
    """
    user_config = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
    if os.path.exists(user_config):
        with open(user_config, "r", encoding="utf-8") as handle:
            settings = json.load(handle)
        if "gitlab_token" in settings:
            return settings["gitlab_token"]

    # Nothing stored: fall back to asking the user.
    prompt = TextEditor("", "Please input your GitLab access TOKEN to access:")
    prompt.render()

    entered_token = prompt.new_text
    if entered_token:
        return entered_token

    print("Please input your GitLab access TOKEN to continue.")
    sys.exit(-1)
    return entered_token
|
||
|
||
|
||
# Cached repository root directory; filled in lazily by get_current_repo().
current_repo_dir = None
|
||
|
||
|
||
def get_current_repo():
    """
    Return the root directory of the git repository containing the current file.

    The file path comes from the IDE's selected range, and a successful lookup
    is cached in the module-level ``current_repo_dir``. Returns ``None`` when
    the IDE reports no file path or when ``git rev-parse`` fails (most likely
    because the file is not inside a git repository).
    """
    global current_repo_dir

    if current_repo_dir:
        return current_repo_dir

    selection = IDEService().get_selected_range().dict()
    active_file = selection.get("abspath", None)
    if not active_file:
        return None

    try:
        # Ask git for the top-level directory of the repository.
        raw_toplevel = subprocess.check_output(
            ["git", "rev-parse", "--show-toplevel"],
            stderr=subprocess.DEVNULL,
            cwd=os.path.dirname(active_file),
        )
    except subprocess.CalledProcessError:
        # The command failed; we are probably not inside a git repository.
        return None

    current_repo_dir = raw_toplevel.decode("utf-8").strip()
    return current_repo_dir
|
||
|
||
|
||
def subprocess_check_output(*popenargs, timeout=None, **kwargs):
    """Call ``subprocess.check_output`` with ``cwd`` defaulting to the current repository.

    When ``get_current_repo()`` yields a directory and the caller did not pass
    ``cwd``, the command runs from that directory. All other arguments are
    forwarded unchanged.
    """
    repo_root = get_current_repo()
    if repo_root:
        # An explicit cwd supplied by the caller always wins.
        kwargs.setdefault("cwd", repo_root)

    return subprocess.check_output(*popenargs, timeout=timeout, **kwargs)
|
||
|
||
|
||
def subprocess_run(
    *popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs
):
    """Call ``subprocess.run`` with ``cwd`` defaulting to the current repository.

    The working directory is only filled in when ``get_current_repo()`` finds
    a repository and the caller gave no ``cwd`` of their own. The remaining
    arguments mirror ``subprocess.run`` and are passed through as-is.
    """
    repo_root = get_current_repo()
    if repo_root:
        kwargs.setdefault("cwd", repo_root)

    forwarded = dict(
        input=input,
        capture_output=capture_output,
        timeout=timeout,
        check=check,
    )
    return subprocess.run(*popenargs, **forwarded, **kwargs)
|
||
|
||
|
||
def subprocess_call(*popenargs, timeout=None, **kwargs):
    """Call ``subprocess.call`` with ``cwd`` defaulting to the current repository.

    Returns whatever ``subprocess.call`` returns (the command's exit status).
    """
    repo_root = get_current_repo()
    if repo_root:
        # Keep a caller-supplied cwd; otherwise run from the repository root.
        kwargs.setdefault("cwd", repo_root)

    return subprocess.call(*popenargs, timeout=timeout, **kwargs)
|
||
|
||
|
||
def subprocess_check_call(*popenargs, timeout=None, **kwargs):
    """Call ``subprocess.check_call`` with ``cwd`` defaulting to the current repository.

    A ``cwd`` passed by the caller is left untouched; otherwise the directory
    reported by ``get_current_repo()`` is used when one is available.
    """
    repo_root = get_current_repo()
    if repo_root:
        kwargs.setdefault("cwd", repo_root)

    return subprocess.check_call(*popenargs, timeout=timeout, **kwargs)
|
||
|
||
|
||
# Access token sent as the "Private-Token" header; resolved once when the module is imported.
GITLAB_ACCESS_TOKEN = read_gitlab_token()
|
||
# Base URL that every GitLab API endpoint in this module is built from.
GITLAB_API_URL = "https://gitlab.com/api/v4"
|
||
|
||
|
||
def create_issue(title, description):
    """Create an issue in the GitLab project of the current repository.

    Args:
        title: Issue title.
        description: Issue body.

    Returns:
        The decoded JSON of the new issue when the API answers 201, otherwise
        ``None`` (the response content is printed to stderr).
    """
    auth_headers = {
        "Private-Token": GITLAB_ACCESS_TOKEN,
        "Content-Type": "application/json",
    }
    endpoint = f"{GITLAB_API_URL}/projects/{get_gitlab_project_id()}/issues"
    reply = requests.post(
        endpoint,
        headers=auth_headers,
        json={"title": title, "description": description},
    )

    if reply.status_code != 201:
        print(f"Failed to create issue: {reply.content}", file=sys.stderr, end="\n\n")
        return None

    print("Issue created successfully!")
    return reply.json()
|
||
|
||
|
||
def update_issue_body(issue_iid, issue_body):
    """Replace the description of an existing issue.

    Args:
        issue_iid: Issue identifier used in the API path.
        issue_body: New description text.

    Returns:
        The decoded JSON of the updated issue when the API answers 200,
        otherwise ``None`` (the status code is printed).
    """
    auth_headers = {
        "Private-Token": GITLAB_ACCESS_TOKEN,
        "Content-Type": "application/json",
    }
    endpoint = f"{GITLAB_API_URL}/projects/{get_gitlab_project_id()}/issues/{issue_iid}"
    reply = requests.put(endpoint, headers=auth_headers, json={"description": issue_body})

    if reply.status_code != 200:
        print(f"Failed to update issue: {reply.status_code}")
        return None

    print("Issue updated successfully!")
    return reply.json()
|
||
|
||
|
||
def get_gitlab_project_id():
    """Return the URL-encoded GitLab project path of the ``origin`` remote.

    The remote URL is read with ``git remote get-url origin`` and reduced to
    its ``namespace/project`` part. Two URL shapes are understood:

    * SSH:   ``git@host:group/project.git``
    * HTTPS: ``https://host/group/project.git``

    Only a trailing ``.git`` (and a trailing ``/``) is removed, so project
    paths that merely contain ``.git`` somewhere (``my.github/tools``) are
    kept intact.

    Returns:
        The percent-encoded project path (``/`` becomes ``%2F``), or ``None``
        when the git command fails or the URL format is not supported. All
        diagnostics go to stderr.
    """
    try:
        result = subprocess_check_output(
            ["git", "remote", "get-url", "origin"], stderr=subprocess.STDOUT
        ).strip()
        repo_url = result.decode("utf-8")
        print(f"Original repo URL: {repo_url}", file=sys.stderr)

        if repo_url.startswith("git@"):
            # SSH form: everything after the first ':' is the project path.
            project_path = repo_url.split(':', 1)[1]
        elif repo_url.startswith("https://"):
            # HTTPS form: drop the scheme and host, keep the remaining segments.
            project_path = '/'.join(repo_url.split('/')[3:])
        else:
            raise ValueError(f"Unsupported Git URL format: {repo_url}")

        # Strip the suffix only; str.replace would also eat ".git" mid-path.
        project_path = project_path.rstrip("/")
        if project_path.endswith(".git"):
            project_path = project_path[: -len(".git")]

        print(f"Extracted project path: {project_path}", file=sys.stderr)
        encoded_project_path = requests.utils.quote(project_path, safe='')
        print(f"Encoded project path: {encoded_project_path}", file=sys.stderr)
        return encoded_project_path
    except subprocess.CalledProcessError as e:
        print(f"Error executing git command: {e}", file=sys.stderr)
        return None
    except Exception as e:
        # Best effort: any parsing problem is reported and turned into None.
        print(f"Error in get_gitlab_project_id: {e}", file=sys.stderr)
        return None
|
||
|
||
# Parse the sub-tasks listed in an issue description.
def parse_sub_tasks(description):
    """Collect the task-list entries of an issue description.

    Every line that starts with ``- [`` counts as a task. The leading ``- ``
    is dropped, so the returned entries look like ``[ ] name`` or ``[x] name``.
    """
    return [row[2:] for row in description.split("\n") if row.startswith("- [")]
|
||
|
||
|
||
def update_sub_tasks(description, tasks):
    """Replace the task list of a description with ``tasks``.

    All lines starting with ``- [`` are removed, then one ``- <task>`` line
    per entry of ``tasks`` is appended after a newline.
    """
    # Everything that is not an existing task line is kept in order.
    kept_rows = [row for row in description.split("\n") if not row.startswith("- [")]
    # The new tasks always go at the end of the body.
    task_rows = [f"- {entry}" for entry in tasks]
    return "\n".join(kept_rows) + "\n" + "\n".join(task_rows)
|
||
|
||
|
||
def update_task_issue_url(description, task, issue_url):
    """Turn the name of one task in ``description`` into a link to ``issue_url``.

    ``task`` has the form ``[ ] task name`` or ``[x] task name``. The matching
    task line is rewritten to ``- [ ] [task name](url)`` (checkbox state is
    preserved). Only task lines (those starting with ``- [``) whose text is
    exactly the task name are touched, so the same words appearing in prose
    or inside a longer task name are left alone.

    Returns:
        The updated description, or ``None`` when ``task`` contains no
        ``"] "`` checkbox marker.
    """
    marker = task.find("] ")
    if marker == -1:
        return None

    task_name = task[marker + 2:].strip()
    if not task_name:
        # An empty name cannot be linked; leave the description as it is.
        return description
    link = f"[{task_name}]({issue_url})"

    rewritten = []
    for line in description.split("\n"):
        if line.startswith("- ["):
            box_end = line.find("] ")
            if box_end != -1:
                text = line[box_end + 2:]
                # Compare ignoring surrounding whitespace (e.g. a stray "\r").
                if text.strip() == task_name:
                    line = line[: box_end + 2] + text.replace(task_name, link, 1)
        rewritten.append(line)
    return "\n".join(rewritten)
|
||
|
||
|
||
def check_git_installed():
    """
    Check if Git is installed on the local machine.

    Tries to execute 'git --version' command to determine the presence of Git.

    Returns:
        bool: True if Git is installed, False otherwise.
    """
    try:
        subprocess_run(
            ["git", "--version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        return True
    except (subprocess.CalledProcessError, OSError):
        # A missing executable surfaces as FileNotFoundError (an OSError),
        # not as CalledProcessError, so both must be treated as "no git".
        print("Git is not installed on your system.")
        return False
|
||
|
||
|
||
def create_and_checkout_branch(branch_name):
    """Create ``branch_name`` and switch to it via ``git checkout -b``.

    Raises ``subprocess.CalledProcessError`` when git exits with a non-zero
    status, because the command is run with ``check=True``.
    """
    checkout_command = ["git", "checkout", "-b", branch_name]
    subprocess_run(checkout_command, check=True)
|
||
|
||
|
||
def is_issue_url(task):
    """Tell whether ``task`` is an issue URL.

    After surrounding whitespace is stripped, the text must start with
    ``http://`` or ``https://`` and end with ``/issues/<digits>``.
    """
    candidate = task.strip()

    # http or https scheme at the start, "issues/<number>" at the very end.
    return re.match(r'^(http|https)://.*?/issues/\d+$', candidate) is not None
|
||
|
||
|
||
def read_issue_by_url(issue_url):
    """Fetch an issue through the GitLab API, given its web URL.

    The project path and issue number are taken from the URL itself
    (assumed format: ``https://gitlab.com/project/path/-/issues/number``);
    the request always goes to ``GITLAB_API_URL``.

    Returns:
        The decoded JSON of the issue when the API answers 200, otherwise
        ``None`` (status code and response text are printed to stderr).
    """
    # Drop the "/-/" marker so the path segments line up for slicing.
    segments = issue_url.replace("/-/", "/").split('/')
    issue_number = segments[-1]
    # Skip scheme and host at the front, "issues/<number>" at the back.
    namespace_path = '/'.join(segments[3:-2])

    # The project path has to be URL-encoded to be used as a project id.
    encoded_path = requests.utils.quote(namespace_path, safe='')
    endpoint = f"{GITLAB_API_URL}/projects/{encoded_path}/issues/{issue_number}"

    reply = requests.get(
        endpoint,
        headers={
            "Private-Token": GITLAB_ACCESS_TOKEN,
            "Content-Type": "application/json",
        },
    )

    if reply.status_code == 200:
        return reply.json()

    print(f"Error fetching issue: {reply.status_code}", file=sys.stderr)
    print(f"Response content: {reply.text}", file=sys.stderr)
    return None
|
||
|
||
|
||
def get_gitlab_issue_repo(issue_repo=False):
    """Return the project that issues should be read from or written to.

    When ``issue_repo`` is truthy and ``<cwd>/.chat/.workflow_config.json``
    has a ``git_issue_repo`` entry, that entry is returned (and echoed to
    stderr). In every other case the result of ``get_gitlab_project_id()``
    is returned.

    Returns ``None`` when a ``subprocess.CalledProcessError`` or
    ``FileNotFoundError`` is raised along the way.
    """
    try:
        project_config = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
        if issue_repo and os.path.exists(project_config):
            with open(project_config, "r", encoding="utf-8") as handle:
                settings = json.load(handle)

            if "git_issue_repo" in settings:
                configured_repo = settings["git_issue_repo"]
                # NOTE(review): the encoded value is computed but the raw one is
                # returned below — confirm which form callers expect.
                issue_repo = requests.utils.quote(configured_repo, safe='')
                print(
                    "current issue repo:",
                    configured_repo,
                    end="\n\n",
                    file=sys.stderr,
                    flush=True,
                )
                return configured_repo

        return get_gitlab_project_id()
    except subprocess.CalledProcessError as e:
        # A git command failed: print the error.
        print(e)
        return None
    except FileNotFoundError:
        # git was not found: it may not be installed or not on PATH.
        print("==> File not found...")
        return None
|
||
|
||
|
||
# Get the name of the current branch.
def get_current_branch():
    """Return the current branch name as reported by ``git branch --show-current``.

    Returns ``None`` when the git command fails or the git executable cannot
    be found (not installed or not on PATH).
    """
    try:
        raw_name = subprocess_check_output(
            ["git", "branch", "--show-current"], stderr=subprocess.STDOUT
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None

    # git prints bytes with a trailing newline; convert to a clean str.
    return raw_name.strip().decode("utf-8")
|
||
|
||
|
||
def get_parent_branch():
    """Try to determine the branch the current branch was created from.

    The previous reflog entry of the current branch (``<branch>@{1}``) is
    resolved with ``git rev-parse --abbrev-ref`` and then mapped to a branch
    name with ``git name-rev``.

    Returns:
        The parent branch name, or ``None`` when the current branch is
        unknown, the reference resolves to the current branch itself, a git
        command fails, or git is not available.
    """
    current_branch = get_current_branch()
    if current_branch is None:
        return None
    try:
        # Resolve the previous value of the branch ref. The braces are doubled
        # because a bare "{1}" inside an f-string would be rendered as "1".
        result = subprocess_check_output(
            ["git", "rev-parse", "--abbrev-ref", f"{current_branch}@{{1}}"],
            stderr=subprocess.STDOUT,
        ).strip()
        # Convert the result from bytes to str.
        parent_branch_ref = result.decode("utf-8")
        if parent_branch_ref == current_branch:
            # Same ref as the current branch: the branch was probably created
            # from a commit that has no parent branch.
            return None
        # Map the reference to a branch name, ignoring tags.
        result = subprocess_check_output(
            ["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref],
            stderr=subprocess.STDOUT,
        ).strip()
        parent_branch_name = result.decode("utf-8")
        return parent_branch_name
    except subprocess.CalledProcessError as e:
        # A git command failed: print the error.
        print(e)
        return None
    except FileNotFoundError:
        # git was not found: it may not be installed or not on PATH.
        print("==> File not found...")
        return None
|
||
|
||
|
||
def get_issue_info(issue_id):
    """Fetch one issue of the current project through the GitLab API.

    The project is obtained from ``get_gitlab_issue_repo()`` called with its
    default arguments.

    Returns:
        The decoded JSON of the issue when the API answers 200, otherwise
        ``None`` (status code and response text are printed to stderr).
    """
    # Build the API endpoint from the project id and the issue id.
    endpoint = f"{GITLAB_API_URL}/projects/{get_gitlab_issue_repo()}/issues/{issue_id}"

    reply = requests.get(
        endpoint,
        headers={
            "Private-Token": GITLAB_ACCESS_TOKEN,
            "Content-Type": "application/json",
        },
    )

    if reply.status_code == 200:
        return reply.json()

    print(f"Failed to get issue info. Status code: {reply.status_code}", file=sys.stderr)
    print(f"Response content: {reply.text}", file=sys.stderr)
    return None
|
||
|
||
|
||
def get_issue_info_by_url(issue_url):
    """Fetch an issue via ``get_issue_info`` using the id found in ``issue_url``.

    The issue id is taken to be the text after the last ``/`` of the URL.
    """
    issue_id = issue_url.rsplit("/", 1)[-1]
    return get_issue_info(issue_id)
|
||
|
||
|
||
# Get the commit history of the current branch since it diverged from base_branch.
def get_commit_messages(base_branch):
    """Return the ``git log`` output for commits made since forking from ``base_branch``.

    Args:
        base_branch: Branch whose merge base with ``HEAD`` marks the start.

    Returns:
        The raw text printed by ``git log <merge-base>..HEAD``.

    Raises:
        RuntimeError: When ``git merge-base`` or ``git log`` exits non-zero.
    """
    capture_text = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE, "text": True}

    # Locate the commit where the current branch and base_branch diverged.
    fork_point = subprocess_run(["git", "merge-base", "HEAD", base_branch], **capture_text)
    if fork_point.returncode != 0:
        raise RuntimeError(f"Error finding merge base: {fork_point.stderr.strip()}")

    # List every commit from the fork point up to HEAD.
    fork_commit = fork_point.stdout.strip()
    history = subprocess_run(["git", "log", f"{fork_commit}..HEAD"], **capture_text)
    if history.returncode != 0:
        raise RuntimeError(f"Error retrieving commit messages: {history.stderr.strip()}")

    return history.stdout
|
||
|
||
|
||
# Create a PR (a GitLab merge request).
def create_pull_request(title, description, source_branch, target_branch, project_id):
    """Open a merge request from ``source_branch`` into ``target_branch``.

    ``project_id`` is inserted into the API path as given.

    Returns:
        The decoded JSON of the new merge request when the API answers 201,
        otherwise ``None`` (the response text is printed to stderr).
    """
    reply = requests.post(
        f"{GITLAB_API_URL}/projects/{project_id}/merge_requests",
        headers={"Private-Token": GITLAB_ACCESS_TOKEN, "Content-Type": "application/json"},
        json={
            "title": title,
            "description": description,
            "source_branch": source_branch,
            "target_branch": target_branch,
        },
    )

    if reply.status_code != 201:
        print(reply.text, end="\n\n", file=sys.stderr)
        return None

    return reply.json()
|
||
|
||
|
||
def get_recently_mr(project_id):
    """Find the open merge request whose source branch is the current branch.

    ``project_id`` is URL-encoded here before it is placed in the API path.
    Open merge requests are requested ordered by ``updated_at`` descending,
    and the first one matching the current branch is returned.

    Returns:
        The merge request dict, or ``None`` when nothing matches or the API
        does not answer 200.
    """
    encoded_id = requests.utils.quote(project_id, safe='')
    reply = requests.get(
        f"{GITLAB_API_URL}/projects/{encoded_id}/merge_requests?state=opened&order_by=updated_at&sort=desc",
        headers={
            "Private-Token": GITLAB_ACCESS_TOKEN,
            "Content-Type": "application/json",
        },
    )

    active_branch = get_current_branch()

    if reply.status_code != 200:
        return None

    return next(
        (candidate for candidate in reply.json() if candidate["source_branch"] == active_branch),
        None,
    )
|
||
|
||
|
||
def run_command_with_retries(command, retries=3, delay=5):
    """Run ``command`` via ``subprocess_check_call``, retrying on failure.

    Args:
        command: Command passed to ``subprocess_check_call``.
        retries: Maximum number of attempts.
        delay: Seconds to sleep between two attempts.

    Returns:
        ``True`` as soon as one attempt succeeds, ``False`` once the last
        attempt has failed.
    """
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            subprocess_check_call(command)
        except subprocess.CalledProcessError as error:
            print(f"Command failed: {error}")
            if attempt >= final_attempt:
                print("All retries failed.")
                return False
            print(f"Retrying... (attempt {attempt + 1}/{retries})")
            time.sleep(delay)
        else:
            return True
|
||
|
||
|
||
def update_mr(project_id, mr_iid, title, description):
    """Update the title and description of a merge request.

    ``project_id`` is URL-encoded here before it is placed in the API path.

    Returns:
        The decoded JSON of the merge request when the API answers 200
        (its ``web_url`` is printed), otherwise ``None``.
    """
    encoded_id = requests.utils.quote(project_id, safe='')
    reply = requests.put(
        f"{GITLAB_API_URL}/projects/{encoded_id}/merge_requests/{mr_iid}",
        headers={"Private-Token": GITLAB_ACCESS_TOKEN, "Content-Type": "application/json"},
        json={"title": title, "description": description},
    )

    if reply.status_code != 200:
        print("Failed to update MR.")
        return None

    print(f"MR updated successfully: {reply.json()['web_url']}")
    return reply.json()
|
||
|
||
def check_unpushed_commits():
    """Report whether ``git cherry -v`` lists any commit.

    Returns:
        ``True`` when the command prints anything, or when it fails (the
        error is printed); ``False`` when its output is empty.
    """
    try:
        # Lists local commits that the upstream does not have yet.
        pending = subprocess_check_output(["git", "cherry", "-v"]).decode("utf-8").strip()
    except subprocess.CalledProcessError as e:
        print(f"Error checking for unpushed commits: {e}")
        return True

    # Any output at all means there are commits left to push.
    return pending != ""
|
||
|
||
|
||
def auto_push():
    """Push the current branch to ``origin`` when there is something to push.

    Returns:
        ``True`` when nothing needs pushing or the push succeeds; ``False``
        when the branch name cannot be determined or every push attempt
        fails.
    """
    if not check_unpushed_commits():
        return True

    # Work out the name of the current branch.
    try:
        head_ref = subprocess_check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
    except subprocess.CalledProcessError as e:
        print(f"Error getting current branch: {e}")
        return False
    branch = head_ref.strip().decode("utf-8")

    # Exit status 0 means the branch already exists on the remote.
    lookup_status = subprocess_call(["git", "ls-remote", "--exit-code", "origin", branch])

    push_command = ["git", "push", "origin", branch]
    if lookup_status != 0:
        # No remote branch yet: publish it and set the upstream while pushing.
        push_command.append("-u")
    return run_command_with_retries(push_command)
|
||
|
||
|
||
def get_recently_pr(repo):
    """Find the open pull request whose head ref is the current branch.

    NOTE(review): the ``/repos/.../pulls`` path, the ``token`` authorization
    scheme and the ``application/vnd.github.v3+json`` Accept header look like
    GitHub API conventions, while the base URL is ``GITLAB_API_URL`` —
    confirm whether this function is still meant to be used.

    Returns:
        The matching pull request dict, or ``None`` when nothing matches or
        the API does not answer 200.
    """
    reply = requests.get(
        f"{GITLAB_API_URL}/repos/{repo}/pulls?state=open&sort=updated",
        headers={
            "Authorization": f"token {GITLAB_ACCESS_TOKEN}",
            "Accept": "application/vnd.github.v3+json",
        },
    )

    active_branch = get_current_branch()

    if reply.status_code != 200:
        return None

    return next(
        (candidate for candidate in reply.json() if candidate["head"]["ref"] == active_branch),
        None,
    )
|
||
|
||
|
||
def update_pr(pr_number, title, description, repo_name):
    """Update the title and description of a pull request.

    NOTE(review): the ``/repos/.../pulls/<n>`` path and the ``token``
    authorization scheme look like GitHub API conventions, while the base URL
    is ``GITLAB_API_URL`` — confirm whether this function is still meant to
    be used.

    Returns:
        The decoded JSON of the pull request when the API answers 200 (its
        ``web_url`` is printed), otherwise ``None``.
    """
    request_body = json.dumps({"title": title, "description": description})
    reply = requests.patch(
        f"{GITLAB_API_URL}/repos/{repo_name}/pulls/{pr_number}",
        headers={"Authorization": f"token {GITLAB_ACCESS_TOKEN}", "Content-Type": "application/json"},
        data=request_body,
    )

    if reply.status_code != 200:
        print("Failed to update PR.")
        return None

    print(f"PR updated successfully: {reply.json()['web_url']}")
    return reply.json()
|
||
|
||
|
||
def get_last_base_branch(default_branch):
    """Return the base branch remembered in the project config.

    Reads ``last_base_branch`` from ``<cwd>/.chat/.workflow_config.json``.
    ``default_branch`` is returned when the file does not exist or the entry
    is missing or empty.
    """
    config_file = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")

    remembered = None
    if os.path.exists(config_file):
        with open(config_file, "r", encoding="utf-8") as handle:
            remembered = json.load(handle).get("last_base_branch")

    return remembered if remembered else default_branch
|
||
|
||
|
||
def save_last_base_branch(base_branch=None):
    """Remember ``base_branch`` in the project config file.

    The value is stored under ``last_base_branch`` in
    ``<cwd>/.chat/.workflow_config.json``; other entries already in the file
    are preserved. The ``.chat`` directory is created when it does not exist
    yet.

    Args:
        base_branch: Branch name to store. When empty or ``None``, the
            result of ``get_current_branch()`` is stored instead.
    """

    def save_config_item(config_path, item, value):
        # Start from the existing settings so unrelated keys survive.
        if os.path.exists(config_path):
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
        else:
            config = {}

        config[item] = value
        # A fresh project has no ".chat" directory; open(..., "w") would fail.
        os.makedirs(os.path.dirname(config_path), exist_ok=True)
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=4)

    if not base_branch:
        base_branch = get_current_branch()
    project_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
    save_config_item(project_config_path, "last_base_branch", base_branch)
|