Merge pull request #109 from devchat-ai/fix-format

Fix format issues in /pr & /github
This commit is contained in:
boob.yang 2024-05-19 07:51:19 +00:00 committed by GitHub
commit ccf6ba3d1a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 417 additions and 268 deletions

View File

@ -8,15 +8,16 @@ setup-dev:
@pip install -r requirements-dev.txt @pip install -r requirements-dev.txt
@echo "Done!" @echo "Done!"
T="."
check: check:
@echo ${div} @echo ${div}
ruff check . ruff check $(T)
ruff format . --check ruff format $(T) --check
@echo "Done!" @echo "Done!"
fix: fix:
@echo ${div} @echo ${div}
ruff format . ruff format $(T)
@echo ${div} @echo ${div}
ruff check . --fix ruff check $(T) --fix
@echo "Done!" @echo "Done!"

View File

@ -1,25 +1,36 @@
import json import json
import sys
import os import os
import sys
from devchat.llm import chat_json from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import ui_edit, assert_exit # noqa: E402 from common_util import assert_exit, ui_edit # noqa: E402
from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, is_issue_url, read_issue_by_url from git_api import ( # noqa: E402
check_git_installed,
get_current_branch,
get_github_repo,
get_issue_info,
is_issue_url,
read_issue_by_url,
)
def extract_issue_id(branch_name): def extract_issue_id(branch_name):
if "#" in branch_name: if "#" in branch_name:
return branch_name.split("#")[-1] return branch_name.split("#")[-1]
return None return None
# Function to generate a random branch name # Function to generate a random branch name
PROMPT = ( PROMPT = (
"You are a coding engineer, required to summarize the ISSUE description into a coding task description of no more than 50 words. \n" "You are a coding engineer, required to summarize the ISSUE description into a coding task description of no more than 50 words. \n" # noqa: E501
"The ISSUE description is as follows: {issue_body}, please summarize the corresponding coding task description.\n" "The ISSUE description is as follows: {issue_body}, please summarize the corresponding coding task description.\n" # noqa: E501
"The coding task description should be output in JSON format, in the form of: {{\"summary\": \"code task summary\"}}\n" 'The coding task description should be output in JSON format, in the form of: {{"summary": "code task summary"}}\n' # noqa: E501
) )
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview") @chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_code_task_summary(issue_body): def generate_code_task_summary(issue_body):
pass pass
@ -34,45 +45,56 @@ def get_issue_or_task(task):
if is_issue_url(task): if is_issue_url(task):
issue = read_issue_by_url(task.strip()) issue = read_issue_by_url(task.strip())
assert_exit(not issue, "Failed to read issue.", exit_code=-1) assert_exit(not issue, "Failed to read issue.", exit_code=-1)
return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]}) return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]})
else: else:
return task return task
def get_issue_json(issue_id, task): def get_issue_json(issue_id, task):
issue = {"id": "no issue id", "title": "", "body": task} issue = {"id": "no issue id", "title": "", "body": task}
if issue_id: if issue_id:
issue = get_issue_info(issue_id) issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1) assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]} issue = {
"id": issue_id,
"html_url": issue["html_url"],
"title": issue["title"],
"body": issue["body"],
}
return issue return issue
# Main function # Main function
def main(): def main():
print("Start update code task summary ...", end="\n\n", flush=True) print("Start update code task summary ...", end="\n\n", flush=True)
is_git_installed = check_git_installed() is_git_installed = check_git_installed()
assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1) assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1)
task = sys.argv[1] task = sys.argv[1]
repo_name = get_github_repo() repo_name = get_github_repo()
branch_name = get_current_branch() branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name) issue_id = extract_issue_id(branch_name)
# print basic info, repo_name, branch_name, issue_id # print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n") print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n") print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n") print("issue id:", issue_id, end="\n\n")
issue = get_issue_json(issue_id, task) issue = get_issue_json(issue_id, task)
assert_exit(not issue["body"], "Failed to retrieve issue with ID: {issue_id}", exit_code=-1) assert_exit(not issue["body"], "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
# Generate 5 branch names # Generate 5 branch names
print("Generating code task summary ...", end="\n\n", flush=True) print("Generating code task summary ...", end="\n\n", flush=True)
code_task_summary = generate_code_task_summary(issue_body = issue["body"]) code_task_summary = generate_code_task_summary(issue_body=issue["body"])
assert_exit(not code_task_summary, "Failed to generate code task summary.", exit_code=-1) assert_exit(not code_task_summary, "Failed to generate code task summary.", exit_code=-1)
assert_exit(not code_task_summary.get("summary", None), "Failed to generate code task summary, missing summary field in result.", exit_code=-1) assert_exit(
not code_task_summary.get("summary", None),
"Failed to generate code task summary, missing summary field in result.",
exit_code=-1,
)
code_task_summary = code_task_summary["summary"] code_task_summary = code_task_summary["summary"]
# Select branch name # Select branch name
@ -81,7 +103,7 @@ def main():
code_task_summary = code_task_summary[0] code_task_summary = code_task_summary[0]
# create and checkout branch # create and checkout branch
print(f"Updating code task summary to config:") print("Updating code task summary to config:")
config_file = os.path.join(".chat", "complete.config") config_file = os.path.join(".chat", "complete.config")
if os.path.exists(config_file): if os.path.exists(config_file):
with open(config_file, "r") as f: with open(config_file, "r") as f:
@ -93,6 +115,6 @@ def main():
json.dump(config, f, indent=4) json.dump(config, f, indent=4)
print("Code task summary has updated") print("Code task summary has updated")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -4,16 +4,16 @@ import re
import subprocess import subprocess
import sys import sys
from devchat.llm import chat_completion_stream
from lib.chatmark import Checkbox, Form, TextEditor from lib.chatmark import Checkbox, Form, TextEditor
from lib.ide_service import IDEService from lib.ide_service import IDEService
from devchat.llm import chat_completion_stream
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import assert_exit # noqa: E402 from common_util import assert_exit # noqa: E402
from git_api import get_issue_info from git_api import get_issue_info
diff_too_large_message_en = ( diff_too_large_message_en = (
"Commit failed. The modified content is too long " "Commit failed. The modified content is too long "
"and exceeds the model's length limit. " "and exceeds the model's length limit. "
@ -264,9 +264,11 @@ def generate_commit_message_base_diff(user_input, diff, issue):
""" """
global language global language
language_prompt = "You must response commit message in chinese。\n" if language == "zh" else "" language_prompt = "You must response commit message in chinese。\n" if language == "zh" else ""
prompt = PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT.replace("{__DIFF__}", f"{diff}").replace( prompt = (
"{__USER_INPUT__}", f"{user_input + language_prompt}" PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT.replace("{__DIFF__}", f"{diff}")
).replace("{__ISSUE__}", f"{issue}") .replace("{__USER_INPUT__}", f"{user_input + language_prompt}")
.replace("{__ISSUE__}", f"{issue}")
)
model_token_limit_error = ( model_token_limit_error = (
diff_too_large_message_en if language == "en" else diff_too_large_message_zh diff_too_large_message_en if language == "en" else diff_too_large_message_zh
@ -322,7 +324,12 @@ def get_issue_json(issue_id):
if issue_id: if issue_id:
issue = get_issue_info(issue_id) issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1) assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]} issue = {
"id": issue_id,
"html_url": issue["html_url"],
"title": issue["title"],
"body": issue["body"],
}
return issue return issue
@ -392,8 +399,6 @@ def main():
if branch_name: if branch_name:
user_input += "\ncurrent repo branch name is:" + branch_name user_input += "\ncurrent repo branch name is:" + branch_name
commit_message = generate_commit_message_base_diff(user_input, diff, issue) commit_message = generate_commit_message_base_diff(user_input, diff, issue)
# TODO # TODO
# remove Closes #IssueNumber in commit message # remove Closes #IssueNumber in commit message

View File

@ -1,10 +1,7 @@
import functools import functools
import sys import sys
import os
from lib.chatmark import Checkbox, Form, Radio, TextEditor
from lib.chatmark import TextEditor, Form, Radio, Checkbox
def create_ui_objs(ui_decls, args): def create_ui_objs(ui_decls, args):
@ -24,7 +21,7 @@ def edit_form(uis, args):
ui_objs, editors = create_ui_objs(uis, args) ui_objs, editors = create_ui_objs(uis, args)
form = Form(editors) form = Form(editors)
form.render() form.render()
values = [] values = []
for obj in ui_objs: for obj in ui_objs:
if isinstance(obj, TextEditor): if isinstance(obj, TextEditor):
@ -35,7 +32,7 @@ def edit_form(uis, args):
# TODO # TODO
pass pass
return values return values
def editor(description): def editor(description):
def decorator_edit(func): def decorator_edit(func):
@ -43,39 +40,39 @@ def editor(description):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
uis = wrapper.uis[::-1] uis = wrapper.uis[::-1]
return edit_form(uis, args) return edit_form(uis, args)
if hasattr(func, "uis"): if hasattr(func, "uis"):
wrapper.uis = func.uis wrapper.uis = func.uis
else: else:
wrapper.uis = [] wrapper.uis = []
wrapper.uis.append((TextEditor, description)) wrapper.uis.append((TextEditor, description))
return wrapper return wrapper
return decorator_edit return decorator_edit
def ui_edit(ui_type, description): def ui_edit(ui_type, description):
def decorator_edit(func): def decorator_edit(func):
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
uis = wrapper.uis[::-1] uis = wrapper.uis[::-1]
return edit_form(uis, args) return edit_form(uis, args)
if hasattr(func, "uis"): if hasattr(func, "uis"):
wrapper.uis = func.uis wrapper.uis = func.uis
else: else:
wrapper.uis = [] wrapper.uis = []
ui_type_class = { ui_type_class = {"editor": TextEditor, "radio": Radio, "checkbox": Checkbox}[ui_type]
"editor": TextEditor,
"radio": Radio,
"checkbox": Checkbox
}[ui_type]
wrapper.uis.append((ui_type_class, description)) wrapper.uis.append((ui_type_class, description))
return wrapper return wrapper
return decorator_edit return decorator_edit
def assert_exit(condition, message, exit_code = -1):
def assert_exit(condition, message, exit_code=-1):
if condition: if condition:
if exit_code == 0: if exit_code == 0:
print(message, end="\n\n", flush=True) print(message, end="\n\n", flush=True)
else: else:
print(message, end="\n\n", file=sys.stderr, flush=True) print(message, end="\n\n", file=sys.stderr, flush=True)
sys.exit(exit_code) sys.exit(exit_code)

View File

@ -1,6 +1,6 @@
import sys
import json import json
import os import os
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
@ -16,20 +16,22 @@ def read_issue_url():
return config_data["issue_repo"] return config_data["issue_repo"]
return "" return ""
def save_issue_url(issue_url): def save_issue_url(issue_url):
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json") config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# make dirs # make dirs
os.makedirs(os.path.dirname(config_path), exist_ok=True) os.makedirs(os.path.dirname(config_path), exist_ok=True)
config_data = {} config_data = {}
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
config_data["issue_repo"] = issue_url config_data["issue_repo"] = issue_url
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
def read_github_token(): def read_github_token():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path): if os.path.exists(config_path):
@ -39,6 +41,7 @@ def read_github_token():
return config_data["github_token"] return config_data["github_token"]
return "" return ""
def save_github_token(github_token): def save_github_token(github_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@ -46,15 +49,17 @@ def save_github_token(github_token):
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
config_data["github_token"] = github_token config_data["github_token"] = github_token
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
@editor("Please specify the issue's repository, " @editor(
"If the issue is within this repository, no need to specify. " "Please specify the issue's repository, "
"Otherwise, format as: username/repository-name") "If the issue is within this repository, no need to specify. "
"Otherwise, format as: username/repository-name"
)
@editor("Input your github TOKEN to access github api:") @editor("Input your github TOKEN to access github api:")
def edit_issue(issue_url, github_token): def edit_issue(issue_url, github_token):
pass pass

View File

@ -1,10 +1,10 @@
import subprocess
import requests
import time
import json import json
import os import os
import subprocess
import sys import sys
import time
import requests
from lib.chatmark import TextEditor from lib.chatmark import TextEditor
@ -18,10 +18,7 @@ def read_github_token():
return config_data["github_token"] return config_data["github_token"]
# ask user to input github token # ask user to input github token
server_access_token_editor = TextEditor( server_access_token_editor = TextEditor("", "Please input your GITHUB access TOKEN to access:")
"",
f"Please input your GITHUB access TOKEN to access:"
)
server_access_token_editor.render() server_access_token_editor.render()
server_access_token = server_access_token_editor.new_text server_access_token = server_access_token_editor.new_text
@ -54,10 +51,11 @@ def create_issue(title, body):
print(f"Failed to create issue: {response.content}", file=sys.stderr, end="\n\n") print(f"Failed to create issue: {response.content}", file=sys.stderr, end="\n\n")
return None return None
def update_issue_body(issue_url, issue_body): def update_issue_body(issue_url, issue_body):
""" """
Update the body text of a GitHub issue. Update the body text of a GitHub issue.
:param issue_url: The API URL of the issue to update. :param issue_url: The API URL of the issue to update.
:param issue_body: The new body text for the issue. :param issue_body: The new body text for the issue.
""" """
@ -69,11 +67,10 @@ def update_issue_body(issue_url, issue_body):
"body": issue_body, "body": issue_body,
} }
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues" issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
api_url = f"{issue_api_url}/{issue_url.split('/')[-1]}" api_url = f"{issue_api_url}/{issue_url.split('/')[-1]}"
response = requests.patch(api_url, headers=headers, data=json.dumps(data)) response = requests.patch(api_url, headers=headers, data=json.dumps(data))
if response.status_code == 200: if response.status_code == 200:
print("Issue updated successfully!") print("Issue updated successfully!")
return response.json() return response.json()
@ -81,6 +78,7 @@ def update_issue_body(issue_url, issue_body):
print(f"Failed to update issue: {response.status_code}") print(f"Failed to update issue: {response.status_code}")
return None return None
# parse sub tasks in issue body # parse sub tasks in issue body
def parse_sub_tasks(body): def parse_sub_tasks(body):
sub_tasks = [] sub_tasks = []
@ -90,31 +88,31 @@ def parse_sub_tasks(body):
sub_tasks.append(line[2:]) sub_tasks.append(line[2:])
return sub_tasks return sub_tasks
def update_sub_tasks(body, tasks): def update_sub_tasks(body, tasks):
# remove all existing tasks # remove all existing tasks
lines = body.split("\n") lines = body.split("\n")
updated_body = "\n".join(line for line in lines if not line.startswith("- [")) updated_body = "\n".join(line for line in lines if not line.startswith("- ["))
# add new tasks # add new tasks
updated_body += "\n" + "\n".join(f"- {task}" for task in tasks) updated_body += "\n" + "\n".join(f"- {task}" for task in tasks)
return updated_body return updated_body
def update_task_issue_url(body, task, issue_url): def update_task_issue_url(body, task, issue_url):
# task is like: # task is like:
#[ ] task name # [ ] task name
#[x] task name # [x] task name
# replace task name with issue url, like: # replace task name with issue url, like:
#[ ] [task name](url) # [ ] [task name](url)
#[x] [task name](url) # [x] [task name](url)
if task.find('] ') == -1: if task.find("] ") == -1:
return None return None
task = task[task.find('] ')+2 : ] task = task[task.find("] ") + 2 :]
return body.replace(task, f"[{task}]({issue_url})") return body.replace(task, f"[{task}]({issue_url})")
def check_git_installed(): def check_git_installed():
""" """
Check if Git is installed on the local machine. Check if Git is installed on the local machine.
@ -133,6 +131,7 @@ def check_git_installed():
print("Git is not installed on your system.") print("Git is not installed on your system.")
return False return False
def create_and_checkout_branch(branch_name): def create_and_checkout_branch(branch_name):
subprocess.run(["git", "checkout", "-b", branch_name], check=True) subprocess.run(["git", "checkout", "-b", branch_name], check=True)
@ -169,7 +168,13 @@ def get_github_repo(issue_repo=False):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
if "issue_repo" in config_data: if "issue_repo" in config_data:
print("current issue repo:", config_data["issue_repo"], end="\n\n", file=sys.stderr, flush=True) print(
"current issue repo:",
config_data["issue_repo"],
end="\n\n",
file=sys.stderr,
flush=True,
)
return config_data["issue_repo"] return config_data["issue_repo"]
# 使用git命令获取当前仓库的URL # 使用git命令获取当前仓库的URL
@ -194,6 +199,7 @@ def get_github_repo(issue_repo=False):
print("==> File not found...") print("==> File not found...")
return None return None
# 获取当前分支名称 # 获取当前分支名称
def get_current_branch(): def get_current_branch():
try: try:
@ -229,7 +235,8 @@ def get_parent_branch():
return None return None
# 使用git命令获取父分支的名称 # 使用git命令获取父分支的名称
result = subprocess.check_output( result = subprocess.check_output(
["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref], stderr=subprocess.STDOUT ["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref],
stderr=subprocess.STDOUT,
).strip() ).strip()
parent_branch_name = result.decode("utf-8") parent_branch_name = result.decode("utf-8")
return parent_branch_name return parent_branch_name
@ -260,12 +267,14 @@ def get_issue_info(issue_id):
else: else:
return None return None
def get_issue_info_by_url(issue_url): def get_issue_info_by_url(issue_url):
# get issue id from issue_url # get issue id from issue_url
def get_issue_id(issue_url): def get_issue_id(issue_url):
# Extract the issue id from the issue_url # Extract the issue id from the issue_url
issue_id = issue_url.split("/")[-1] issue_id = issue_url.split("/")[-1]
return issue_id return issue_id
return get_issue_info(get_issue_id(issue_url)) return get_issue_info(get_issue_id(issue_url))
@ -315,7 +324,6 @@ def create_pull_request(title, body, head, base, repo_name):
return None return None
def run_command_with_retries(command, retries=3, delay=5): def run_command_with_retries(command, retries=3, delay=5):
for attempt in range(retries): for attempt in range(retries):
try: try:
@ -341,12 +349,17 @@ def check_unpushed_commits():
print(f"Error checking for unpushed commits: {e}") print(f"Error checking for unpushed commits: {e}")
return True return True
def auto_push(): def auto_push():
# 获取当前分支名 # 获取当前分支名
if not check_unpushed_commits(): if not check_unpushed_commits():
return True return True
try: try:
branch = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).strip().decode("utf-8") branch = (
subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
.strip()
.decode("utf-8")
)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"Error getting current branch: {e}") print(f"Error getting current branch: {e}")
return False return False
@ -366,21 +379,25 @@ def auto_push():
def get_recently_pr(repo): def get_recently_pr(repo):
url = f"{GITHUB_API_URL}/repos/{repo}/pulls?state=open&sort=updated" url = f"{GITHUB_API_URL}/repos/{repo}/pulls?state=open&sort=updated"
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Accept": "application/vnd.github.v3+json"} headers = {
"Authorization": f"token {GITHUB_ACCESS_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
print("=>:", url) print("=>:", url)
branch_name = get_current_branch() branch_name = get_current_branch()
if response.status_code == 200: if response.status_code == 200:
prs = response.json() prs = response.json()
for pr in prs: for pr in prs:
if pr['head']['ref'] == branch_name: if pr["head"]["ref"] == branch_name:
return pr return pr
return None return None
else: else:
return None return None
def update_pr(pr_number, title, body, repo_name): def update_pr(pr_number, title, body, repo_name):
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls/{pr_number}" url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls/{pr_number}"
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Content-Type": "application/json"} headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Content-Type": "application/json"}
@ -393,7 +410,3 @@ def update_pr(pr_number, title, body, repo_name):
else: else:
print("Failed to update PR.") print("Failed to update PR.")
return None return None

