
- Implement `get_selected_issue_ids()` to allow users to select and close GitHub issues during commit - Add support for automatically appending "Closes #IssueNumber" to commit messages - Enhance commit workflow with dynamic issue tracking and resolution - Integrate new GitHub API functions to retrieve user's assigned open issues
582 lines
18 KiB
Python
582 lines
18 KiB
Python
import json
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
|
||
import requests
|
||
|
||
from lib.chatmark import TextEditor
|
||
from lib.ide_service import IDEService
|
||
|
||
|
||
def read_github_token():
|
||
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
|
||
if os.path.exists(config_path):
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
config_data = json.load(f)
|
||
if "github_token" in config_data:
|
||
return config_data["github_token"]
|
||
|
||
# ask user to input github token
|
||
server_access_token_editor = TextEditor("", "Please input your GITHUB access TOKEN to access:")
|
||
server_access_token_editor.render()
|
||
|
||
server_access_token = server_access_token_editor.new_text
|
||
if not server_access_token:
|
||
print("Please input your GITHUB access TOKEN to continue.")
|
||
sys.exit(-1)
|
||
return server_access_token
|
||
|
||
|
||
current_repo_dir = None
|
||
|
||
|
||
def get_current_repo():
|
||
"""
|
||
获取当前文件所在的仓库信息
|
||
"""
|
||
global current_repo_dir
|
||
|
||
if not current_repo_dir:
|
||
selected_data = IDEService().get_selected_range().dict()
|
||
current_file = selected_data.get("abspath", None)
|
||
if not current_file:
|
||
return None
|
||
current_dir = os.path.dirname(current_file)
|
||
try:
|
||
# 获取仓库根目录
|
||
current_repo_dir = (
|
||
subprocess.check_output(
|
||
["git", "rev-parse", "--show-toplevel"],
|
||
stderr=subprocess.DEVNULL,
|
||
cwd=current_dir,
|
||
)
|
||
.decode("utf-8")
|
||
.strip()
|
||
)
|
||
except subprocess.CalledProcessError:
|
||
# 如果发生错误,可能不在git仓库中
|
||
return None
|
||
return current_repo_dir
|
||
|
||
|
||
def subprocess_check_output(*popenargs, timeout=None, **kwargs):
|
||
# 将 current_dir 添加到 kwargs 中的 cwd 参数
|
||
current_repo = get_current_repo()
|
||
if current_repo:
|
||
kwargs["cwd"] = kwargs.get("cwd", current_repo)
|
||
|
||
# 调用 subprocess.check_output
|
||
return subprocess.check_output(*popenargs, timeout=timeout, **kwargs)
|
||
|
||
|
||
def subprocess_run(
|
||
*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs
|
||
):
|
||
current_repo = get_current_repo()
|
||
if current_repo:
|
||
kwargs["cwd"] = kwargs.get("cwd", current_repo)
|
||
|
||
# 调用 subprocess.run
|
||
return subprocess.run(
|
||
*popenargs,
|
||
input=input,
|
||
capture_output=capture_output,
|
||
timeout=timeout,
|
||
check=check,
|
||
**kwargs,
|
||
)
|
||
|
||
|
||
def subprocess_call(*popenargs, timeout=None, **kwargs):
|
||
current_repo = get_current_repo()
|
||
if current_repo:
|
||
kwargs["cwd"] = kwargs.get("cwd", current_repo)
|
||
|
||
# 调用 subprocess.call
|
||
return subprocess.call(*popenargs, timeout=timeout, **kwargs)
|
||
|
||
|
||
def subprocess_check_call(*popenargs, timeout=None, **kwargs):
|
||
current_repo = get_current_repo()
|
||
if current_repo:
|
||
kwargs["cwd"] = kwargs.get("cwd", current_repo)
|
||
|
||
# 调用 subprocess.check_call
|
||
return subprocess.check_call(*popenargs, timeout=timeout, **kwargs)
|
||
|
||
|
||
GITHUB_ACCESS_TOKEN = read_github_token()
|
||
GITHUB_API_URL = "https://api.github.com"
|
||
|
||
|
||
def create_issue(title, body):
|
||
headers = {
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
"Accept": "application/vnd.github.v3+json",
|
||
}
|
||
data = {
|
||
"title": title,
|
||
"body": body,
|
||
}
|
||
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
|
||
response = requests.post(issue_api_url, headers=headers, data=json.dumps(data))
|
||
|
||
if response.status_code == 201:
|
||
print("Issue created successfully!")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to create issue: {response.content}", file=sys.stderr, end="\n\n")
|
||
return None
|
||
|
||
|
||
def update_issue_body(issue_url, issue_body):
|
||
"""
|
||
Update the body text of a GitHub issue.
|
||
|
||
:param issue_url: The API URL of the issue to update.
|
||
:param issue_body: The new body text for the issue.
|
||
"""
|
||
headers = {
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
"Accept": "application/vnd.github.v3+json",
|
||
}
|
||
data = {
|
||
"body": issue_body,
|
||
}
|
||
|
||
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
|
||
api_url = f"{issue_api_url}/{issue_url.split('/')[-1]}"
|
||
response = requests.patch(api_url, headers=headers, data=json.dumps(data))
|
||
|
||
if response.status_code == 200:
|
||
print("Issue updated successfully!")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to update issue: {response.status_code}")
|
||
return None
|
||
|
||
|
||
# parse sub tasks in issue body
|
||
def parse_sub_tasks(body):
|
||
sub_tasks = []
|
||
lines = body.split("\n")
|
||
for line in lines:
|
||
if line.startswith("- ["):
|
||
sub_tasks.append(line[2:])
|
||
return sub_tasks
|
||
|
||
|
||
def update_sub_tasks(body, tasks):
|
||
# remove all existing tasks
|
||
lines = body.split("\n")
|
||
updated_body = "\n".join(line for line in lines if not line.startswith("- ["))
|
||
|
||
# add new tasks
|
||
updated_body += "\n" + "\n".join(f"- {task}" for task in tasks)
|
||
|
||
return updated_body
|
||
|
||
|
||
def update_task_issue_url(body, task, issue_url):
|
||
# task is like:
|
||
# [ ] task name
|
||
# [x] task name
|
||
# replace task name with issue url, like:
|
||
# [ ] [task name](url)
|
||
# [x] [task name](url)
|
||
if task.find("] ") == -1:
|
||
return None
|
||
task = task[task.find("] ") + 2 :]
|
||
return body.replace(task, f"[{task}]({issue_url})")
|
||
|
||
|
||
def check_git_installed():
|
||
"""
|
||
Check if Git is installed on the local machine.
|
||
|
||
Tries to execute 'git --version' command to determine the presence of Git.
|
||
|
||
Returns:
|
||
bool: True if Git is installed, False otherwise.
|
||
"""
|
||
try:
|
||
subprocess_run(
|
||
["git", "--version"],
|
||
check=True,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
)
|
||
return True
|
||
except subprocess.CalledProcessError:
|
||
print("Git is not installed on your system.")
|
||
return False
|
||
|
||
|
||
def create_and_checkout_branch(branch_name):
|
||
subprocess_run(["git", "checkout", "-b", branch_name], check=True)
|
||
|
||
|
||
def is_issue_url(task):
|
||
issue_url = f"https://github.com/{get_github_repo(True)}/issues"
|
||
return task.strip().startswith(issue_url)
|
||
|
||
|
||
def read_issue_by_url(issue_url):
|
||
issue_number = issue_url.split("/")[-1]
|
||
|
||
# Construct the API endpoint URL
|
||
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
|
||
api_url = f"{issue_api_url}/{issue_number}"
|
||
|
||
# Send a GET request to the API endpoint
|
||
headers = {
|
||
"Accept": "application/vnd.github.v3+json",
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
}
|
||
response = requests.get(api_url, headers=headers)
|
||
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
return None
|
||
|
||
|
||
def get_github_repo(issue_repo=False):
|
||
try:
|
||
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
|
||
if os.path.exists(config_path) and issue_repo:
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
config_data = json.load(f)
|
||
if "issue_repo" in config_data:
|
||
print(
|
||
"current issue repo:",
|
||
config_data["issue_repo"],
|
||
end="\n\n",
|
||
file=sys.stderr,
|
||
flush=True,
|
||
)
|
||
return config_data["issue_repo"]
|
||
|
||
# 使用git命令获取当前仓库的URL
|
||
result = subprocess_check_output(
|
||
["git", "remote", "get-url", "origin"], stderr=subprocess.STDOUT
|
||
).strip()
|
||
# 将结果从bytes转换为str并提取出仓库信息
|
||
repo_url = result.decode("utf-8")
|
||
# 假设repo_url的格式为:https://github.com/username/repo.git
|
||
parts = repo_url.split("/")
|
||
repo = parts[-1].replace(".git", "")
|
||
username = parts[-2].split(":")[-1]
|
||
github_repo = f"{username}/{repo}"
|
||
IDEService().ide_logging("debug", f"current github repo: {github_repo}")
|
||
return github_repo
|
||
except subprocess.CalledProcessError as e:
|
||
print(e)
|
||
# 如果发生错误,打印错误信息
|
||
return None
|
||
except FileNotFoundError:
|
||
# 如果未找到git命令,可能是没有安装git或者不在PATH中
|
||
print("==> File not found...")
|
||
return None
|
||
|
||
|
||
# 获取当前分支名称
|
||
def get_current_branch():
|
||
try:
|
||
# 使用git命令获取当前分支名称
|
||
result = subprocess_check_output(
|
||
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
|
||
).strip()
|
||
# 将结果从bytes转换为str
|
||
current_branch = result.decode("utf-8")
|
||
return current_branch
|
||
except subprocess.CalledProcessError:
|
||
# 如果发生错误,打印错误信息
|
||
return None
|
||
except FileNotFoundError:
|
||
# 如果未找到git命令,可能是没有安装git或者不在PATH中
|
||
return None
|
||
|
||
|
||
def get_parent_branch():
|
||
current_branch = get_current_branch()
|
||
if current_branch is None:
|
||
return None
|
||
try:
|
||
# 使用git命令获取当前分支的父分支引用
|
||
result = subprocess_check_output(
|
||
["git", "rev-parse", "--abbrev-ref", f"{current_branch}@{1}"],
|
||
stderr=subprocess.STDOUT,
|
||
).strip()
|
||
# 将结果从bytes转换为str
|
||
parent_branch_ref = result.decode("utf-8")
|
||
print("==>", parent_branch_ref)
|
||
if parent_branch_ref == current_branch:
|
||
# 如果父分支引用和当前分支相同,说明当前分支可能是基于一个没有父分支的提交创建的
|
||
return None
|
||
# 使用git命令获取父分支的名称
|
||
result = subprocess_check_output(
|
||
["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref],
|
||
stderr=subprocess.STDOUT,
|
||
).strip()
|
||
parent_branch_name = result.decode("utf-8")
|
||
return parent_branch_name
|
||
except subprocess.CalledProcessError as e:
|
||
print(e)
|
||
# 如果发生错误,打印错误信息
|
||
return None
|
||
except FileNotFoundError:
|
||
# 如果未找到git命令,可能是没有安装git或者不在PATH中
|
||
print("==> File not found...")
|
||
return None
|
||
|
||
|
||
def get_issue_info(issue_id):
|
||
# Construct the API endpoint URL
|
||
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
|
||
api_url = f"{issue_api_url}/{issue_id}"
|
||
|
||
# Send a GET request to the API endpoint
|
||
headers = {
|
||
"Accept": "application/vnd.github.v3+json",
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
}
|
||
response = requests.get(api_url, headers=headers)
|
||
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
return None
|
||
|
||
|
||
def get_issue_info_by_url(issue_url):
|
||
# get issue id from issue_url
|
||
def get_issue_id(issue_url):
|
||
# Extract the issue id from the issue_url
|
||
issue_id = issue_url.split("/")[-1]
|
||
return issue_id
|
||
|
||
return get_issue_info(get_issue_id(issue_url))
|
||
|
||
|
||
# 获取当前分支自从与base_branch分叉以来的历史提交信息
|
||
def get_commit_messages(base_branch):
|
||
# 找到当前分支与base_branch的分叉点
|
||
merge_base = subprocess_run(
|
||
["git", "merge-base", "HEAD", base_branch],
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
)
|
||
|
||
# 检查是否成功找到分叉点
|
||
if merge_base.returncode != 0:
|
||
raise RuntimeError(f"Error finding merge base: {merge_base.stderr.strip()}")
|
||
|
||
# 获取分叉点的提交哈希
|
||
merge_base_commit = merge_base.stdout.strip()
|
||
|
||
# 获取从分叉点到当前分支的所有提交信息
|
||
result = subprocess_run(
|
||
["git", "log", f"{merge_base_commit}..HEAD"],
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
)
|
||
|
||
# 检查git log命令是否成功执行
|
||
if result.returncode != 0:
|
||
raise RuntimeError(f"Error retrieving commit messages: {result.stderr.strip()}")
|
||
|
||
# 返回提交信息列表
|
||
return result.stdout
|
||
|
||
|
||
# 创建PR
|
||
def create_pull_request(title, body, head, base, repo_name):
|
||
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls"
|
||
print("url:", url, end="\n\n")
|
||
headers = {
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
"Content-Type": "application/json",
|
||
}
|
||
payload = {"title": title, "body": body, "head": head, "base": base}
|
||
response = requests.post(url, headers=headers, data=json.dumps(payload))
|
||
if response.status_code == 201:
|
||
return response.json()
|
||
print(response.text, end="\n\n", file=sys.stderr)
|
||
return None
|
||
|
||
|
||
def run_command_with_retries(command, retries=3, delay=5):
|
||
for attempt in range(retries):
|
||
try:
|
||
subprocess_check_call(command)
|
||
return True
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"Command failed: {e}")
|
||
if attempt < retries - 1:
|
||
print(f"Retrying... (attempt {attempt + 1}/{retries})")
|
||
time.sleep(delay)
|
||
else:
|
||
print("All retries failed.")
|
||
return False
|
||
|
||
|
||
def check_unpushed_commits():
|
||
try:
|
||
# 获取当前分支的本地提交和远程提交的差异
|
||
result = subprocess_check_output(["git", "cherry", "-v"]).decode("utf-8").strip()
|
||
# 如果结果不为空,说明存在未push的提交
|
||
return bool(result)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"Error checking for unpushed commits: {e}")
|
||
return True
|
||
|
||
|
||
def auto_push():
|
||
# 获取当前分支名
|
||
if not check_unpushed_commits():
|
||
return True
|
||
try:
|
||
branch = (
|
||
subprocess_check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
|
||
.strip()
|
||
.decode("utf-8")
|
||
)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"Error getting current branch: {e}")
|
||
return False
|
||
|
||
# 检查当前分支是否有对应的远程分支
|
||
remote_branch_exists = subprocess_call(["git", "ls-remote", "--exit-code", "origin", branch])
|
||
|
||
push_command = ["git", "push", "origin", branch]
|
||
if remote_branch_exists == 0:
|
||
# 如果存在远程分支,则直接push提交
|
||
return run_command_with_retries(push_command)
|
||
else:
|
||
# 如果不存在远程分支,则发布并push提交
|
||
push_command.append("-u")
|
||
return run_command_with_retries(push_command)
|
||
|
||
|
||
def get_recently_pr(repo):
|
||
url = f"{GITHUB_API_URL}/repos/{repo}/pulls?state=open&sort=updated"
|
||
headers = {
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
"Accept": "application/vnd.github.v3+json",
|
||
}
|
||
response = requests.get(url, headers=headers)
|
||
print("=>:", url)
|
||
|
||
branch_name = get_current_branch()
|
||
|
||
if response.status_code == 200:
|
||
prs = response.json()
|
||
for pr in prs:
|
||
if pr["head"]["ref"] == branch_name:
|
||
return pr
|
||
return None
|
||
else:
|
||
return None
|
||
|
||
|
||
def update_pr(pr_number, title, body, repo_name):
|
||
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls/{pr_number}"
|
||
headers = {
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
"Content-Type": "application/json",
|
||
}
|
||
payload = {"title": title, "body": body}
|
||
response = requests.patch(url, headers=headers, data=json.dumps(payload))
|
||
|
||
if response.status_code == 200:
|
||
print(f"PR updated successfully: {response.json()['html_url']}")
|
||
return response.json()
|
||
else:
|
||
print("Failed to update PR.")
|
||
return None
|
||
|
||
|
||
def get_last_base_branch(default_branch):
|
||
"""read last base branch from config file"""
|
||
|
||
def read_config_item(config_path, item):
|
||
if os.path.exists(config_path):
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
config = json.load(f)
|
||
return config.get(item)
|
||
return None
|
||
|
||
project_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
|
||
last_base_branch = read_config_item(project_config_path, "last_base_branch")
|
||
if last_base_branch:
|
||
return last_base_branch
|
||
return default_branch
|
||
|
||
|
||
def save_last_base_branch(base_branch=None):
|
||
"""save last base branch to config file"""
|
||
|
||
def save_config_item(config_path, item, value):
|
||
if os.path.exists(config_path):
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
config = json.load(f)
|
||
else:
|
||
config = {}
|
||
|
||
config[item] = value
|
||
with open(config_path, "w", encoding="utf-8") as f:
|
||
json.dump(config, f, indent=4)
|
||
|
||
if not base_branch:
|
||
base_branch = get_current_branch()
|
||
project_config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
|
||
save_config_item(project_config_path, "last_base_branch", base_branch)
|
||
|
||
|
||
def get_git_username():
|
||
return subprocess_check_output(["git", "config", "--get", "user.name"]).decode("utf-8").strip()
|
||
|
||
|
||
def get_github_repo_issues(
|
||
owner_repo,
|
||
milestone=None,
|
||
state=None,
|
||
assignee=None,
|
||
creator=None,
|
||
mentioned=None,
|
||
labels=None,
|
||
sort=None,
|
||
direction=None,
|
||
since=None,
|
||
per_page=None,
|
||
page=None,
|
||
):
|
||
url = f"{GITHUB_API_URL}/repos/{owner_repo}/issues"
|
||
headers = {
|
||
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
|
||
"Accept": "application/vnd.github.v3+json",
|
||
}
|
||
params = {
|
||
"milestone": milestone,
|
||
"state": state,
|
||
"assignee": assignee,
|
||
"creator": creator,
|
||
"mentioned": mentioned,
|
||
"labels": labels,
|
||
"sort": sort,
|
||
"direction": direction,
|
||
"since": since,
|
||
"per_page": per_page,
|
||
"page": page,
|
||
}
|
||
response = requests.get(url, headers=headers, params=params)
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
return None
|