refactor: Replace subprocess calls with subprocess_check_output
- Replaced all instances of subprocess.check_output with subprocess_check_output - Added subprocess_check_output function to handle current repo directory - Updated git commands to use the new subprocess_check_output function
This commit is contained in:
parent
a4942d8527
commit
2525010384
@ -12,7 +12,7 @@ from lib.ide_service import IDEService
|
||||
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
|
||||
|
||||
from common_util import assert_exit # noqa: E402
|
||||
from git_api import get_issue_info
|
||||
from git_api import get_issue_info, subprocess_check_output
|
||||
|
||||
diff_too_large_message_en = (
|
||||
"Commit failed. The modified content is too long "
|
||||
@ -137,7 +137,7 @@ def get_modified_files():
|
||||
tuple: 包含两个list的元组,第一个list包含当前修改过的文件,第二个list包含已经staged的文件
|
||||
"""
|
||||
""" 获取当前修改文件列表以及已经staged的文件列表"""
|
||||
output = subprocess.check_output(["git", "status", "-s", "-u"], text=True, encoding="utf-8")
|
||||
output = subprocess_check_output(["git", "status", "-s", "-u"], text=True, encoding="utf-8")
|
||||
lines = output.split("\n")
|
||||
modified_files = []
|
||||
staged_files = []
|
||||
@ -216,7 +216,7 @@ def rebuild_stage_list(user_files):
|
||||
|
||||
"""
|
||||
# Unstage all files
|
||||
subprocess.check_output(["git", "reset"])
|
||||
subprocess_check_output(["git", "reset"])
|
||||
# Stage all user_files
|
||||
for file in user_files:
|
||||
os.system(f'git add "{file}"')
|
||||
@ -233,13 +233,13 @@ def get_diff():
|
||||
bytes: 返回bytes类型,是git diff --cached命令的输出结果
|
||||
|
||||
"""
|
||||
return subprocess.check_output(["git", "diff", "--cached"])
|
||||
return subprocess_check_output(["git", "diff", "--cached"])
|
||||
|
||||
|
||||
def get_current_branch():
|
||||
try:
|
||||
# 使用git命令获取当前分支名称
|
||||
result = subprocess.check_output(
|
||||
result = subprocess_check_output(
|
||||
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
|
||||
).strip()
|
||||
# 将结果从bytes转换为str
|
||||
@ -313,7 +313,7 @@ def display_commit_message_and_commit(commit_message):
|
||||
new_commit_message = text_editor.new_text
|
||||
if not new_commit_message:
|
||||
return None
|
||||
return subprocess.check_output(["git", "commit", "-m", new_commit_message])
|
||||
return subprocess_check_output(["git", "commit", "-m", new_commit_message])
|
||||
|
||||
|
||||
def extract_issue_id(branch_name):
|
||||
|
@ -28,6 +28,66 @@ def read_github_token():
|
||||
sys.exit(-1)
|
||||
return server_access_token
|
||||
|
||||
current_repo_dir = None
|
||||
def get_current_repo():
|
||||
"""
|
||||
获取当前文件所在的仓库信息
|
||||
"""
|
||||
global current_repo_dir
|
||||
|
||||
if not current_repo_dir:
|
||||
selected_data = IDEService().get_selected_range().dict()
|
||||
current_file = selected_data.get("abspath", None)
|
||||
if not current_file:
|
||||
return None
|
||||
current_dir = os.path.dirname(current_file)
|
||||
try:
|
||||
# 获取仓库根目录
|
||||
current_repo_dir = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'], stderr=subprocess.DEVNULL, cwd=current_dir).decode('utf-8').strip()
|
||||
except subprocess.CalledProcessError:
|
||||
# 如果发生错误,可能不在git仓库中
|
||||
return None
|
||||
return current_repo_dir
|
||||
|
||||
def subprocess_check_output(*popenargs, timeout=None, **kwargs):
|
||||
# 将 current_dir 添加到 kwargs 中的 cwd 参数
|
||||
current_repo = get_current_repo()
|
||||
if current_repo:
|
||||
kwargs['cwd'] = kwargs.get('cwd', current_repo)
|
||||
|
||||
# 调用 subprocess.check_output
|
||||
return subprocess.check_output(*popenargs, timeout=timeout, **kwargs)
|
||||
|
||||
def subprocess_run(*popenargs,
|
||||
input=None, capture_output=False, timeout=None, check=False, **kwargs):
|
||||
current_repo = get_current_repo()
|
||||
if current_repo:
|
||||
kwargs['cwd'] = kwargs.get('cwd', current_repo)
|
||||
|
||||
# 调用 subprocess.run
|
||||
return subprocess.run(*popenargs,
|
||||
input=input,
|
||||
capture_output=capture_output,
|
||||
timeout=timeout,
|
||||
check=check,
|
||||
**kwargs)
|
||||
|
||||
def subprocess_call(*popenargs, timeout=None, **kwargs):
|
||||
current_repo = get_current_repo()
|
||||
if current_repo:
|
||||
kwargs['cwd'] = kwargs.get('cwd', current_repo)
|
||||
|
||||
# 调用 subprocess.call
|
||||
return subprocess.call(*popenargs, timeout=timeout, **kwargs)
|
||||
|
||||
def subprocess_check_call(*popenargs, timeout=None, **kwargs):
|
||||
current_repo = get_current_repo()
|
||||
if current_repo:
|
||||
kwargs['cwd'] = kwargs.get('cwd', current_repo)
|
||||
|
||||
# 调用 subprocess.check_call
|
||||
return subprocess.check_call(*popenargs, timeout=timeout, **kwargs)
|
||||
|
||||
|
||||
GITHUB_ACCESS_TOKEN = read_github_token()
|
||||
GITHUB_API_URL = "https://api.github.com"
|
||||
@ -124,7 +184,7 @@ def check_git_installed():
|
||||
bool: True if Git is installed, False otherwise.
|
||||
"""
|
||||
try:
|
||||
subprocess.run(
|
||||
subprocess_run(
|
||||
["git", "--version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
return True
|
||||
@ -134,7 +194,7 @@ def check_git_installed():
|
||||
|
||||
|
||||
def create_and_checkout_branch(branch_name):
|
||||
subprocess.run(["git", "checkout", "-b", branch_name], check=True)
|
||||
subprocess_run(["git", "checkout", "-b", branch_name], check=True)
|
||||
|
||||
|
||||
def is_issue_url(task):
|
||||
@ -179,7 +239,7 @@ def get_github_repo(issue_repo=False):
|
||||
return config_data["issue_repo"]
|
||||
|
||||
# 使用git命令获取当前仓库的URL
|
||||
result = subprocess.check_output(
|
||||
result = subprocess_check_output(
|
||||
["git", "remote", "get-url", "origin"], stderr=subprocess.STDOUT
|
||||
).strip()
|
||||
# 将结果从bytes转换为str并提取出仓库信息
|
||||
@ -205,7 +265,7 @@ def get_github_repo(issue_repo=False):
|
||||
def get_current_branch():
|
||||
try:
|
||||
# 使用git命令获取当前分支名称
|
||||
result = subprocess.check_output(
|
||||
result = subprocess_check_output(
|
||||
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
|
||||
).strip()
|
||||
# 将结果从bytes转换为str
|
||||
@ -225,7 +285,7 @@ def get_parent_branch():
|
||||
return None
|
||||
try:
|
||||
# 使用git命令获取当前分支的父分支引用
|
||||
result = subprocess.check_output(
|
||||
result = subprocess_check_output(
|
||||
["git", "rev-parse", "--abbrev-ref", f"{current_branch}@{1}"], stderr=subprocess.STDOUT
|
||||
).strip()
|
||||
# 将结果从bytes转换为str
|
||||
@ -235,7 +295,7 @@ def get_parent_branch():
|
||||
# 如果父分支引用和当前分支相同,说明当前分支可能是基于一个没有父分支的提交创建的
|
||||
return None
|
||||
# 使用git命令获取父分支的名称
|
||||
result = subprocess.check_output(
|
||||
result = subprocess_check_output(
|
||||
["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref],
|
||||
stderr=subprocess.STDOUT,
|
||||
).strip()
|
||||
@ -282,7 +342,7 @@ def get_issue_info_by_url(issue_url):
|
||||
# 获取当前分支自从与base_branch分叉以来的历史提交信息
|
||||
def get_commit_messages(base_branch):
|
||||
# 找到当前分支与base_branch的分叉点
|
||||
merge_base = subprocess.run(
|
||||
merge_base = subprocess_run(
|
||||
["git", "merge-base", "HEAD", base_branch],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
@ -297,7 +357,7 @@ def get_commit_messages(base_branch):
|
||||
merge_base_commit = merge_base.stdout.strip()
|
||||
|
||||
# 获取从分叉点到当前分支的所有提交信息
|
||||
result = subprocess.run(
|
||||
result = subprocess_run(
|
||||
["git", "log", f"{merge_base_commit}..HEAD"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
@ -328,7 +388,7 @@ def create_pull_request(title, body, head, base, repo_name):
|
||||
def run_command_with_retries(command, retries=3, delay=5):
|
||||
for attempt in range(retries):
|
||||
try:
|
||||
subprocess.check_call(command)
|
||||
subprocess_check_call(command)
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Command failed: {e}")
|
||||
@ -343,7 +403,7 @@ def run_command_with_retries(command, retries=3, delay=5):
|
||||
def check_unpushed_commits():
|
||||
try:
|
||||
# 获取当前分支的本地提交和远程提交的差异
|
||||
result = subprocess.check_output(["git", "cherry", "-v"]).decode("utf-8").strip()
|
||||
result = subprocess_check_output(["git", "cherry", "-v"]).decode("utf-8").strip()
|
||||
# 如果结果不为空,说明存在未push的提交
|
||||
return bool(result)
|
||||
except subprocess.CalledProcessError as e:
|
||||
@ -357,7 +417,7 @@ def auto_push():
|
||||
return True
|
||||
try:
|
||||
branch = (
|
||||
subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
|
||||
subprocess_check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
|
||||
.strip()
|
||||
.decode("utf-8")
|
||||
)
|
||||
@ -366,7 +426,7 @@ def auto_push():
|
||||
return False
|
||||
|
||||
# 检查当前分支是否有对应的远程分支
|
||||
remote_branch_exists = subprocess.call(["git", "ls-remote", "--exit-code", "origin", branch])
|
||||
remote_branch_exists = subprocess_call(["git", "ls-remote", "--exit-code", "origin", branch])
|
||||
|
||||
push_command = ["git", "push", "origin", branch]
|
||||
if remote_branch_exists == 0:
|
||||
|
Loading…
x
Reference in New Issue
Block a user