Merge pull request #108 from devchat-ai/add_pr_git_workflow

Add pr git workflow
This commit is contained in:
Rankin Zheng 2024-05-18 14:18:18 +00:00 committed by GitHub
commit ff37ed592c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
48 changed files with 2500 additions and 0 deletions

View File

@ -0,0 +1,98 @@
import json
import sys
import os
from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import ui_edit, assert_exit # noqa: E402
from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, is_issue_url, read_issue_by_url
def extract_issue_id(branch_name):
if "#" in branch_name:
return branch_name.split("#")[-1]
return None
# Function to generate a random branch name
PROMPT = (
"You are a coding engineer, required to summarize the ISSUE description into a coding task description of no more than 50 words. \n"
"The ISSUE description is as follows: {issue_body}, please summarize the corresponding coding task description.\n"
"The coding task description should be output in JSON format, in the form of: {{\"summary\": \"code task summary\"}}\n"
)
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_code_task_summary(issue_body):
pass
@ui_edit(ui_type="editor", description="Edit code task summary:")
def edit_code_task_summary(task_summary):
pass
def get_issue_or_task(task):
if is_issue_url(task):
issue = read_issue_by_url(task.strip())
assert_exit(not issue, "Failed to read issue.", exit_code=-1)
return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]})
else:
return task
def get_issue_json(issue_id, task):
issue = {"id": "no issue id", "title": "", "body": task}
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
return issue
# Main function
def main():
print("Start update code task summary ...", end="\n\n", flush=True)
is_git_installed = check_git_installed()
assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1)
task = sys.argv[1]
repo_name = get_github_repo()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)
# print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n")
issue = get_issue_json(issue_id, task)
assert_exit(not issue["body"], "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
# Generate 5 branch names
print("Generating code task summary ...", end="\n\n", flush=True)
code_task_summary = generate_code_task_summary(issue_body = issue["body"])
assert_exit(not code_task_summary, "Failed to generate code task summary.", exit_code=-1)
assert_exit(not code_task_summary.get("summary", None), "Failed to generate code task summary, missing summary field in result.", exit_code=-1)
code_task_summary = code_task_summary["summary"]
# Select branch name
code_task_summary = edit_code_task_summary(code_task_summary)
assert_exit(not code_task_summary, "Failed to edit code task summary.", exit_code=-1)
code_task_summary = code_task_summary[0]
# create and checkout branch
print(f"Updating code task summary to config:")
config_file = os.path.join(".chat", "complete.config")
if os.path.exists(config_file):
with open(config_file, "r") as f:
config = json.load(f)
config["taskDescription"] = code_task_summary
else:
config = {"taskDescription": code_task_summary}
with open(config_file, "w") as f:
json.dump(config, f, indent=4)
print("Code task summary has updated")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new branch based current branch, and checkout new branch.'
input: optional
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -0,0 +1 @@
description: Root of git commands.

View File

View File

@ -0,0 +1,5 @@
description: 'Writes a well-formatted commit message for selected code changes and commits them via Git. Include an issue number if desired (e.g., input "/commit to close #12").'
hint: to close Issue #issue_number
input: optional
steps:
- run: $devchat_python $command_path/commit.py "$input" "english"

View File

@ -0,0 +1,419 @@
# flake8: noqa: E402
import os
import re
import subprocess
import sys
from lib.chatmark import Checkbox, Form, TextEditor
from lib.ide_service import IDEService
from devchat.llm import chat_completion_stream
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import assert_exit # noqa: E402
from git_api import get_issue_info
diff_too_large_message_en = (
"Commit failed. The modified content is too long "
"and exceeds the model's length limit. "
"You can try to make partial changes to the file and submit multiple times. "
"Making small changes and submitting them multiple times is a better practice."
)
diff_too_large_message_zh = (
"提交失败。修改内容太长,超出模型限制长度,"
"可以尝试选择部分修改文件多次提交,小修改多提交是更好的做法。"
)
COMMIT_PROMPT_LIMIT_SIZE = 20000
def extract_markdown_block(text):
"""
Extracts the first Markdown code block from the given text without the language specifier.
:param text: A string containing Markdown text
:return: The content of the first Markdown code block, or None if not found
"""
# 正则表达式匹配Markdown代码块忽略可选的语言类型标记
pattern = r"```(?:\w+)?\s*\n(.*?)\n```"
match = re.search(pattern, text, re.DOTALL)
if match:
# 返回第一个匹配的代码块内容,去除首尾的反引号和语言类型标记
# 去除块结束标记前的一个换行符,但保留其他内容
block_content = match.group(1)
return block_content
else:
return text
# Read the prompt from the diffCommitMessagePrompt.txt file
def read_prompt_from_file(filename):
"""
Reads the content of a file and returns it as a string.
This function is designed to read a prompt message from a text file.
It expects the file to be encoded in UTF-8 and will strip any leading
or trailing whitespace from the content of the file. If the file does
not exist or an error occurs during reading, the function logs an error
message and exits the script.
Parameters:
- filename (str): The path to the file that contains the prompt message.
Returns:
- str: The content of the file as a string.
Raises:
- FileNotFoundError: If the file does not exist.
- Exception: If any other error occurs during file reading.
"""
try:
with open(filename, "r", encoding="utf-8") as file:
return file.read().strip()
except FileNotFoundError:
IDEService().log_info(
f"File {filename} not found. "
"Please make sure it exists in the same directory as the script."
)
sys.exit(1)
except Exception as e:
IDEService().log_info(f"An error occurred while reading the file {filename}: {e}")
sys.exit(1)
# Read the prompt content from the file
script_path = os.path.dirname(__file__)
PROMPT_FILENAME = os.path.join(script_path, "diffCommitMessagePrompt.txt")
PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT = read_prompt_from_file(PROMPT_FILENAME)
prompt_commit_message_by_diff_user_input_llm_config = {
"model": os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")
}
language = ""
def assert_value(value, message):
"""
判断给定的value是否为True如果是则输出指定的message并终止程序
Args:
value: 用于判断的值
message: 如果value为True时需要输出的信息
Returns:
无返回值
"""
if value:
print(message, file=sys.stderr, flush=True)
sys.exit(-1)
def decode_path(encoded_path):
octal_pattern = re.compile(r"\\[0-7]{3}")
if octal_pattern.search(encoded_path):
bytes_path = encoded_path.encode("utf-8").decode("unicode_escape").encode("latin1")
decoded_path = bytes_path.decode("utf-8")
return decoded_path
else:
return encoded_path
def get_modified_files():
"""
获取当前修改文件列表以及已经staged的文件列表
Args:
Returns:
tuple: 包含两个list的元组第一个list包含当前修改过的文件第二个list包含已经staged的文件
"""
""" 获取当前修改文件列表以及已经staged的文件列表"""
output = subprocess.check_output(["git", "status", "-s", "-u"], text=True, encoding="utf-8")
lines = output.split("\n")
modified_files = []
staged_files = []
def strip_file_name(file_name):
file = file_name.strip()
if file.startswith('"'):
file = file[1:-1]
return file
for line in lines:
if len(line) > 2:
status, filename = line[:2], decode_path(line[3:])
# check wether filename is a directory
if os.path.isdir(filename):
continue
modified_files.append(os.path.normpath(strip_file_name(filename)))
if status == "M " or status == "A " or status == "D ":
staged_files.append(os.path.normpath(strip_file_name(filename)))
return modified_files, staged_files
def get_marked_files(modified_files, staged_files):
"""
根据给定的参数获取用户选中以供提交的文件
Args:
modified_files (List[str]): 用户已修改文件列表
staged_files (List[str]): 用户已staged文件列表
Returns:
List[str]: 用户选中的文件列表
"""
# Create two Checkbox instances for staged and unstaged files
staged_checkbox = Checkbox(staged_files, [True] * len(staged_files))
unstaged_files = [file for file in modified_files if file not in staged_files]
unstaged_checkbox = Checkbox(unstaged_files, [False] * len(unstaged_files))
# Create a Form with both Checkbox instances
form_list = []
if len(staged_files) > 0:
form_list.append("Staged:\n\n")
form_list.append(staged_checkbox)
if len(unstaged_files) > 0:
form_list.append("Unstaged:\n\n")
form_list.append(unstaged_checkbox)
form = Form(form_list, submit_button_name="Continue")
# Render the Form and get user input
form.render()
# Retrieve the selected files from both Checkbox instances
staged_checkbox_selections = staged_checkbox.selections if staged_checkbox.selections else []
unstaged_selections = unstaged_checkbox.selections if unstaged_checkbox.selections else []
selected_staged_files = [staged_files[idx] for idx in staged_checkbox_selections]
selected_unstaged_files = [unstaged_files[idx] for idx in unstaged_selections]
# Combine the selections from both checkboxes
selected_files = selected_staged_files + selected_unstaged_files
return selected_files
def rebuild_stage_list(user_files):
"""
根据用户选中文件重新构建stage列表
Args:
user_files: 用户选中的文件列表
Returns:
None
"""
# Unstage all files
subprocess.check_output(["git", "reset"])
# Stage all user_files
for file in user_files:
os.system(f'git add "{file}"')
def get_diff():
"""
获取暂存区文件的Diff信息
Args:
Returns:
bytes: 返回bytes类型是git diff --cached命令的输出结果
"""
return subprocess.check_output(["git", "diff", "--cached"])
def get_current_branch():
try:
# 使用git命令获取当前分支名称
result = subprocess.check_output(
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
current_branch = result.decode("utf-8")
return current_branch
except subprocess.CalledProcessError:
# 如果发生错误,打印错误信息
return None
except FileNotFoundError:
# 如果未找到git命令可能是没有安装git或者不在PATH中
return None
def generate_commit_message_base_diff(user_input, diff, issue):
"""
根据diff信息通过AI生成一个commit消息
Args:
user_input (str): 用户输入的commit信息
diff (str): 提交的diff信息
Returns:
str: 生成的commit消息
"""
global language
language_prompt = "You must response commit message in chinese。\n" if language == "zh" else ""
prompt = PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT.replace("{__DIFF__}", f"{diff}").replace(
"{__USER_INPUT__}", f"{user_input + language_prompt}"
).replace("{__ISSUE__}", f"{issue}")
model_token_limit_error = (
diff_too_large_message_en if language == "en" else diff_too_large_message_zh
)
if len(str(prompt)) > COMMIT_PROMPT_LIMIT_SIZE:
print(model_token_limit_error, flush=True)
sys.exit(0)
messages = [{"role": "user", "content": prompt}]
response = chat_completion_stream(messages, prompt_commit_message_by_diff_user_input_llm_config)
if (
not response["content"]
and response.get("error", None)
and f'{response["error"]}'.find("This model's maximum context length is") > 0
):
print(model_token_limit_error)
sys.exit(0)
assert_value(not response["content"], response.get("error", ""))
response["content"] = extract_markdown_block(response["content"])
return response
def display_commit_message_and_commit(commit_message):
"""
展示提交信息并提交
Args:
commit_message: 提交信息
Returns:
None
"""
text_editor = TextEditor(commit_message, submit_button_name="Commit")
text_editor.render()
new_commit_message = text_editor.new_text
if not new_commit_message:
return None
return subprocess.check_output(["git", "commit", "-m", new_commit_message])
def extract_issue_id(branch_name):
if "#" in branch_name:
return branch_name.split("#")[-1]
return None
def get_issue_json(issue_id):
issue = {"id": "no issue id", "title": "", "body": ""}
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
return issue
def check_git_installed():
try:
subprocess.run(
["git", "--version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
)
return True
except subprocess.CalledProcessError:
print("Git is not installed on your system.", file=sys.stderr, flush=True)
except FileNotFoundError:
print("Git is not installed on your system.", file=sys.stderr, flush=True)
except Exception:
print("Git is not installed on your system.", file=sys.stderr, flush=True)
return False
def main():
global language
try:
print("Let's follow the steps below.\n\n")
# Ensure enough command line arguments are provided
if len(sys.argv) < 3:
print("Usage: python script.py <user_input> <language>", file=sys.stderr, flush=True)
sys.exit(-1)
user_input = sys.argv[1]
language = sys.argv[2]
if not check_git_installed():
sys.exit(-1)
print(
"Step 1/2: Select the files you've changed that you wish to include in this commit, "
"then click 'Submit'.",
end="\n\n",
flush=True,
)
modified_files, staged_files = get_modified_files()
if len(modified_files) == 0:
print("No files to commit.", file=sys.stderr, flush=True)
sys.exit(-1)
selected_files = get_marked_files(modified_files, staged_files)
if not selected_files:
print("No files selected, commit aborted.")
return
rebuild_stage_list(selected_files)
print(
"Step 2/2: Review the commit message I've drafted for you. "
"Edit it below if needed. Then click 'Commit' to proceed with "
"the commit using this message.",
end="\n\n",
flush=True,
)
diff = get_diff()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)
issue = str(get_issue_json(issue_id))
if branch_name:
user_input += "\ncurrent repo branch name is:" + branch_name
commit_message = generate_commit_message_base_diff(user_input, diff, issue)
# TODO
# remove Closes #IssueNumber in commit message
commit_message["content"] = (
commit_message["content"]
.replace("Closes #IssueNumber", "")
.replace("No specific issue to close", "")
.replace("No specific issue mentioned.", "")
)
commit_result = display_commit_message_and_commit(commit_message["content"])
if not commit_result:
print("Commit aborted.", flush=True)
else:
print("Commit completed.", flush=True)
sys.exit(0)
except Exception as err:
print("Exception:", err, file=sys.stderr, flush=True)
sys.exit(-1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,39 @@
Objective:** Generate a commit message that succinctly describes the codebase changes reflected in the provided diff, while incorporating any extra context or guidance from the user.
**Commit Message Structure:**
1. **Title Line:** Choose a type such as `feat`, `fix`, `docs`, `style`, `refactor`, `perf`, `test`, `build`, `ci`, `chore`, and so on, and couple it with a succinct title. Use the format: `type: Title`. Only one title line is permissible.
2. **Summary:** Summarize all adjustments concisely within a maximum of three detailed message lines. Prefix each line with a \"-\".
3. **Closing Reference (Conditional):** Include the line `Closes #IssueNumber` only if a specific, relevant issue number has been mentioned in the user input.
**Response Format:**
```
type: Title
- Detail message line 1
- Detail message line 2
- Detail message line 3
Closes #IssueNumber
```
Only append the \"Closes #IssueNumber\" if the user input explicitly references an issue to close.
**Constraints:**
- Exclude markdown code block indicators (```) and the placeholder \"commit_message\" from your response.
- Follow commit message best practices:
- Limit the title length to 50 characters.
- Limit each summary line to 72 characters.
- If the precise issue number is not known or not stated by the user, do not include the closing reference.
**User Input:** `{__USER_INPUT__}`
Determine if `{__USER_INPUT__}` contains a reference to closing an issue. If so, include the closing reference in the commit message. Otherwise, exclude it.
**Code Changes:**
```
{__DIFF__}
```
Related issue:
{__ISSUE__}
Utilize the provided format to craft a commit message that adheres to the stipulated criteria.

View File

@ -0,0 +1,5 @@
description: '为你选定的代码变更生成格式规范的提交信息,并通过 Git 提交。如需要可包含对应 issue 编号(例如,输入“/commit to close #12”'
hint: to close Issue #issue_number
input: optional
steps:
- run: $devchat_python $command_path/../commit.py "$input" "chinese"

View File

@ -0,0 +1,81 @@
import functools
import sys
import os
from lib.chatmark import TextEditor, Form, Radio, Checkbox
def create_ui_objs(ui_decls, args):
ui_objs = []
editors = []
for i, ui in enumerate(ui_decls):
editor = ui[0](args[i])
if ui[1]:
# this is the title of UI object
editors.append(ui[1])
editors.append(editor)
ui_objs.append(editor)
return ui_objs, editors
def edit_form(uis, args):
ui_objs, editors = create_ui_objs(uis, args)
form = Form(editors)
form.render()
values = []
for obj in ui_objs:
if isinstance(obj, TextEditor):
values.append(obj.new_text)
elif isinstance(obj, Radio):
values.append(obj.selection)
else:
# TODO
pass
return values
def editor(description):
def decorator_edit(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
uis = wrapper.uis[::-1]
return edit_form(uis, args)
if hasattr(func, "uis"):
wrapper.uis = func.uis
else:
wrapper.uis = []
wrapper.uis.append((TextEditor, description))
return wrapper
return decorator_edit
def ui_edit(ui_type, description):
def decorator_edit(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
uis = wrapper.uis[::-1]
return edit_form(uis, args)
if hasattr(func, "uis"):
wrapper.uis = func.uis
else:
wrapper.uis = []
ui_type_class = {
"editor": TextEditor,
"radio": Radio,
"checkbox": Checkbox
}[ui_type]
wrapper.uis.append((ui_type_class, description))
return wrapper
return decorator_edit
def assert_exit(condition, message, exit_code = -1):
if condition:
if exit_code == 0:
print(message, end="\n\n", flush=True)
else:
print(message, end="\n\n", file=sys.stderr, flush=True)
sys.exit(exit_code)

View File

@ -0,0 +1,83 @@
import sys
import json
import os
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import editor # noqa: E402
def read_issue_url():
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "issue_repo" in config_data:
return config_data["issue_repo"]
return ""
def save_issue_url(issue_url):
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# make dirs
os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["issue_repo"] = issue_url
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def read_github_token():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "github_token" in config_data:
return config_data["github_token"]
return ""
def save_github_token(github_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["github_token"] = github_token
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
@editor("Please specify the issue's repository, "
"If the issue is within this repository, no need to specify. "
"Otherwise, format as: username/repository-name")
@editor("Input your github TOKEN to access github api:")
def edit_issue(issue_url, github_token):
pass
def main():
print("start config git settings ...", end="\n\n", flush=True)
issue_url = read_issue_url()
github_token = read_github_token()
issue_url, github_token = edit_issue(issue_url, github_token)
if issue_url:
save_issue_url(issue_url)
if github_token:
save_github_token(github_token)
else:
print("Please specify the github token to access github api.")
sys.exit(0)
print("config git settings successfully.")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,3 @@
description: 'Config required settings for GIT workflows.'
steps:
- run: $devchat_python $command_path/command.py

399
merico/github/git_api.py Normal file
View File

@ -0,0 +1,399 @@
import subprocess
import requests
import time
import json
import os
import sys
from lib.chatmark import TextEditor
def read_github_token():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "github_token" in config_data:
return config_data["github_token"]
# ask user to input github token
server_access_token_editor = TextEditor(
"",
f"Please input your GITHUB access TOKEN to access:"
)
server_access_token_editor.render()
server_access_token = server_access_token_editor.new_text
if not server_access_token:
print("Please input your GITHUB access TOKEN to continue.")
sys.exit(-1)
return server_access_token
GITHUB_ACCESS_TOKEN = read_github_token()
GITHUB_API_URL = "https://api.github.com"
def create_issue(title, body):
headers = {
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
data = {
"title": title,
"body": body,
}
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
response = requests.post(issue_api_url, headers=headers, data=json.dumps(data))
if response.status_code == 201:
print("Issue created successfully!")
return response.json()
else:
print(f"Failed to create issue: {response.content}", file=sys.stderr, end="\n\n")
return None
def update_issue_body(issue_url, issue_body):
"""
Update the body text of a GitHub issue.
:param issue_url: The API URL of the issue to update.
:param issue_body: The new body text for the issue.
"""
headers = {
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
data = {
"body": issue_body,
}
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
api_url = f"{issue_api_url}/{issue_url.split('/')[-1]}"
response = requests.patch(api_url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
print("Issue updated successfully!")
return response.json()
else:
print(f"Failed to update issue: {response.status_code}")
return None
# parse sub tasks in issue body
def parse_sub_tasks(body):
sub_tasks = []
lines = body.split("\n")
for line in lines:
if line.startswith("- ["):
sub_tasks.append(line[2:])
return sub_tasks
def update_sub_tasks(body, tasks):
# remove all existing tasks
lines = body.split("\n")
updated_body = "\n".join(line for line in lines if not line.startswith("- ["))
# add new tasks
updated_body += "\n" + "\n".join(f"- {task}" for task in tasks)
return updated_body
def update_task_issue_url(body, task, issue_url):
# task is like:
#[ ] task name
#[x] task name
# replace task name with issue url, like:
#[ ] [task name](url)
#[x] [task name](url)
if task.find('] ') == -1:
return None
task = task[task.find('] ')+2 : ]
return body.replace(task, f"[{task}]({issue_url})")
def check_git_installed():
"""
Check if Git is installed on the local machine.
Tries to execute 'git --version' command to determine the presence of Git.
Returns:
bool: True if Git is installed, False otherwise.
"""
try:
subprocess.run(
["git", "--version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
return True
except subprocess.CalledProcessError:
print("Git is not installed on your system.")
return False
def create_and_checkout_branch(branch_name):
subprocess.run(["git", "checkout", "-b", branch_name], check=True)
def is_issue_url(task):
issue_url = f"https://github.com/{get_github_repo(True)}/issues"
return task.strip().startswith(issue_url)
def read_issue_by_url(issue_url):
issue_number = issue_url.split("/")[-1]
# Construct the API endpoint URL
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
api_url = f"{issue_api_url}/{issue_number}"
# Send a GET request to the API endpoint
headers = {
"Accept": "application/vnd.github.v3+json",
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
}
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
return response.json()
else:
return None
def get_github_repo(issue_repo=False):
try:
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
if os.path.exists(config_path) and issue_repo:
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "issue_repo" in config_data:
print("current issue repo:", config_data["issue_repo"], end="\n\n", file=sys.stderr, flush=True)
return config_data["issue_repo"]
# 使用git命令获取当前仓库的URL
result = subprocess.check_output(
["git", "remote", "get-url", "origin"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str并提取出仓库信息
repo_url = result.decode("utf-8")
# 假设repo_url的格式为https://github.com/username/repo.git
parts = repo_url.split("/")
repo = parts[-1].replace(".git", "")
username = parts[-2].split(":")[-1]
github_repo = f"{username}/{repo}"
print("current github repo:", github_repo, end="\n\n", file=sys.stderr, flush=True)
return github_repo
except subprocess.CalledProcessError as e:
print(e)
# 如果发生错误,打印错误信息
return None
except FileNotFoundError:
# 如果未找到git命令可能是没有安装git或者不在PATH中
print("==> File not found...")
return None
# 获取当前分支名称
def get_current_branch():
try:
# 使用git命令获取当前分支名称
result = subprocess.check_output(
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
current_branch = result.decode("utf-8")
return current_branch
except subprocess.CalledProcessError:
# 如果发生错误,打印错误信息
return None
except FileNotFoundError:
# 如果未找到git命令可能是没有安装git或者不在PATH中
return None
def get_parent_branch():
current_branch = get_current_branch()
if current_branch is None:
return None
try:
# 使用git命令获取当前分支的父分支引用
result = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", f"{current_branch}@{1}"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
parent_branch_ref = result.decode("utf-8")
print("==>", parent_branch_ref)
if parent_branch_ref == current_branch:
# 如果父分支引用和当前分支相同,说明当前分支可能是基于一个没有父分支的提交创建的
return None
# 使用git命令获取父分支的名称
result = subprocess.check_output(
["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref], stderr=subprocess.STDOUT
).strip()
parent_branch_name = result.decode("utf-8")
return parent_branch_name
except subprocess.CalledProcessError as e:
print(e)
# 如果发生错误,打印错误信息
return None
except FileNotFoundError:
# 如果未找到git命令可能是没有安装git或者不在PATH中
print("==> File not found...")
return None
def get_issue_info(issue_id):
# Construct the API endpoint URL
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
api_url = f"{issue_api_url}/{issue_id}"
# Send a GET request to the API endpoint
headers = {
"Accept": "application/vnd.github.v3+json",
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
}
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
return response.json()
else:
return None
def get_issue_info_by_url(issue_url):
# get issue id from issue_url
def get_issue_id(issue_url):
# Extract the issue id from the issue_url
issue_id = issue_url.split("/")[-1]
return issue_id
return get_issue_info(get_issue_id(issue_url))
# 获取当前分支自从与base_branch分叉以来的历史提交信息
def get_commit_messages(base_branch):
# 找到当前分支与base_branch的分叉点
merge_base = subprocess.run(
["git", "merge-base", "HEAD", base_branch],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# 检查是否成功找到分叉点
if merge_base.returncode != 0:
raise RuntimeError(f"Error finding merge base: {merge_base.stderr.strip()}")
# 获取分叉点的提交哈希
merge_base_commit = merge_base.stdout.strip()
# 获取从分叉点到当前分支的所有提交信息
result = subprocess.run(
["git", "log", f"{merge_base_commit}..HEAD"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# 检查git log命令是否成功执行
if result.returncode != 0:
raise RuntimeError(f"Error retrieving commit messages: {result.stderr.strip()}")
# 返回提交信息列表
return result.stdout
# 创建PR
def create_pull_request(title, body, head, base, repo_name):
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls"
print("url:", url, end="\n\n")
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Content-Type": "application/json"}
payload = {"title": title, "body": body, "head": head, "base": base}
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 201:
return response.json()
print(response.text, end="\n\n", file=sys.stderr)
return None
def run_command_with_retries(command, retries=3, delay=5):
for attempt in range(retries):
try:
subprocess.check_call(command)
return True
except subprocess.CalledProcessError as e:
print(f"Command failed: {e}")
if attempt < retries - 1:
print(f"Retrying... (attempt {attempt + 1}/{retries})")
time.sleep(delay)
else:
print("All retries failed.")
return False
def check_unpushed_commits():
try:
# 获取当前分支的本地提交和远程提交的差异
result = subprocess.check_output(["git", "cherry", "-v"]).decode("utf-8").strip()
# 如果结果不为空说明存在未push的提交
return bool(result)
except subprocess.CalledProcessError as e:
print(f"Error checking for unpushed commits: {e}")
return True
def auto_push():
# 获取当前分支名
if not check_unpushed_commits():
return True
try:
branch = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).strip().decode("utf-8")
except subprocess.CalledProcessError as e:
print(f"Error getting current branch: {e}")
return False
# 检查当前分支是否有对应的远程分支
remote_branch_exists = subprocess.call(["git", "ls-remote", "--exit-code", "origin", branch])
push_command = ["git", "push", "origin", branch]
if remote_branch_exists == 0:
# 如果存在远程分支则直接push提交
return run_command_with_retries(push_command)
else:
# 如果不存在远程分支则发布并push提交
push_command.append("-u")
return run_command_with_retries(push_command)
def get_recently_pr(repo):
url = f"{GITHUB_API_URL}/repos/{repo}/pulls?state=open&sort=updated"
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Accept": "application/vnd.github.v3+json"}
response = requests.get(url, headers=headers)
print("=>:", url)
branch_name = get_current_branch()
if response.status_code == 200:
prs = response.json()
for pr in prs:
if pr['head']['ref'] == branch_name:
return pr
return None
else:
return None
def update_pr(pr_number, title, body, repo_name):
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls/{pr_number}"
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Content-Type": "application/json"}
payload = {"title": title, "body": body}
response = requests.patch(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
print(f"PR updated successfully: {response.json()['html_url']}")
return response.json()
else:
print("Failed to update PR.")
return None

View File

@ -0,0 +1,51 @@
import sys
import os
from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from git_api import create_issue # noqa: E402
from common_util import editor, assert_exit # noqa: E402
# Function to generate issue title and body using LLM
PROMPT = ("Based on the following description, "
"suggest a title and a detailed body for a GitHub issue:\n\n"
"Description: {description}\n\n"
"Output format: {{\"title\": \"<title>\", \"body\": \"<body>\"}} ")
@chat_json(prompt=PROMPT)
def generate_issue_content(description):
pass
@editor("Edit issue title:")
@editor("Edit issue body:")
def edit_issue(title, body):
pass
# Main function
def main():
print("start new_issue ...", end="\n\n", flush=True)
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
description = sys.argv[1]
print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(description=description)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
print("Creating issue ...", end="\n\n", flush=True)
issue = create_issue(issue_title, issue_body)
assert_exit(not issue, "Failed to create issue.", exit_code=-1)
print("New Issue:", issue["html_url"], end="\n\n", flush=True)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new issue.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -0,0 +1,78 @@
import json
import sys
import os
from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import ui_edit, assert_exit # noqa: E402
from git_api import check_git_installed, create_and_checkout_branch, is_issue_url, read_issue_by_url # noqa: E402
# Function to generate a random branch name
PROMPT = (
"Give me 5 different git branch names, "
"mainly hoping to express: {task}, "
"Good branch name should looks like: <type>/<main content>-#<issue id>,"
"<issue id> is optional, add it only when you know the issue id clearly, "
"don't miss '#' before issue id. "
"the final result is output in JSON format, "
'as follows: {{"names":["name1", "name2", .. "name5"]}}\n'
)
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_branch_name(task):
pass
@ui_edit(ui_type="radio", description="Select a branch name")
def select_branch_name_ui(branch_names):
pass
def select_branch_name(branch_names):
[branch_selection] = select_branch_name_ui(branch_names)
assert_exit(branch_selection is None, "No branch selected.", exit_code=0)
return branch_names[branch_selection]
def get_issue_or_task(task):
if is_issue_url(task):
issue = read_issue_by_url(task.strip())
assert_exit(not issue, "Failed to read issue.", exit_code=-1)
return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]})
else:
return task
# Main function
def main():
print("Start create branch ...", end="\n\n", flush=True)
is_git_installed = check_git_installed()
assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1)
task = sys.argv[1]
assert_exit(not task, "You need input something about the new branch, or input a issue url.", exit_code=-1)
# read issue by url
task = get_issue_or_task(task)
# Generate 5 branch names
print("Generating branch names ...", end="\n\n", flush=True)
branch_names = generate_branch_name(task = task)
assert_exit(not branch_names, "Failed to generate branch names.", exit_code=-1)
branch_names = branch_names["names"]
# Select branch name
selected_branch = select_branch_name(branch_names)
# create and checkout branch
print(f"Creating and checking out branch: {selected_branch}")
create_and_checkout_branch(selected_branch)
print("Branch has create and checkout")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new branch based current branch, and checkout new branch.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -0,0 +1,49 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from devchat.llm import chat_json # noqa: E402
from git_api import create_issue # noqa: E402
from common_util import editor, assert_exit # noqa: E402
# Function to generate issue title and body using LLM
PROMPT = ("Based on the following description, "
"suggest a title and a detailed body for a GitHub issue:\n\n"
"Description: {description}\n\n"
"Output as valid JSON format: {{\"title\": \"<title>\", \"body\": \"<body> use \\n as new line flag.\"}} ")
@chat_json(prompt=PROMPT)
def generate_issue_content(description):
pass
@editor("Edit issue title:")
@editor("Edit issue body:")
def edit_issue(title, body):
pass
# Main function
def main():
print("start new_issue ...", end="\n\n", flush=True)
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
description = sys.argv[1]
print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(description=description)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
print("Creating issue ...", end="\n\n", flush=True)
issue = create_issue(issue_title, issue_body)
assert_exit(not issue, "Failed to create issue.", exit_code=-1)
print("New Issue:", issue["html_url"], end="\n\n", flush=True)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new issue.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -0,0 +1,81 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".."))
from devchat.llm import chat_json # noqa: E402
from git_api import create_issue, parse_sub_tasks, update_task_issue_url, update_issue_body, get_issue_info_by_url # noqa: E402
from common_util import editor, assert_exit, ui_edit # noqa: E402
# Function to generate issue title and body using LLM
PROMPT = (
"Following is parent issue content:\n"
"{issue_content}\n\n"
"Based on the following issue task: {task}"
"suggest a title and a detailed body for a GitHub issue:\n\n"
"Output format: {{\"title\": \"<title>\", \"body\": \"<body>\"}} ")
@chat_json(prompt=PROMPT)
def generate_issue_content(issue_content, task):
pass
@editor("Edit issue title:")
@editor("Edit issue body:")
def edit_issue(title, body):
pass
@ui_edit(ui_type="radio", description="Select a task to create issue:")
def select_task(tasks):
pass
def get_issue_json(issue_url):
issue = get_issue_info_by_url(issue_url)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
return {"id": issue["number"], "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
# Main function
def main():
print("start new_issue ...", end="\n\n", flush=True)
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
issue_url = sys.argv[1]
old_issue = get_issue_json(issue_url)
assert_exit(not old_issue, "Failed to retrieve issue with: {issue_url}", exit_code=-1)
tasks = parse_sub_tasks(old_issue["body"])
assert_exit(not tasks, "No tasks in issue body.")
# select task from tasks
[task] = select_task(tasks)
assert_exit(task is None, "No task selected.")
task = tasks[task]
print("task:", task, end="\n\n", flush=True)
print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(issue_content=old_issue, task=task)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
print("Creating issue ...", end="\n\n", flush=True)
issue = create_issue(issue_title, issue_body)
assert_exit(not issue, "Failed to create issue.", exit_code=-1)
print("New Issue:", issue["html_url"], end="\n\n", flush=True)
# update issue task with new issue url
new_body = update_task_issue_url(old_issue["body"], task, issue["html_url"])
assert_exit(not new_body, f"{task} parse error.")
new_issue = update_issue_body(issue_url, new_body)
assert_exit(not new_issue, "Failed to update issue body.")
print("Issue tasks updated successfully!", end="\n\n", flush=True)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new issue.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -0,0 +1,96 @@
import subprocess
import sys
import requests
import json
import os
import time
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from lib.chatmark import TextEditor, Form # noqa: E402
from devchat.llm import chat_json # noqa: E402
from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, is_issue_url, read_issue_by_url
from common_util import assert_exit, ui_edit # noqa: E402
BASH_BRANCH = "main"
# 从分支名称中提取issue id
def extract_issue_id(branch_name):
if "#" in branch_name:
return branch_name.split("#")[-1]
return None
# 使用LLM模型生成PR内容
PROMPT = (
"Create a pull request title and body based on "
"the following issue and commit messages, if there is an "
"issue, close that issue in PR body as <user>/<repo>#issue_id:\n"
"Issue: {issue}\n"
"Commits:\n{commit_messages}\n"
"Other information:\n{user_input}\n\n"
"The response result should format as JSON object as following:\n"
'{{"title": "pr title", "body": "pr body"}}'
)
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_pr_content_llm(issue, commit_message, user_input):
pass
def generate_pr_content(issue, commit_messages, user_input):
response = generate_pr_content_llm(issue = json.dumps(issue), commit_messages = commit_messages, user_input = user_input)
assert_exit(not response, "Failed to generate PR content.", exit_code=-1)
return response.get("title"), response.get("body")
@ui_edit(ui_type="editor", description="Edit PR title:")
@ui_edit(ui_type="editor", description="Edit PR body:")
def edit_pr(title, body):
pass
def get_issue_json(issue_id):
issue = {"id": "no issue id", "title": "", "body": ""}
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
return issue
# 主函数
def main():
print("start new_pr ...", end="\n\n", flush=True)
repo_name = get_github_repo()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)
# print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n")
issue = get_issue_json(issue_id)
commit_messages = get_commit_messages(BASH_BRANCH)
print("generating pr title and body ...", end="\n\n", flush=True)
user_input = sys.argv[1]
pr_title, pr_body = generate_pr_content(issue, commit_messages, user_input)
assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1)
pr_title, pr_body = edit_pr(pr_title, pr_body)
assert_exit(not pr_title, "PR creation cancelled.", exit_code=0)
is_push_success = auto_push()
assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1)
pr = create_pull_request(pr_title, pr_body, branch_name, BASH_BRANCH, repo_name)
assert_exit(not pr, "Failed to create PR.", exit_code=-1)
print(f"PR created successfully: {pr['html_url']}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new PR.'
input: optional
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -0,0 +1,86 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from devchat.llm import chat_json # noqa: E402
from git_api import get_issue_info_by_url, parse_sub_tasks, update_sub_tasks, update_issue_body # noqa: E402
from common_util import editor, assert_exit # noqa: E402
TASKS_PROMPT = (
"Following is my git issue content.\n"
"{issue_data}\n\n"
"Sub task in issue is like:- [ ] task name\n"
"'[ ] task name' will be as sub task content\n\n"
"Following is my idea to update sub tasks:\n"
"{user_input}\n\n"
"Please output all tasks in JSON format as:"
'{{"tasks": ["[ ] task1", "[ ] task2"]}}'
)
@chat_json(prompt=TASKS_PROMPT)
def generate_issue_tasks(issue_data, user_input):
pass
def to_task_str(tasks):
task_str = ""
for task in tasks:
task_str += task + "\n"
return task_str
@editor("Edit issue old tasks:")
@editor("Edit issue new tasks:")
def edit_issue_tasks(old_tasks, new_tasks):
pass
@editor("Input ISSUE url:")
def input_issue_url(url):
pass
@editor("How to update tasks:")
def update_tasks_input(user_input):
pass
def get_issue_json(issue_url):
issue = get_issue_info_by_url(issue_url)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
return {"id": issue["number"], "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
# Main function
def main():
print("start issue tasks update ...", end="\n\n", flush=True)
[issue_url] = input_issue_url("")
assert_exit(not issue_url, "No issue url.")
print("issue url:", issue_url, end="\n\n", flush=True)
issue = get_issue_json(issue_url)
old_tasks = parse_sub_tasks(issue["body"])
print(f"```tasks\n{to_task_str(old_tasks)}\n```", end="\n\n", flush=True)
[user_input] = update_tasks_input("")
assert_exit(not user_input, "No user input")
new_tasks = generate_issue_tasks(issue_data=issue, user_input=user_input)
assert_exit(not new_tasks, "No new tasks.")
print("new_tasks:", new_tasks, end="\n\n", flush=True)
assert_exit(not new_tasks.get("tasks", []), "No new tasks.")
print("new tasks:", to_task_str(new_tasks["tasks"]), end="\n\n", flush=True)
new_tasks = new_tasks["tasks"]
[old_tasks, new_tasks] = edit_issue_tasks(to_task_str(old_tasks), to_task_str(new_tasks))
assert_exit(not new_tasks, "No new tasks.")
print("new tasks:", new_tasks, end="\n\n", flush=True)
new_body = update_sub_tasks(issue["body"], new_tasks.split("\n"))
new_issue = update_issue_body(issue_url, new_body)
assert_exit(not new_issue, "Failed to update issue body.")
print("Issue tasks updated successfully!", end="\n\n", flush=True)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new issue.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

View File

@ -0,0 +1,100 @@
import subprocess
import sys
import requests
import json
import os
import time
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from lib.chatmark import TextEditor, Form # noqa: E402
from devchat.llm import chat_json # noqa: E402
from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, get_recently_pr, is_issue_url, read_issue_by_url, update_pr
from common_util import assert_exit, ui_edit # noqa: E402
from lib.chatmark import TextEditor, Form # noqa: E402
from devchat.llm import chat_completion_no_stream_return_json # noqa: E402
BASH_BRANCH = "main"
# 从分支名称中提取issue id
def extract_issue_id(branch_name):
if "#" in branch_name:
return branch_name.split("#")[-1]
return None
# 使用LLM模型生成PR内容
PROMPT = (
"Create a pull request title and body based on "
"the following issue and commit messages, if there is an "
"issue, close that issue in PR body as <user>/<repo>#issue_id:\n"
"Issue: {issue}\n"
"Commits:\n{commit_messages}\n"
"The response result should format as JSON object as following:\n"
'{{"title": "pr title", "body": "pr body"}}'
)
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_pr_content_llm(issue, commit_messages):
pass
def generate_pr_content(issue, commit_messages):
response = generate_pr_content_llm(issue = json.dumps(issue), commit_messages = commit_messages)
assert_exit(not response, "Failed to generate PR content.", exit_code=-1)
return response.get("title"), response.get("body")
@ui_edit(ui_type="editor", description="Edit PR title:")
@ui_edit(ui_type="editor", description="Edit PR body:")
def edit_pr(title, body):
pass
def get_issue_json(issue_id):
issue = {"id": "no issue id", "title": "", "body": ""}
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
return issue
# 主函数
def main():
print("start update_pr ...", end="\n\n", flush=True)
repo_name = get_github_repo()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)
# print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n")
issue = get_issue_json(issue_id)
commit_messages = get_commit_messages(BASH_BRANCH)
recent_pr = get_recently_pr(repo_name)
assert_exit(not recent_pr, "Failed to get recent PR.", exit_code=-1)
print("generating pr title and body ...", end="\n\n", flush=True)
pr_title, pr_body = generate_pr_content(issue, commit_messages)
assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1)
pr_title, pr_body = edit_pr(pr_title, pr_body)
assert_exit(not pr_title, "PR creation cancelled.", exit_code=0)
is_push_success = auto_push()
assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1)
pr = update_pr(recent_pr['number'], pr_title, pr_body, repo_name)
assert_exit(not pr, "Failed to update PR.", exit_code=-1)
print(f"PR updated successfully: {pr['html_url']}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
description: 'Create new PR.'
input: required
steps:
- run: $devchat_python $command_path/command.py "$input"

168
merico/pr/command.py Normal file
View File

@ -0,0 +1,168 @@
"""
/pr.describe https://github.com/devchat-ai/devchat-vscode/pull/25
"""
import os
import sys
import logging
import json
import argparse
import asyncio
from lib.ide_service import IDEService
# add the current directory to the path
from os.path import dirname, abspath
sys.path.append(dirname(dirname(abspath(__file__))))
# add new model configs to algo.MAX_TOKENS
import pr_agent.algo as algo
algo.MAX_TOKENS["gpt-4-turbo-preview"] = 128000
algo.MAX_TOKENS["claude-3-opus"] = 100000
algo.MAX_TOKENS["claude-3-sonnet"] = 100000
algo.MAX_TOKENS["claude-3-haiku"] = 100000
algo.MAX_TOKENS["ERNIE-Bot-4.0"] = 8000
algo.MAX_TOKENS["GLM-4"] = 8000
algo.MAX_TOKENS["hzwxai/Mixtral-8x7B-Instruct-v0.1-GPTQ"] = 16000
algo.MAX_TOKENS["minimax/abab6-chat"] = 8000
algo.MAX_TOKENS["xinghuo-3.5"] = 8000
algo.MAX_TOKENS["llama-2-70b-chat"] = 4000
algo.MAX_TOKENS["togetherai/codellama/CodeLlama-70b-Instruct-hf"] = 4000
algo.MAX_TOKENS["togetherai/mistralai/Mixtral-8x7B-Instruct-v0.1"] = 16000
algo.MAX_TOKENS["text-embedding-ada-002"] = 8000
algo.MAX_TOKENS["text-embedding-3-small"] = 8000
algo.MAX_TOKENS["text-embedding-3-large"] = 8000
algo.MAX_TOKENS["embedding-v1"] = 8000
algo.MAX_TOKENS["embedding-2"] = 8000
algo.MAX_TOKENS["togethercomputer/m2-bert-80M-2k-retrieval"] = 2048
algo.MAX_TOKENS["togethercomputer/m2-bert-80M-8k-retrieval"] = 8192
algo.MAX_TOKENS["togethercomputer/m2-bert-80M-32k-retrieval"] = 32768
algo.MAX_TOKENS["WhereIsAI/UAE-Large-V1"] = 512
algo.MAX_TOKENS["BAAI/bge-large-en-v1.5"] = 512
algo.MAX_TOKENS["BAAI/bge-base-en-v1.5"] = 512
algo.MAX_TOKENS["sentence-transformers/msmarco-bert-base-dot-v5"] = 512
algo.MAX_TOKENS["bert-base-uncased"] = 512
if os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106") not in algo.MAX_TOKENS:
current_model = os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")
IDEService().ide_logging("info", f"{current_model}'s max tokens is not config, we use it as default 16000")
algo.MAX_TOKENS[os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")] = 16000
# add new git provider
def get_git_provider():
from pr_agent.config_loader import get_settings
_git_provider_old_ = get_settings().config.git_provider
get_settings().config.git_provider = "devchat"
provider = _get_git_provider_old()
get_settings().config.git_provider = _git_provider_old_
return provider
import pr_agent.git_providers as git_providers
from providers.devchat_provider import DevChatProvider
git_providers._GIT_PROVIDERS['devchat'] = DevChatProvider
_get_git_provider_old = git_providers.get_git_provider
git_providers.get_git_provider = get_git_provider
from pr_agent.config_loader import get_settings
from pr_agent.cli import run
# mock logging method, to redirect log to IDE
from pr_agent.log import setup_logger, inv_analytics_filter
from lib.ide_service import IDEService
class CustomOutput:
def __init__(self):
pass
def write(self, message):
IDEService().ide_logging("info", message.strip())
def flush(self):
pass
def close(self):
pass
log_level = os.environ.get("LOG_LEVEL", "INFO")
logger = setup_logger(log_level)
logger.remove(None)
logger.add(CustomOutput(), level=logging.INFO, format="{message}", colorize=False, filter=inv_analytics_filter)
from config_util import read_server_access_token_with_input, get_repo_type
from custom_suggestions_config import get_custom_suggestions_system_prompt
# set openai key and api base
get_settings().set("OPENAI.KEY", os.environ.get("OPENAI_API_KEY", ""))
get_settings().set("OPENAI.API_BASE", os.environ.get("OPENAI_API_BASE", ""))
get_settings().set("LLM.CUSTOM_LLM_PROVIDER", "openai")
# set github token
access_token = read_server_access_token_with_input(sys.argv[1])
if not access_token:
print("Command has been canceled.", flush=True)
sys.exit(0)
repo_type = get_repo_type(sys.argv[1])
if repo_type == "github":
get_settings().set("GITHUB.USER_TOKEN", access_token)
elif repo_type == "gitlab":
get_settings().set("GITLAB.PERSONAL_ACCESS_TOKEN", access_token)
else:
print("Unsupported git hosting service, input pr url is:", sys.argv[1], file=sys.stderr, flush=True)
sys.exit(1)
# USER TOKEN
# set git provider, default is devchat
# in devchat provider, we will create actual repo provider
# get_settings().set("CONFIG.GIT_PROVIDER", "devchat")
# set model
get_settings().set("CONFIG.MODEL", os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106"))
get_settings().set("CONFIG.MODEL_TURBO", os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106"))
get_settings().set("CONFIG.FALLBACK_MODELS", [os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")])
# disable help text as default config
get_settings().set("PR_REVIEWER.ENABLE_HELP_TEXT", False)
get_settings().set("PR_DESCRIPTION.ENABLE_HELP_TEXT", False)
get_settings().set("PR_DESCRIPTION.ENABLE_HELP_COMMENT", False)
get_settings().set("PR_CODE_SUGGESTIONS.ENABLE_HELP_TEXT", False)
get_settings().set("PR_TEST.ENABLE_HELP_TEXT", False)
get_settings().set("CHECKS.ENABLE_HELP_TEXT", False)
# get_settings().set("PR_CODE_SUGGESTIONS.SUMMARIZE", False)
# handle custom suggestions command
if sys.argv[2] == "custom_suggestions":
get_settings().pr_code_suggestions_prompt.system = get_custom_suggestions_system_prompt()
sys.argv[2] = "improve"
# get current language config
language = IDEService().ide_language()
language_prompt = "\n\n输出内容使用中文输出。\n" if language == "zh" else ""
get_settings().pr_code_suggestions_prompt.system += language_prompt
get_settings().pr_review_prompt.system += language_prompt
get_settings().pr_description_prompt.system += language_prompt
#get_settings().pr_reviewer.inline_code_comments = True
# config for find similar issues
get_settings().set("PR_SIMILAR_ISSUE.VECTORDB", "lancedb")
get_settings().set("LANCEDB.URI", "data/lancedb")
# set git provider type, devchat provider will create actual repo provider based on this type
pr_provider_type = get_repo_type(sys.argv[1])
if not pr_provider_type:
print("Unsupported git hosting service, input pr url is:", sys.argv[1], file=sys.stderr, flush=True)
sys.exit(1)
get_settings().set("CONFIG.GIT_PROVIDER", pr_provider_type)
os.environ['CONFIG.GIT_PROVIDER_TYPE'] = pr_provider_type
# os.environ['ENABLE_PUBLISH_LABELS'] = "1"
if __name__ == '__main__':
sys.argv = [
sys.executable,
'--pr_url',
sys.argv[1].strip(),
sys.argv[2].strip()
]
run()

5
merico/pr/command.yml Normal file
View File

@ -0,0 +1,5 @@
description: "pr command"
workflow_python:
env_name: devchat-pr-env4
version: 3.11.0
dependencies: requirements.txt

105
merico/pr/config_util.py Normal file
View File

@ -0,0 +1,105 @@
import os
import json
import sys
from lib.chatmark import TextEditor
# 根据PR URL获取不同的仓库管理类型
# 支持的类型有github gitlab bitbucket bitbucket_server azure codecommit gerrit
def get_repo_type(url):
# 根据URL的特征判断仓库管理类型
if "github.com" in url:
return "github"
elif "gitlab.com" in url or "/gitlab/" in url:
return "gitlab"
elif "bitbucket.org" in url:
return "bitbucket"
elif "bitbucket-server" in url:
return "bitbucket_server"
elif "dev.azure.com" in url or "visualstudio.com" in url:
return "azure"
elif "codecommit" in url:
return "codecommit"
elif "gerrit" in url:
return "gerrit"
else:
return ""
def read_github_token():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "github_token" in config_data:
return config_data["github_token"]
return ""
def read_server_access_token(repo_type):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if repo_type in config_data and "access_token" in config_data[repo_type]:
return config_data[repo_type]["access_token"]
return ""
def save_github_token(github_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["github_token"] = github_token
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def save_server_access_token(repo_type, access_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if repo_type not in config_data:
config_data[repo_type] = {}
config_data[repo_type]["access_token"] = access_token
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def read_github_token_with_input():
github_token = read_github_token()
if not github_token:
# Input your github TOKEN to access github api:
github_token_editor = TextEditor(
"",
"Please input your github TOKEN to access:"
)
github_token = github_token_editor.new_text
if not github_token:
return github_token
save_github_token(github_token)
return github_token
def read_server_access_token_with_input(pr_url):
repo_type = get_repo_type(pr_url)
if not repo_type:
return ""
server_access_token = read_server_access_token(repo_type)
if not server_access_token:
# Input your server access TOKEN to access server api:
server_access_token_editor = TextEditor(
"",
f"Please input your {repo_type} access TOKEN to access:"
)
server_access_token_editor.render()
server_access_token = server_access_token_editor.new_text
if not server_access_token:
return server_access_token
save_server_access_token(repo_type, server_access_token)
return server_access_token

View File

@ -0,0 +1,6 @@
# pr.improve
**/pr.improve命令用于生成PR的代码建议。**
使用方式为:/pr.improve <PR_URL>, 例如:
/pr.improve https://github.com/devchat-ai/devchat/pull/301

View File

@ -0,0 +1,4 @@
description: "review pr"
input: required
steps:
- run: $workflow_python $command_path/../command.py "$input" custom_suggestions

View File

@ -0,0 +1,48 @@
import os
import json
from lib.chatmark import TextEditor
def read_custom_suggestions():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "custom_suggestions" in config_data:
return config_data["custom_suggestions"]
return ""
def save_custom_suggestions(custom_suggestions):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["custom_suggestions"] = custom_suggestions
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def config_custom_suggestions_with():
custom_suggestions = read_custom_suggestions()
if not custom_suggestions:
custom_suggestions = "- make sure the code is efficient\n"
# Input your github TOKEN to access github api:
custom_suggestions_editor = TextEditor(
custom_suggestions,
"Please input your custom suggestions:"
)
custom_suggestions_editor.render()
custom_suggestions = custom_suggestions_editor.new_text
if not custom_suggestions:
return
save_custom_suggestions(custom_suggestions)
if __name__ == "__main__":
config_custom_suggestions_with()

View File

@ -0,0 +1,3 @@
description: "edit custom suggestions"
steps:
- run: $devchat_python $command_path/command.py

View File

@ -0,0 +1,4 @@
{
"sha": "91da857973f5684d0956fa6e623f14edc99adad5504632d6ee4ddac6ae501761",
"command_python": "/Users/admin/miniconda3/envs/pr_test2/bin/python"
}

View File

@ -0,0 +1,154 @@
import os
import json
import sys
from lib.chatmark import TextEditor
def read_custom_suggestions():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "custom_suggestions" in config_data:
return config_data["custom_suggestions"]
return ""
def save_custom_suggestions(custom_suggestions):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
config_data["custom_suggestions"] = custom_suggestions
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
def read_custom_suggestions_with_input():
custom_suggestions = read_custom_suggestions()
if not custom_suggestions:
# Input your github TOKEN to access github api:
custom_suggestions_editor = TextEditor(
"- make sure the code is efficient\n",
"Please input your custom suggestions:"
)
custom_suggestions_editor.render()
custom_suggestions = custom_suggestions_editor.new_text
if not custom_suggestions:
return custom_suggestions
save_custom_suggestions(custom_suggestions)
return custom_suggestions
def get_custom_suggestions_system_prompt():
custom_suggestions = read_custom_suggestions_with_input()
if not custom_suggestions:
print("Command has been canceled.", flush=True)
sys.exit(0)
system_prompt = """You are PR-Reviewer, a language model that specializes in suggesting ways to improve for a Pull Request (PR) code.
Your task is to provide meaningful and actionable code suggestions, to improve the new code presented in a PR diff.
The format we will use to present the PR code diff:
======
## file: 'src/file1.py'
@@ ... @@ def func1():
__new hunk__
12 code line1 that remained unchanged in the PR
13 +new hunk code line2 added in the PR
14 code line3 that remained unchanged in the PR
__old hunk__
code line1 that remained unchanged in the PR
-old hunk code line2 that was removed in the PR
code line3 that remained unchanged in the PR
@@ ... @@ def func2():
__new hunk__
...
__old hunk__
...
## file: 'src/file2.py'
...
======
- In this format, we separated each hunk of code to '__new hunk__' and '__old hunk__' sections. The '__new hunk__' section contains the new code of the chunk, and the '__old hunk__' section contains the old code that was removed.
- Code lines are prefixed symbols ('+', '-', ' '). The '+' symbol indicates new code added in the PR, the '-' symbol indicates code removed in the PR, and the ' ' symbol indicates unchanged code.
- We also added line numbers for the '__new hunk__' sections, to help you refer to the code lines in your suggestions. These line numbers are not part of the actual code, and are only used for reference.
Specific instructions for generating code suggestions:
- Provide up to {{ num_code_suggestions }} code suggestions. The suggestions should be diverse and insightful.
- The suggestions should focus on ways to improve the new code in the PR, meaning focusing on lines from '__new hunk__' sections, starting with '+'. Use the '__old hunk__' sections to understand the context of the code changes.
- Prioritize suggestions that address possible issues, major problems, and bugs in the PR code.
- Don't suggest to add docstring, type hints, or comments, or to remove unused imports.
- Suggestions should not repeat code already present in the '__new hunk__' sections.
- Provide the exact line numbers range (inclusive) for each suggestion. Use the line numbers from the '__new hunk__' sections.
- When quoting variables or names from the code, use backticks (`) instead of single quote (').
- Take into account that you are reviewing a PR code diff, and that the entire codebase is not available for you as context. Hence, avoid suggestions that might conflict with unseen parts of the codebase.
Instructions from the user, that should be taken into account with high priority:
""" + custom_suggestions + """
{%- if extra_instructions %}
Extra instructions from the user, that should be taken into account with high priority:
======
{{ extra_instructions }}
======
{%- endif %}
The output must be a YAML object equivalent to type $PRCodeSuggestions, according to the following Pydantic definitions:
=====
class CodeSuggestion(BaseModel):
relevant_file: str = Field(description="the relevant file full path")
language: str = Field(description="the code language of the relevant file")
suggestion_content: str = Field(description="an actionable suggestion for meaningfully improving the new code introduced in the PR")
existing_code: str = Field(description="a short code snippet from a '__new hunk__' section to illustrate the relevant existing code. Don't show the line numbers.")
improved_code: str = Field(description="a short code snippet to illustrate the improved code, after applying the suggestion.")
one_sentence_summary:str = Field(description="a short summary of the suggestion action, in a single sentence. Focus on the 'what'. Be general, and avoid method or variable names.")
relevant_lines_start: int = Field(description="The relevant line number, from a '__new hunk__' section, where the suggestion starts (inclusive). Should be derived from the hunk line numbers, and correspond to the 'existing code' snippet above")
relevant_lines_end: int = Field(description="The relevant line number, from a '__new hunk__' section, where the suggestion ends (inclusive). Should be derived from the hunk line numbers, and correspond to the 'existing code' snippet above")
label: str = Field(description="a single label for the suggestion, to help the user understand the suggestion type. For example: 'security', 'possible bug', 'possible issue', 'performance', 'enhancement', 'best practice', 'maintainability', etc. Other labels are also allowed")
class PRCodeSuggestions(BaseModel):
code_suggestions: List[CodeSuggestion]
=====
Example output:
```yaml
code_suggestions:
- relevant_file: |
src/file1.py
language: |
python
suggestion_content: |
...
existing_code: |
...
improved_code: |
...
one_sentence_summary: |
...
relevant_lines_start: 12
relevant_lines_end: 13
label: |
...
```
Each YAML output MUST be after a newline, indented, with block scalar indicator ('|').
"""
return system_prompt

View File

@ -0,0 +1,6 @@
# pr.describe
**/pr.describe命令用于生成PR描述。**
使用方式为:/pr.describe <PR_URL>, 例如:
/pr.describe https://github.com/devchat-ai/devchat/pull/301

View File

View File

@ -0,0 +1,4 @@
description: "review pr"
input: required
steps:
- run: $workflow_python $command_path/../command.py "$input" describe

View File

@ -0,0 +1,4 @@
{
"sha": "91da857973f5684d0956fa6e623f14edc99adad5504632d6ee4ddac6ae501761",
"command_python": "/Users/admin/miniconda3/envs/pr_test2/bin/python"
}

View File

@ -0,0 +1,6 @@
# pr.improve
**/pr.improve命令用于生成PR的代码建议。**
使用方式为:/pr.improve <PR_URL>, 例如:
/pr.improve https://github.com/devchat-ai/devchat/pull/301

View File

@ -0,0 +1,4 @@
description: "review pr"
input: required
steps:
- run: $workflow_python $command_path/../command.py "$input" improve

View File

@ -0,0 +1,4 @@
{
"sha": "91da857973f5684d0956fa6e623f14edc99adad5504632d6ee4ddac6ae501761",
"command_python": "/Users/admin/miniconda3/envs/pr_test2/bin/python"
}

View File

View File

@ -0,0 +1,253 @@
import os
import sys
import json
from typing import Optional, Tuple
from pr_agent.git_providers.git_provider import GitProvider, IncrementalPR
from pr_agent.git_providers.github_provider import GithubProvider
import pr_agent.git_providers as git_providers
from lib.chatmark import Form, TextEditor, Button
class DevChatProvider(GitProvider):
def __init__(self, pr_url: Optional[str] = None, incremental=IncrementalPR(False)):
# 根据某个状态创建正确的GitProvider
provider_type = os.environ.get('CONFIG.GIT_PROVIDER_TYPE')
self.provider: GitProvider = git_providers._GIT_PROVIDERS[provider_type](pr_url, incremental)
@property
def pr(self):
return self.provider.pr
@property
def diff_files(self):
return self.provider.diff_files
@property
def github_client(self):
return self.provider.github_client
def is_supported(self, capability: str) -> bool:
return self.provider.is_supported(capability)
def get_diff_files(self):
return self.provider.get_diff_files()
def need_edit(self):
button = Button(
[
"Commit",
"Edit"
],
)
button.render()
return 1 == button.clicked
def publish_description(self, pr_title: str, pr_body: str):
# Preview pr title and body
print(f"\n\nPR Title: {pr_title}", end="\n\n", flush=True)
print("PR Body:", end="\n\n", flush=True)
print(pr_body, end="\n\n", flush=True)
# Need Edit?
if self.need_edit():
# Edit pr title and body
title_editor = TextEditor(pr_title)
body_editor = TextEditor(pr_body)
form = Form(['Edit pr title:', title_editor, 'Edit pr body:', body_editor])
form.render()
pr_title = title_editor.new_text
pr_body = body_editor.new_text
if not pr_title or not pr_body:
print("Title or body is empty, please fill in the title and body.")
sys.exit(0)
return self.provider.publish_description(pr_title, pr_body)
def publish_code_suggestions(self, code_suggestions: list) -> bool:
code_suggestions_json_str = json.dumps(code_suggestions, indent=4)
code_suggestions_editor = TextEditor(
code_suggestions_json_str,
"Edit code suggestions in JSON format:"
)
code_suggestions_editor.render()
code_suggestions_json_new = code_suggestions_editor.new_text
if not code_suggestions_json_new:
print("Code suggestions are empty, please fill in the code suggestions.")
sys.exit(0)
code_suggestions = json.loads(code_suggestions_json_new)
return self.provider.publish_code_suggestions(code_suggestions)
def get_languages(self):
return self.provider.get_languages()
def get_pr_branch(self):
return self.provider.get_pr_branch()
def get_files(self):
return self.provider.get_files()
def get_user_id(self):
return self.provider.get_user_id()
def get_pr_description_full(self) -> str:
return self.provider.get_pr_description_full()
def edit_comment(self, comment, body: str):
if body.find("## PR Code Suggestions") == -1:
return self.provider.edit_comment(comment, body)
print(f"\n\n{body}", end="\n\n", flush=True)
if self.need_edit():
comment_editor = TextEditor(
body,
"Edit Comment:"
)
comment_editor.render()
body = comment_editor.new_text
if not body:
print("Comment is empty, please fill in the comment.")
sys.exit(0)
return self.provider.edit_comment(comment, body)
def reply_to_comment_from_comment_id(self, comment_id: int, body: str):
return self.provider.reply_to_comment_from_comment_id(comment_id, body)
def get_pr_description(self, *, full: bool = True) -> str:
return self.provider.get_pr_description(full=full)
def get_user_description(self) -> str:
return self.provider.get_user_description()
def _possible_headers(self):
return self.provider._possible_headers()
def _is_generated_by_pr_agent(self, description_lowercase: str) -> bool:
return self.provider._is_generated_by_pr_agent(description_lowercase)
def get_repo_settings(self):
return self.provider.get_repo_settings()
def get_pr_id(self):
return self.provider.get_pr_id()
def get_line_link(self, relevant_file: str, relevant_line_start: int, relevant_line_end: int = None) -> str:
return self.provider.get_line_link(relevant_file, relevant_line_start, relevant_line_end)
#### comments operations ####
def publish_comment(self, pr_comment: str, is_temporary: bool = False):
if is_temporary:
return None
if pr_comment.find("## Generating PR code suggestions") != -1:
return None
if (not is_temporary and \
pr_comment.find("## Generating PR code suggestions") == -1 and \
pr_comment.find("**[PR Description]") == -1):
print(f"\n\n{pr_comment}", end="\n\n", flush=True)
if self.need_edit():
pr_comment_editor = TextEditor(
pr_comment
)
form = Form(['Edit pr comment:', pr_comment_editor])
form.render()
pr_comment = pr_comment_editor.new_text
if not pr_comment:
print("Comment is empty, please fill in the comment.")
sys.exit(0)
return self.provider.publish_comment(pr_comment, is_temporary=is_temporary)
def publish_persistent_comment(self, pr_comment: str,
initial_header: str,
update_header: bool = True,
name='review',
final_update_message=True):
print(f"\n\n{initial_header}", end="\n\n", flush=True)
print(pr_comment, end="\n\n", flush=True)
if self.need_edit():
pr_comment_editor = TextEditor(
pr_comment
)
form = Form(['Edit pr comment:', pr_comment_editor])
form.render()
pr_comment = pr_comment_editor.new_text
if not pr_comment:
print("Comment is empty, please fill in the comment.")
sys.exit(0)
return self.provider.publish_persistent_comment(pr_comment, initial_header, update_header, name, final_update_message)
def publish_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str):
return self.provider.publish_inline_comment(body, relevant_file, relevant_line_in_file)
def create_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str,
absolute_position: int = None):
return self.provider.create_inline_comment(body, relevant_file, relevant_line_in_file, absolute_position)
def publish_inline_comments(self, comments: list[dict]):
return self.provider.publish_inline_comments(comments)
def remove_initial_comment(self):
return self.provider.remove_initial_comment()
def remove_comment(self, comment):
return self.provider.remove_comment(comment)
def get_issue_comments(self):
return self.provider.get_issue_comments()
def get_comment_url(self, comment) -> str:
return self.provider.get_comment_url(comment)
#### labels operations ####
def publish_labels(self, labels):
if not os.environ.get('ENABLE_PUBLISH_LABELS', None):
return None
return self.provider.publish_labels(labels)
def get_pr_labels(self, update=False):
return self.provider.get_pr_labels(update=update)
def get_repo_labels(self):
return self.provider.get_repo_labels()
def add_eyes_reaction(self, issue_comment_id: int, disable_eyes: bool = False) -> Optional[int]:
return self.provider.add_eyes_reaction(issue_comment_id, disable_eyes=disable_eyes)
def remove_reaction(self, issue_comment_id: int, reaction_id: int) -> bool:
return self.provider.remove_reaction(issue_comment_id, reaction_id)
#### commits operations ####
def get_commit_messages(self):
return self.provider.get_commit_messages()
def get_pr_url(self) -> str:
return self.provider.get_pr_url()
def get_latest_commit_url(self) -> str:
return self.provider.get_latest_commit_url()
def auto_approve(self) -> bool:
return self.provider.auto_approve()
def calc_pr_statistics(self, pull_request_data: dict):
return self.provider.calc_pr_statistics(pull_request_data)
def get_num_of_files(self):
return self.provider.get_num_of_files()
@staticmethod
def _parse_issue_url(issue_url: str) -> Tuple[str, int]:
return GithubProvider._parse_issue_url(issue_url)

View File

@ -0,0 +1,2 @@
git+https://gitee.com/imlaji/pr-agent.git@main
git+https://gitee.com/devchat-ai/devchat.git@main

View File

@ -0,0 +1,6 @@
# pr.review
**/pr.review命令用于生成PR代码评审描述。**
使用方式为:/pr.review <PR_URL>, 例如:
/pr.review https://github.com/devchat-ai/devchat/pull/301

View File

@ -0,0 +1,4 @@
description: "review pr"
input: required
steps:
- run: $workflow_python $command_path/../command.py "$input" review

View File

@ -0,0 +1,4 @@
{
"sha": "91da857973f5684d0956fa6e623f14edc99adad5504632d6ee4ddac6ae501761",
"command_python": "/Users/admin/miniconda3/envs/pr_test2/bin/python"
}