View File

@ -1,25 +1,27 @@
import sys
import os import os
import sys
from devchat.llm import chat_json from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import assert_exit, editor # noqa: E402
from git_api import create_issue # noqa: E402 from git_api import create_issue # noqa: E402
from common_util import editor, assert_exit # noqa: E402
# Function to generate issue title and body using LLM # Function to generate issue title and body using LLM
PROMPT = ("Based on the following description, " PROMPT = (
"suggest a title and a detailed body for a GitHub issue:\n\n" "Based on the following description, "
"Description: {description}\n\n" "suggest a title and a detailed body for a GitHub issue:\n\n"
"Output format: {{\"title\": \"<title>\", \"body\": \"<body>\"}} ") "Description: {description}\n\n"
'Output format: {{"title": "<title>", "body": "<body>"}} '
)
@chat_json(prompt=PROMPT) @chat_json(prompt=PROMPT)
def generate_issue_content(description): def generate_issue_content(description):
pass pass
@editor("Edit issue title:") @editor("Edit issue title:")
@editor("Edit issue body:") @editor("Edit issue body:")
def edit_issue(title, body): def edit_issue(title, body):
@ -29,14 +31,14 @@ def edit_issue(title, body):
# Main function # Main function
def main(): def main():
print("start new_issue ...", end="\n\n", flush=True) print("start new_issue ...", end="\n\n", flush=True)
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1) assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
description = sys.argv[1] description = sys.argv[1]
print("Generating issue content ...", end="\n\n", flush=True) print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(description=description) issue_json_ob = generate_issue_content(description=description)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1) assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"]) issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0) assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True) print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
@ -48,4 +50,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,14 +1,18 @@
import json import json
import sys
import os import os
import sys
from devchat.llm import chat_json from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import ui_edit, assert_exit # noqa: E402 from common_util import assert_exit, ui_edit # noqa: E402
from git_api import check_git_installed, create_and_checkout_branch, is_issue_url, read_issue_by_url # noqa: E402 from git_api import ( # noqa: E402
check_git_installed,
create_and_checkout_branch,
is_issue_url,
read_issue_by_url,
)
# Function to generate a random branch name # Function to generate a random branch name
PROMPT = ( PROMPT = (
@ -20,6 +24,8 @@ PROMPT = (
"the final result is output in JSON format, " "the final result is output in JSON format, "
'as follows: {{"names":["name1", "name2", .. "name5"]}}\n' 'as follows: {{"names":["name1", "name2", .. "name5"]}}\n'
) )
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview") @chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_branch_name(task): def generate_branch_name(task):
pass pass
@ -29,6 +35,7 @@ def generate_branch_name(task):
def select_branch_name_ui(branch_names): def select_branch_name_ui(branch_names):
pass pass
def select_branch_name(branch_names): def select_branch_name(branch_names):
[branch_selection] = select_branch_name_ui(branch_names) [branch_selection] = select_branch_name_ui(branch_names)
assert_exit(branch_selection is None, "No branch selected.", exit_code=0) assert_exit(branch_selection is None, "No branch selected.", exit_code=0)
@ -39,7 +46,7 @@ def get_issue_or_task(task):
if is_issue_url(task): if is_issue_url(task):
issue = read_issue_by_url(task.strip()) issue = read_issue_by_url(task.strip())
assert_exit(not issue, "Failed to read issue.", exit_code=-1) assert_exit(not issue, "Failed to read issue.", exit_code=-1)
return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]}) return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]})
else: else:
return task return task
@ -48,19 +55,23 @@ def get_issue_or_task(task):
# Main function # Main function
def main(): def main():
print("Start create branch ...", end="\n\n", flush=True) print("Start create branch ...", end="\n\n", flush=True)
is_git_installed = check_git_installed() is_git_installed = check_git_installed()
assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1) assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1)
task = sys.argv[1] task = sys.argv[1]
assert_exit(not task, "You need input something about the new branch, or input a issue url.", exit_code=-1) assert_exit(
not task,
"You need input something about the new branch, or input a issue url.",
exit_code=-1,
)
# read issue by url # read issue by url
task = get_issue_or_task(task) task = get_issue_or_task(task)
# Generate 5 branch names # Generate 5 branch names
print("Generating branch names ...", end="\n\n", flush=True) print("Generating branch names ...", end="\n\n", flush=True)
branch_names = generate_branch_name(task = task) branch_names = generate_branch_name(task=task)
assert_exit(not branch_names, "Failed to generate branch names.", exit_code=-1) assert_exit(not branch_names, "Failed to generate branch names.", exit_code=-1)
branch_names = branch_names["names"] branch_names = branch_names["names"]
@ -75,4 +86,3 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,18 +1,21 @@
import sys
import os import os
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import assert_exit, editor # noqa: E402
from devchat.llm import chat_json # noqa: E402 from devchat.llm import chat_json # noqa: E402
from git_api import create_issue # noqa: E402 from git_api import create_issue # noqa: E402
from common_util import editor, assert_exit # noqa: E402
# Function to generate issue title and body using LLM # Function to generate issue title and body using LLM
PROMPT = ("Based on the following description, " PROMPT = (
"suggest a title and a detailed body for a GitHub issue:\n\n" "Based on the following description, "
"Description: {description}\n\n" "suggest a title and a detailed body for a GitHub issue:\n\n"
"Output as valid JSON format: {{\"title\": \"<title>\", \"body\": \"<body> use \\n as new line flag.\"}} ") "Description: {description}\n\n"
'Output as valid JSON format: {{"title": "<title>", "body": "<body> use \\n as new line flag."}} ' # noqa: E501
)
@chat_json(prompt=PROMPT) @chat_json(prompt=PROMPT)
def generate_issue_content(description): def generate_issue_content(description):
pass pass
@ -27,14 +30,14 @@ def edit_issue(title, body):
# Main function # Main function
def main(): def main():
print("start new_issue ...", end="\n\n", flush=True) print("start new_issue ...", end="\n\n", flush=True)
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1) assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
description = sys.argv[1] description = sys.argv[1]
print("Generating issue content ...", end="\n\n", flush=True) print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(description=description) issue_json_ob = generate_issue_content(description=description)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1) assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"]) issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0) assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True) print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
@ -46,4 +49,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,12 +1,17 @@
import sys
import os import os
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".."))
from common_util import assert_exit, editor, ui_edit # noqa: E402
from devchat.llm import chat_json # noqa: E402 from devchat.llm import chat_json # noqa: E402
from git_api import create_issue, parse_sub_tasks, update_task_issue_url, update_issue_body, get_issue_info_by_url # noqa: E402 from git_api import ( # noqa: E402
from common_util import editor, assert_exit, ui_edit # noqa: E402 create_issue,
get_issue_info_by_url,
parse_sub_tasks,
update_issue_body,
update_task_issue_url,
)
# Function to generate issue title and body using LLM # Function to generate issue title and body using LLM
PROMPT = ( PROMPT = (
@ -14,7 +19,10 @@ PROMPT = (
"{issue_content}\n\n" "{issue_content}\n\n"
"Based on the following issue task: {task}" "Based on the following issue task: {task}"
"suggest a title and a detailed body for a GitHub issue:\n\n" "suggest a title and a detailed body for a GitHub issue:\n\n"
"Output format: {{\"title\": \"<title>\", \"body\": \"<body>\"}} ") 'Output format: {{"title": "<title>", "body": "<body>"}} '
)
@chat_json(prompt=PROMPT) @chat_json(prompt=PROMPT)
def generate_issue_content(issue_content, task): def generate_issue_content(issue_content, task):
pass pass
@ -30,16 +38,22 @@ def edit_issue(title, body):
def select_task(tasks): def select_task(tasks):
pass pass
def get_issue_json(issue_url): def get_issue_json(issue_url):
issue = get_issue_info_by_url(issue_url) issue = get_issue_info_by_url(issue_url)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1) assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
return {"id": issue["number"], "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]} return {
"id": issue["number"],
"html_url": issue["html_url"],
"title": issue["title"],
"body": issue["body"],
}
# Main function # Main function
def main(): def main():
print("start new_issue ...", end="\n\n", flush=True) print("start new_issue ...", end="\n\n", flush=True)
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1) assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
issue_url = sys.argv[1] issue_url = sys.argv[1]
@ -48,17 +62,16 @@ def main():
tasks = parse_sub_tasks(old_issue["body"]) tasks = parse_sub_tasks(old_issue["body"])
assert_exit(not tasks, "No tasks in issue body.") assert_exit(not tasks, "No tasks in issue body.")
# select task from tasks # select task from tasks
[task] = select_task(tasks) [task] = select_task(tasks)
assert_exit(task is None, "No task selected.") assert_exit(task is None, "No task selected.")
task = tasks[task] task = tasks[task]
print("task:", task, end="\n\n", flush=True) print("task:", task, end="\n\n", flush=True)
print("Generating issue content ...", end="\n\n", flush=True) print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(issue_content=old_issue, task=task) issue_json_ob = generate_issue_content(issue_content=old_issue, task=task)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1) assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"]) issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0) assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True) print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
@ -78,4 +91,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,18 +1,20 @@
import subprocess
import sys
import requests
import json import json
import os import os
import time import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from lib.chatmark import TextEditor, Form # noqa: E402
from devchat.llm import chat_json # noqa: E402
from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, is_issue_url, read_issue_by_url
from common_util import assert_exit, ui_edit # noqa: E402 from common_util import assert_exit, ui_edit # noqa: E402
from devchat.llm import chat_json # noqa: E402
from git_api import ( # noqa: E402
auto_push,
create_pull_request,
get_commit_messages,
get_current_branch,
get_github_repo,
get_issue_info,
)
BASH_BRANCH = "main" BASH_BRANCH = "main"
@ -23,6 +25,7 @@ def extract_issue_id(branch_name):
return branch_name.split("#")[-1] return branch_name.split("#")[-1]
return None return None
# 使用LLM模型生成PR内容 # 使用LLM模型生成PR内容
PROMPT = ( PROMPT = (
"Create a pull request title and body based on " "Create a pull request title and body based on "
@ -34,12 +37,17 @@ PROMPT = (
"The response result should format as JSON object as following:\n" "The response result should format as JSON object as following:\n"
'{{"title": "pr title", "body": "pr body"}}' '{{"title": "pr title", "body": "pr body"}}'
) )
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview") @chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_pr_content_llm(issue, commit_message, user_input): def generate_pr_content_llm(issue, commit_message, user_input):
pass pass
def generate_pr_content(issue, commit_messages, user_input): def generate_pr_content(issue, commit_messages, user_input):
response = generate_pr_content_llm(issue = json.dumps(issue), commit_messages = commit_messages, user_input = user_input) response = generate_pr_content_llm(
issue=json.dumps(issue), commit_messages=commit_messages, user_input=user_input
)
assert_exit(not response, "Failed to generate PR content.", exit_code=-1) assert_exit(not response, "Failed to generate PR content.", exit_code=-1)
return response.get("title"), response.get("body") return response.get("title"), response.get("body")
@ -55,40 +63,45 @@ def get_issue_json(issue_id):
if issue_id: if issue_id:
issue = get_issue_info(issue_id) issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1) assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]} issue = {
"id": issue_id,
"html_url": issue["html_url"],
"title": issue["title"],
"body": issue["body"],
}
return issue return issue
# 主函数 # 主函数
def main(): def main():
print("start new_pr ...", end="\n\n", flush=True) print("start new_pr ...", end="\n\n", flush=True)
repo_name = get_github_repo() repo_name = get_github_repo()
branch_name = get_current_branch() branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name) issue_id = extract_issue_id(branch_name)
# print basic info, repo_name, branch_name, issue_id # print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n") print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n") print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n") print("issue id:", issue_id, end="\n\n")
issue = get_issue_json(issue_id) issue = get_issue_json(issue_id)
commit_messages = get_commit_messages(BASH_BRANCH) commit_messages = get_commit_messages(BASH_BRANCH)
print("generating pr title and body ...", end="\n\n", flush=True) print("generating pr title and body ...", end="\n\n", flush=True)
user_input = sys.argv[1] user_input = sys.argv[1]
pr_title, pr_body = generate_pr_content(issue, commit_messages, user_input) pr_title, pr_body = generate_pr_content(issue, commit_messages, user_input)
assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1) assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1)
pr_title, pr_body = edit_pr(pr_title, pr_body) pr_title, pr_body = edit_pr(pr_title, pr_body)
assert_exit(not pr_title, "PR creation cancelled.", exit_code=0) assert_exit(not pr_title, "PR creation cancelled.", exit_code=0)
is_push_success = auto_push() is_push_success = auto_push()
assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1) assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1)
pr = create_pull_request(pr_title, pr_body, branch_name, BASH_BRANCH, repo_name) pr = create_pull_request(pr_title, pr_body, branch_name, BASH_BRANCH, repo_name)
assert_exit(not pr, "Failed to create PR.", exit_code=-1) assert_exit(not pr, "Failed to create PR.", exit_code=-1)
print(f"PR created successfully: {pr['html_url']}") print(f"PR created successfully: {pr['html_url']}")

View File

@ -1,12 +1,16 @@
import sys
import os import os
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import assert_exit, editor # noqa: E402
from devchat.llm import chat_json # noqa: E402 from devchat.llm import chat_json # noqa: E402
from git_api import get_issue_info_by_url, parse_sub_tasks, update_sub_tasks, update_issue_body # noqa: E402 from git_api import ( # noqa: E402
from common_util import editor, assert_exit # noqa: E402 get_issue_info_by_url,
parse_sub_tasks,
update_issue_body,
update_sub_tasks,
)
TASKS_PROMPT = ( TASKS_PROMPT = (
"Following is my git issue content.\n" "Following is my git issue content.\n"
@ -18,10 +22,13 @@ TASKS_PROMPT = (
"Please output all tasks in JSON format as:" "Please output all tasks in JSON format as:"
'{{"tasks": ["[ ] task1", "[ ] task2"]}}' '{{"tasks": ["[ ] task1", "[ ] task2"]}}'
) )
@chat_json(prompt=TASKS_PROMPT) @chat_json(prompt=TASKS_PROMPT)
def generate_issue_tasks(issue_data, user_input): def generate_issue_tasks(issue_data, user_input):
pass pass
def to_task_str(tasks): def to_task_str(tasks):
task_str = "" task_str = ""
for task in tasks: for task in tasks:
@ -34,24 +41,32 @@ def to_task_str(tasks):
def edit_issue_tasks(old_tasks, new_tasks): def edit_issue_tasks(old_tasks, new_tasks):
pass pass
@editor("Input ISSUE url:") @editor("Input ISSUE url:")
def input_issue_url(url): def input_issue_url(url):
pass pass
@editor("How to update tasks:") @editor("How to update tasks:")
def update_tasks_input(user_input): def update_tasks_input(user_input):
pass pass
def get_issue_json(issue_url): def get_issue_json(issue_url):
issue = get_issue_info_by_url(issue_url) issue = get_issue_info_by_url(issue_url)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1) assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
return {"id": issue["number"], "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]} return {
"id": issue["number"],
"html_url": issue["html_url"],
"title": issue["title"],
"body": issue["body"],
}
# Main function # Main function
def main(): def main():
print("start issue tasks update ...", end="\n\n", flush=True) print("start issue tasks update ...", end="\n\n", flush=True)
[issue_url] = input_issue_url("") [issue_url] = input_issue_url("")
assert_exit(not issue_url, "No issue url.") assert_exit(not issue_url, "No issue url.")
print("issue url:", issue_url, end="\n\n", flush=True) print("issue url:", issue_url, end="\n\n", flush=True)
@ -64,7 +79,7 @@ def main():
[user_input] = update_tasks_input("") [user_input] = update_tasks_input("")
assert_exit(not user_input, "No user input") assert_exit(not user_input, "No user input")
new_tasks = generate_issue_tasks(issue_data=issue, user_input=user_input) new_tasks = generate_issue_tasks(issue_data=issue, user_input=user_input)
assert_exit(not new_tasks, "No new tasks.") assert_exit(not new_tasks, "No new tasks.")
print("new_tasks:", new_tasks, end="\n\n", flush=True) print("new_tasks:", new_tasks, end="\n\n", flush=True)
assert_exit(not new_tasks.get("tasks", []), "No new tasks.") assert_exit(not new_tasks.get("tasks", []), "No new tasks.")
@ -83,4 +98,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,24 +1,27 @@
import subprocess
import sys
import requests
import json import json
import os import os
import time import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from lib.chatmark import TextEditor, Form # noqa: E402
from devchat.llm import chat_json # noqa: E402
from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, get_recently_pr, is_issue_url, read_issue_by_url, update_pr
from common_util import assert_exit, ui_edit # noqa: E402 from common_util import assert_exit, ui_edit # noqa: E402
from devchat.llm import ( # noqa: E402
from lib.chatmark import TextEditor, Form # noqa: E402 chat_json,
from devchat.llm import chat_completion_no_stream_return_json # noqa: E402 )
from git_api import ( # noqa: E402
auto_push,
get_commit_messages,
get_current_branch,
get_github_repo,
get_issue_info,
get_recently_pr,
update_pr,
)
BASH_BRANCH = "main" BASH_BRANCH = "main"
# 从分支名称中提取issue id # 从分支名称中提取issue id
def extract_issue_id(branch_name): def extract_issue_id(branch_name):
if "#" in branch_name: if "#" in branch_name:
@ -36,12 +39,15 @@ PROMPT = (
"The response result should format as JSON object as following:\n" "The response result should format as JSON object as following:\n"
'{{"title": "pr title", "body": "pr body"}}' '{{"title": "pr title", "body": "pr body"}}'
) )
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview") @chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_pr_content_llm(issue, commit_messages): def generate_pr_content_llm(issue, commit_messages):
pass pass
def generate_pr_content(issue, commit_messages): def generate_pr_content(issue, commit_messages):
response = generate_pr_content_llm(issue = json.dumps(issue), commit_messages = commit_messages) response = generate_pr_content_llm(issue=json.dumps(issue), commit_messages=commit_messages)
assert_exit(not response, "Failed to generate PR content.", exit_code=-1) assert_exit(not response, "Failed to generate PR content.", exit_code=-1)
return response.get("title"), response.get("body") return response.get("title"), response.get("body")
@ -57,42 +63,47 @@ def get_issue_json(issue_id):
if issue_id: if issue_id:
issue = get_issue_info(issue_id) issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1) assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]} issue = {
"id": issue_id,
"html_url": issue["html_url"],
"title": issue["title"],
"body": issue["body"],
}
return issue return issue
# 主函数 # 主函数
def main(): def main():
print("start update_pr ...", end="\n\n", flush=True) print("start update_pr ...", end="\n\n", flush=True)
repo_name = get_github_repo() repo_name = get_github_repo()
branch_name = get_current_branch() branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name) issue_id = extract_issue_id(branch_name)
# print basic info, repo_name, branch_name, issue_id # print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n") print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n") print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n") print("issue id:", issue_id, end="\n\n")
issue = get_issue_json(issue_id) issue = get_issue_json(issue_id)
commit_messages = get_commit_messages(BASH_BRANCH) commit_messages = get_commit_messages(BASH_BRANCH)
recent_pr = get_recently_pr(repo_name) recent_pr = get_recently_pr(repo_name)
assert_exit(not recent_pr, "Failed to get recent PR.", exit_code=-1) assert_exit(not recent_pr, "Failed to get recent PR.", exit_code=-1)
print("generating pr title and body ...", end="\n\n", flush=True) print("generating pr title and body ...", end="\n\n", flush=True)
pr_title, pr_body = generate_pr_content(issue, commit_messages) pr_title, pr_body = generate_pr_content(issue, commit_messages)
assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1) assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1)
pr_title, pr_body = edit_pr(pr_title, pr_body) pr_title, pr_body = edit_pr(pr_title, pr_body)
assert_exit(not pr_title, "PR creation cancelled.", exit_code=0) assert_exit(not pr_title, "PR creation cancelled.", exit_code=0)
is_push_success = auto_push() is_push_success = auto_push()
assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1) assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1)
pr = update_pr(recent_pr['number'], pr_title, pr_body, repo_name) pr = update_pr(recent_pr["number"], pr_title, pr_body, repo_name)
assert_exit(not pr, "Failed to update PR.", exit_code=-1) assert_exit(not pr, "Failed to update PR.", exit_code=-1)
print(f"PR updated successfully: {pr['html_url']}") print(f"PR updated successfully: {pr['html_url']}")

View File

@ -1,22 +1,22 @@
""" """
/pr.describe https://github.com/devchat-ai/devchat-vscode/pull/25 /pr.describe https://github.com/devchat-ai/devchat-vscode/pull/25
""" """
# ruff: noqa: E402
import logging
import os import os
import sys import sys
import logging
import json # add the current directory to the path
import argparse from os.path import abspath, dirname
import asyncio
from lib.ide_service import IDEService from lib.ide_service import IDEService
# add the current directory to the path
from os.path import dirname, abspath
sys.path.append(dirname(dirname(abspath(__file__)))) sys.path.append(dirname(dirname(abspath(__file__))))
# add new model configs to algo.MAX_TOKENS # add new model configs to algo.MAX_TOKENS
import pr_agent.algo as algo import pr_agent.algo as algo
algo.MAX_TOKENS["gpt-4-turbo-preview"] = 128000 algo.MAX_TOKENS["gpt-4-turbo-preview"] = 128000
algo.MAX_TOKENS["claude-3-opus"] = 100000 algo.MAX_TOKENS["claude-3-opus"] = 100000
algo.MAX_TOKENS["claude-3-sonnet"] = 100000 algo.MAX_TOKENS["claude-3-sonnet"] = 100000
@ -44,49 +44,68 @@ algo.MAX_TOKENS["sentence-transformers/msmarco-bert-base-dot-v5"] = 512
algo.MAX_TOKENS["bert-base-uncased"] = 512 algo.MAX_TOKENS["bert-base-uncased"] = 512
if os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106") not in algo.MAX_TOKENS: if os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106") not in algo.MAX_TOKENS:
current_model = os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106") current_model = os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")
IDEService().ide_logging("info", f"{current_model}'s max tokens is not config, we use it as default 16000") IDEService().ide_logging(
"info", f"{current_model}'s max tokens is not config, we use it as default 16000"
)
algo.MAX_TOKENS[os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")] = 16000 algo.MAX_TOKENS[os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")] = 16000
# add new git provider # add new git provider
def get_git_provider(): def get_git_provider():
from pr_agent.config_loader import get_settings from pr_agent.config_loader import get_settings
_git_provider_old_ = get_settings().config.git_provider _git_provider_old_ = get_settings().config.git_provider
get_settings().config.git_provider = "devchat" get_settings().config.git_provider = "devchat"
provider = _get_git_provider_old() provider = _get_git_provider_old()
get_settings().config.git_provider = _git_provider_old_ get_settings().config.git_provider = _git_provider_old_
return provider return provider
import pr_agent.git_providers as git_providers import pr_agent.git_providers as git_providers
from providers.devchat_provider import DevChatProvider from providers.devchat_provider import DevChatProvider
git_providers._GIT_PROVIDERS['devchat'] = DevChatProvider
git_providers._GIT_PROVIDERS["devchat"] = DevChatProvider
_get_git_provider_old = git_providers.get_git_provider _get_git_provider_old = git_providers.get_git_provider
git_providers.get_git_provider = get_git_provider git_providers.get_git_provider = get_git_provider
from pr_agent.config_loader import get_settings
from pr_agent.cli import run from pr_agent.cli import run
from pr_agent.config_loader import get_settings
# mock logging method, to redirect log to IDE # mock logging method, to redirect log to IDE
from pr_agent.log import setup_logger, inv_analytics_filter from pr_agent.log import inv_analytics_filter, setup_logger
from lib.ide_service import IDEService from lib.ide_service import IDEService
class CustomOutput: class CustomOutput:
def __init__(self): def __init__(self):
pass pass
def write(self, message): def write(self, message):
IDEService().ide_logging("info", message.strip()) IDEService().ide_logging("info", message.strip())
def flush(self): def flush(self):
pass pass
def close(self): def close(self):
pass pass
log_level = os.environ.get("LOG_LEVEL", "INFO") log_level = os.environ.get("LOG_LEVEL", "INFO")
logger = setup_logger(log_level) logger = setup_logger(log_level)
logger.remove(None) logger.remove(None)
logger.add(CustomOutput(), level=logging.INFO, format="{message}", colorize=False, filter=inv_analytics_filter) logger.add(
CustomOutput(),
level=logging.INFO,
format="{message}",
colorize=False,
filter=inv_analytics_filter,
)
from config_util import read_server_access_token_with_input, get_repo_type from config_util import get_repo_type, read_server_access_token_with_input
from custom_suggestions_config import get_custom_suggestions_system_prompt from custom_suggestions_config import get_custom_suggestions_system_prompt
# set openai key and api base # set openai key and api base
@ -107,7 +126,12 @@ if repo_type == "github":
elif repo_type == "gitlab": elif repo_type == "gitlab":
get_settings().set("GITLAB.PERSONAL_ACCESS_TOKEN", access_token) get_settings().set("GITLAB.PERSONAL_ACCESS_TOKEN", access_token)
else: else:
print("Unsupported git hosting service, input pr url is:", sys.argv[1], file=sys.stderr, flush=True) print(
"Unsupported git hosting service, input pr url is:",
sys.argv[1],
file=sys.stderr,
flush=True,
)
sys.exit(1) sys.exit(1)
@ -143,7 +167,7 @@ language_prompt = "\n\n输出内容使用中文输出。\n" if language == "zh"
get_settings().pr_code_suggestions_prompt.system += language_prompt get_settings().pr_code_suggestions_prompt.system += language_prompt
get_settings().pr_review_prompt.system += language_prompt get_settings().pr_review_prompt.system += language_prompt
get_settings().pr_description_prompt.system += language_prompt get_settings().pr_description_prompt.system += language_prompt
#get_settings().pr_reviewer.inline_code_comments = True # get_settings().pr_reviewer.inline_code_comments = True
# config for find similar issues # config for find similar issues
get_settings().set("PR_SIMILAR_ISSUE.VECTORDB", "lancedb") get_settings().set("PR_SIMILAR_ISSUE.VECTORDB", "lancedb")
@ -152,17 +176,17 @@ get_settings().set("LANCEDB.URI", "data/lancedb")
# set git provider type, devchat provider will create actual repo provider based on this type # set git provider type, devchat provider will create actual repo provider based on this type
pr_provider_type = get_repo_type(sys.argv[1]) pr_provider_type = get_repo_type(sys.argv[1])
if not pr_provider_type: if not pr_provider_type:
print("Unsupported git hosting service, input pr url is:", sys.argv[1], file=sys.stderr, flush=True) print(
"Unsupported git hosting service, input pr url is:",
sys.argv[1],
file=sys.stderr,
flush=True,
)
sys.exit(1) sys.exit(1)
get_settings().set("CONFIG.GIT_PROVIDER", pr_provider_type) get_settings().set("CONFIG.GIT_PROVIDER", pr_provider_type)
os.environ['CONFIG.GIT_PROVIDER_TYPE'] = pr_provider_type os.environ["CONFIG.GIT_PROVIDER_TYPE"] = pr_provider_type
# os.environ['ENABLE_PUBLISH_LABELS'] = "1" # os.environ['ENABLE_PUBLISH_LABELS'] = "1"
if __name__ == '__main__': if __name__ == "__main__":
sys.argv = [ sys.argv = [sys.executable, "--pr_url", sys.argv[1].strip(), sys.argv[2].strip()]
sys.executable,
'--pr_url',
sys.argv[1].strip(),
sys.argv[2].strip()
]
run() run()

View File

@ -1,6 +1,5 @@
import os
import json import json
import sys import os
from lib.chatmark import TextEditor from lib.chatmark import TextEditor
@ -26,6 +25,7 @@ def get_repo_type(url):
else: else:
return "" return ""
def read_github_token(): def read_github_token():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path): if os.path.exists(config_path):
@ -35,6 +35,7 @@ def read_github_token():
return config_data["github_token"] return config_data["github_token"]
return "" return ""
def read_server_access_token(repo_type): def read_server_access_token(repo_type):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path): if os.path.exists(config_path):
@ -44,6 +45,7 @@ def read_server_access_token(repo_type):
return config_data[repo_type]["access_token"] return config_data[repo_type]["access_token"]
return "" return ""
def save_github_token(github_token): def save_github_token(github_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@ -51,11 +53,12 @@ def save_github_token(github_token):
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
config_data["github_token"] = github_token config_data["github_token"] = github_token
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
def save_server_access_token(repo_type, access_token): def save_server_access_token(repo_type, access_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@ -63,27 +66,26 @@ def save_server_access_token(repo_type, access_token):
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
if repo_type not in config_data: if repo_type not in config_data:
config_data[repo_type] = {} config_data[repo_type] = {}
config_data[repo_type]["access_token"] = access_token config_data[repo_type]["access_token"] = access_token
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
def read_github_token_with_input(): def read_github_token_with_input():
github_token = read_github_token() github_token = read_github_token()
if not github_token: if not github_token:
# Input your github TOKEN to access github api: # Input your github TOKEN to access github api:
github_token_editor = TextEditor( github_token_editor = TextEditor("", "Please input your github TOKEN to access:")
"",
"Please input your github TOKEN to access:"
)
github_token = github_token_editor.new_text github_token = github_token_editor.new_text
if not github_token: if not github_token:
return github_token return github_token
save_github_token(github_token) save_github_token(github_token)
return github_token return github_token
def read_server_access_token_with_input(pr_url): def read_server_access_token_with_input(pr_url):
repo_type = get_repo_type(pr_url) repo_type = get_repo_type(pr_url)
if not repo_type: if not repo_type:
@ -93,8 +95,7 @@ def read_server_access_token_with_input(pr_url):
if not server_access_token: if not server_access_token:
# Input your server access TOKEN to access server api: # Input your server access TOKEN to access server api:
server_access_token_editor = TextEditor( server_access_token_editor = TextEditor(
"", "", f"Please input your {repo_type} access TOKEN to access:"
f"Please input your {repo_type} access TOKEN to access:"
) )
server_access_token_editor.render() server_access_token_editor.render()

View File

@ -1,5 +1,5 @@
import os
import json import json
import os
from lib.chatmark import TextEditor from lib.chatmark import TextEditor
@ -13,6 +13,7 @@ def read_custom_suggestions():
return config_data["custom_suggestions"] return config_data["custom_suggestions"]
return "" return ""
def save_custom_suggestions(custom_suggestions): def save_custom_suggestions(custom_suggestions):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@ -20,7 +21,7 @@ def save_custom_suggestions(custom_suggestions):
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
config_data["custom_suggestions"] = custom_suggestions config_data["custom_suggestions"] = custom_suggestions
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
@ -30,19 +31,19 @@ def config_custom_suggestions_with():
custom_suggestions = read_custom_suggestions() custom_suggestions = read_custom_suggestions()
if not custom_suggestions: if not custom_suggestions:
custom_suggestions = "- make sure the code is efficient\n" custom_suggestions = "- make sure the code is efficient\n"
# Input your github TOKEN to access github api: # Input your github TOKEN to access github api:
custom_suggestions_editor = TextEditor( custom_suggestions_editor = TextEditor(
custom_suggestions, custom_suggestions, "Please input your custom suggestions:"
"Please input your custom suggestions:"
) )
custom_suggestions_editor.render() custom_suggestions_editor.render()
custom_suggestions = custom_suggestions_editor.new_text custom_suggestions = custom_suggestions_editor.new_text
if not custom_suggestions: if not custom_suggestions:
return return
save_custom_suggestions(custom_suggestions) save_custom_suggestions(custom_suggestions)
if __name__ == "__main__": if __name__ == "__main__":
config_custom_suggestions_with() config_custom_suggestions_with()

View File

@ -1,5 +1,6 @@
import os # ruff: noqa: E501
import json import json
import os
import sys import sys
from lib.chatmark import TextEditor from lib.chatmark import TextEditor
@ -14,6 +15,7 @@ def read_custom_suggestions():
return config_data["custom_suggestions"] return config_data["custom_suggestions"]
return "" return ""
def save_custom_suggestions(custom_suggestions): def save_custom_suggestions(custom_suggestions):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json") config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@ -21,7 +23,7 @@ def save_custom_suggestions(custom_suggestions):
if os.path.exists(config_path): if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f) config_data = json.load(f)
config_data["custom_suggestions"] = custom_suggestions config_data["custom_suggestions"] = custom_suggestions
with open(config_path, "w+", encoding="utf-8") as f: with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4) json.dump(config_data, f, indent=4)
@ -32,8 +34,7 @@ def read_custom_suggestions_with_input():
if not custom_suggestions: if not custom_suggestions:
# Input your github TOKEN to access github api: # Input your github TOKEN to access github api:
custom_suggestions_editor = TextEditor( custom_suggestions_editor = TextEditor(
"- make sure the code is efficient\n", "- make sure the code is efficient\n", "Please input your custom suggestions:"
"Please input your custom suggestions:"
) )
custom_suggestions_editor.render() custom_suggestions_editor.render()
@ -50,7 +51,8 @@ def get_custom_suggestions_system_prompt():
print("Command has been canceled.", flush=True) print("Command has been canceled.", flush=True)
sys.exit(0) sys.exit(0)
system_prompt = """You are PR-Reviewer, a language model that specializes in suggesting ways to improve for a Pull Request (PR) code. system_prompt = (
"""You are PR-Reviewer, a language model that specializes in suggesting ways to improve for a Pull Request (PR) code.
Your task is to provide meaningful and actionable code suggestions, to improve the new code presented in a PR diff. Your task is to provide meaningful and actionable code suggestions, to improve the new code presented in a PR diff.
@ -96,7 +98,9 @@ Specific instructions for generating code suggestions:
Instructions from the user, that should be taken into account with high priority: Instructions from the user, that should be taken into account with high priority:
""" + custom_suggestions + """ """
+ custom_suggestions
+ """
{%- if extra_instructions %} {%- if extra_instructions %}
@ -151,4 +155,5 @@ code_suggestions:
Each YAML output MUST be after a newline, indented, with block scalar indicator ('|'). Each YAML output MUST be after a newline, indented, with block scalar indicator ('|').
""" """
)
return system_prompt return system_prompt

View File

@ -1,28 +1,31 @@
import json
import os import os
import sys import sys
import json
from typing import Optional, Tuple from typing import Optional, Tuple
import pr_agent.git_providers as git_providers
from pr_agent.git_providers.git_provider import GitProvider, IncrementalPR from pr_agent.git_providers.git_provider import GitProvider, IncrementalPR
from pr_agent.git_providers.github_provider import GithubProvider from pr_agent.git_providers.github_provider import GithubProvider
import pr_agent.git_providers as git_providers
from lib.chatmark import Form, TextEditor, Button from lib.chatmark import Button, Form, TextEditor
class DevChatProvider(GitProvider): class DevChatProvider(GitProvider):
def __init__(self, pr_url: Optional[str] = None, incremental=IncrementalPR(False)): def __init__(self, pr_url: Optional[str] = None, incremental=IncrementalPR(False)):
# 根据某个状态创建正确的GitProvider # 根据某个状态创建正确的GitProvider
provider_type = os.environ.get('CONFIG.GIT_PROVIDER_TYPE') provider_type = os.environ.get("CONFIG.GIT_PROVIDER_TYPE")
self.provider: GitProvider = git_providers._GIT_PROVIDERS[provider_type](pr_url, incremental) self.provider: GitProvider = git_providers._GIT_PROVIDERS[provider_type](
pr_url, incremental
)
@property @property
def pr(self): def pr(self):
return self.provider.pr return self.provider.pr
@property @property
def diff_files(self): def diff_files(self):
return self.provider.diff_files return self.provider.diff_files
@property @property
def github_client(self): def github_client(self):
return self.provider.github_client return self.provider.github_client
@ -32,13 +35,10 @@ class DevChatProvider(GitProvider):
def get_diff_files(self): def get_diff_files(self):
return self.provider.get_diff_files() return self.provider.get_diff_files()
def need_edit(self): def need_edit(self):
button = Button( button = Button(
[ ["Commit", "Edit"],
"Commit",
"Edit"
],
) )
button.render() button.render()
return 1 == button.clicked return 1 == button.clicked
@ -54,9 +54,9 @@ class DevChatProvider(GitProvider):
# Edit pr title and body # Edit pr title and body
title_editor = TextEditor(pr_title) title_editor = TextEditor(pr_title)
body_editor = TextEditor(pr_body) body_editor = TextEditor(pr_body)
form = Form(['Edit pr title:', title_editor, 'Edit pr body:', body_editor]) form = Form(["Edit pr title:", title_editor, "Edit pr body:", body_editor])
form.render() form.render()
pr_title = title_editor.new_text pr_title = title_editor.new_text
pr_body = body_editor.new_text pr_body = body_editor.new_text
if not pr_title or not pr_body: if not pr_title or not pr_body:
@ -68,8 +68,7 @@ class DevChatProvider(GitProvider):
def publish_code_suggestions(self, code_suggestions: list) -> bool: def publish_code_suggestions(self, code_suggestions: list) -> bool:
code_suggestions_json_str = json.dumps(code_suggestions, indent=4) code_suggestions_json_str = json.dumps(code_suggestions, indent=4)
code_suggestions_editor = TextEditor( code_suggestions_editor = TextEditor(
code_suggestions_json_str, code_suggestions_json_str, "Edit code suggestions in JSON format:"
"Edit code suggestions in JSON format:"
) )
code_suggestions_editor.render() code_suggestions_editor.render()
@ -77,7 +76,7 @@ class DevChatProvider(GitProvider):
if not code_suggestions_json_new: if not code_suggestions_json_new:
print("Code suggestions are empty, please fill in the code suggestions.") print("Code suggestions are empty, please fill in the code suggestions.")
sys.exit(0) sys.exit(0)
code_suggestions = json.loads(code_suggestions_json_new) code_suggestions = json.loads(code_suggestions_json_new)
return self.provider.publish_code_suggestions(code_suggestions) return self.provider.publish_code_suggestions(code_suggestions)
@ -86,7 +85,7 @@ class DevChatProvider(GitProvider):
def get_pr_branch(self): def get_pr_branch(self):
return self.provider.get_pr_branch() return self.provider.get_pr_branch()
def get_files(self): def get_files(self):
return self.provider.get_files() return self.provider.get_files()
@ -103,10 +102,7 @@ class DevChatProvider(GitProvider):
print(f"\n\n{body}", end="\n\n", flush=True) print(f"\n\n{body}", end="\n\n", flush=True)
if self.need_edit(): if self.need_edit():
comment_editor = TextEditor( comment_editor = TextEditor(body, "Edit Comment:")
body,
"Edit Comment:"
)
comment_editor.render() comment_editor.render()
body = comment_editor.new_text body = comment_editor.new_text
@ -138,7 +134,9 @@ class DevChatProvider(GitProvider):
def get_pr_id(self): def get_pr_id(self):
return self.provider.get_pr_id() return self.provider.get_pr_id()
def get_line_link(self, relevant_file: str, relevant_line_start: int, relevant_line_end: int = None) -> str: def get_line_link(
self, relevant_file: str, relevant_line_start: int, relevant_line_end: int = None
) -> str:
return self.provider.get_line_link(relevant_file, relevant_line_start, relevant_line_end) return self.provider.get_line_link(relevant_file, relevant_line_start, relevant_line_end)
#### comments operations #### #### comments operations ####
@ -148,18 +146,18 @@ class DevChatProvider(GitProvider):
if pr_comment.find("## Generating PR code suggestions") != -1: if pr_comment.find("## Generating PR code suggestions") != -1:
return None return None
if (not is_temporary and \ if (
pr_comment.find("## Generating PR code suggestions") == -1 and \ not is_temporary
pr_comment.find("**[PR Description]") == -1): and pr_comment.find("## Generating PR code suggestions") == -1
and pr_comment.find("**[PR Description]") == -1
):
print(f"\n\n{pr_comment}", end="\n\n", flush=True) print(f"\n\n{pr_comment}", end="\n\n", flush=True)
if self.need_edit(): if self.need_edit():
pr_comment_editor = TextEditor( pr_comment_editor = TextEditor(pr_comment)
pr_comment form = Form(["Edit pr comment:", pr_comment_editor])
)
form = Form(['Edit pr comment:', pr_comment_editor])
form.render() form.render()
pr_comment = pr_comment_editor.new_text pr_comment = pr_comment_editor.new_text
if not pr_comment: if not pr_comment:
print("Comment is empty, please fill in the comment.") print("Comment is empty, please fill in the comment.")
@ -167,19 +165,20 @@ class DevChatProvider(GitProvider):
return self.provider.publish_comment(pr_comment, is_temporary=is_temporary) return self.provider.publish_comment(pr_comment, is_temporary=is_temporary)
def publish_persistent_comment(self, pr_comment: str, def publish_persistent_comment(
initial_header: str, self,
update_header: bool = True, pr_comment: str,
name='review', initial_header: str,
final_update_message=True): update_header: bool = True,
name="review",
final_update_message=True,
):
print(f"\n\n{initial_header}", end="\n\n", flush=True) print(f"\n\n{initial_header}", end="\n\n", flush=True)
print(pr_comment, end="\n\n", flush=True) print(pr_comment, end="\n\n", flush=True)
if self.need_edit(): if self.need_edit():
pr_comment_editor = TextEditor( pr_comment_editor = TextEditor(pr_comment)
pr_comment form = Form(["Edit pr comment:", pr_comment_editor])
)
form = Form(['Edit pr comment:', pr_comment_editor])
form.render() form.render()
pr_comment = pr_comment_editor.new_text pr_comment = pr_comment_editor.new_text
@ -187,14 +186,23 @@ class DevChatProvider(GitProvider):
if not pr_comment: if not pr_comment:
print("Comment is empty, please fill in the comment.") print("Comment is empty, please fill in the comment.")
sys.exit(0) sys.exit(0)
return self.provider.publish_persistent_comment(pr_comment, initial_header, update_header, name, final_update_message) return self.provider.publish_persistent_comment(
pr_comment, initial_header, update_header, name, final_update_message
)
def publish_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str): def publish_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str):
return self.provider.publish_inline_comment(body, relevant_file, relevant_line_in_file) return self.provider.publish_inline_comment(body, relevant_file, relevant_line_in_file)
def create_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str, def create_inline_comment(
absolute_position: int = None): self,
return self.provider.create_inline_comment(body, relevant_file, relevant_line_in_file, absolute_position) body: str,
relevant_file: str,
relevant_line_in_file: str,
absolute_position: int = None,
):
return self.provider.create_inline_comment(
body, relevant_file, relevant_line_in_file, absolute_position
)
def publish_inline_comments(self, comments: list[dict]): def publish_inline_comments(self, comments: list[dict]):
return self.provider.publish_inline_comments(comments) return self.provider.publish_inline_comments(comments)
@ -213,7 +221,7 @@ class DevChatProvider(GitProvider):
#### labels operations #### #### labels operations ####
def publish_labels(self, labels): def publish_labels(self, labels):
if not os.environ.get('ENABLE_PUBLISH_LABELS', None): if not os.environ.get("ENABLE_PUBLISH_LABELS", None):
return None return None
return self.provider.publish_labels(labels) return self.provider.publish_labels(labels)
@ -247,7 +255,7 @@ class DevChatProvider(GitProvider):
def get_num_of_files(self): def get_num_of_files(self):
return self.provider.get_num_of_files() return self.provider.get_num_of_files()
@staticmethod @staticmethod
def _parse_issue_url(issue_url: str) -> Tuple[str, int]: def _parse_issue_url(issue_url: str) -> Tuple[str, int]:
return GithubProvider._parse_issue_url(issue_url) return GithubProvider._parse_issue_url(issue_url)