""" openai api utils """ # flake8: noqa: E402 # Import necessary libraries import json import os import re from typing import Dict, List import httpx import openai from devchat.ide import IDEService from .pipeline import ( RetryException, # Import RetryException class exception_handle, # Function to handle exceptions parallel, # Function to run tasks in parallel pipeline, # Function to create a pipeline of tasks retry, # Function to retry a task ) def _try_remove_markdown_block_flag(content): """ If the content is a markdown block, this function removes the header ```xxx and footer ``` """ # Define a regex pattern to match the header and footer of a markdown block pattern = r"^\s*```\s*(\w+)\s*\n(.*?)\n\s*```\s*$" # Use the re module to match the pattern match = re.search(pattern, content, re.DOTALL | re.MULTILINE) if match: # If a match is found, extract the content of the markdown block and return it _ = match.group(1) # language markdown_content = match.group(2) return markdown_content.strip() # If no match is found, return the original content return content def chat_completion_stream_commit( messages: List[Dict], # [{"role": "user", "content": "hello"}] llm_config: Dict, # {"model": "...", ...} ): """ This function is used to commit chat completion stream """ proxy_url = os.environ.get("DEVCHAT_PROXY", "") proxy_setting = {"proxy": {"https://": proxy_url, "http://": proxy_url}} if proxy_url else {} # Initialize OpenAI client with API key, base URL and http client client = openai.OpenAI( api_key=os.environ.get("OPENAI_API_KEY", None), base_url=os.environ.get("OPENAI_API_BASE", None), http_client=httpx.Client(**proxy_setting, trust_env=False), ) # Update llm_config dictionary llm_config["stream"] = True llm_config["timeout"] = 60 # Return chat completions return client.chat.completions.create(messages=messages, **llm_config) def chat_completion_stream_raw(**kwargs): """ This function is used to get raw chat completion stream """ proxy_url = os.environ.get("DEVCHAT_PROXY", "") proxy_setting = {"proxy": {"https://": proxy_url, "http://": proxy_url}} if proxy_url else {} # Initialize OpenAI client with API key, base URL and http client client = openai.OpenAI( api_key=os.environ.get("OPENAI_API_KEY", None), base_url=os.environ.get("OPENAI_API_BASE", None), http_client=httpx.Client(**proxy_setting, trust_env=False), ) # Update kwargs dictionary kwargs["stream"] = True kwargs["timeout"] = 60 # Return chat completions return client.chat.completions.create(**kwargs) def stream_out_chunk(chunks): """ This function is used to print out chunks of data """ for chunk in chunks: chunk_dict = chunk.dict() if len(chunk_dict["choices"]) > 0: delta = chunk_dict["choices"][0]["delta"] if delta.get("content", None): print(delta["content"], end="", flush=True) yield chunk def retry_timeout(chunks): """ This function is used to handle timeout errors """ try: for chunk in chunks: yield chunk except (openai.APIConnectionError, openai.APITimeoutError) as err: IDEService().ide_logging("info", f"in retry_timeout: err: {err}") raise RetryException(err) from err def chunk_list(chunks): """ This function is used to convert chunks into a list """ return [chunk for chunk in chunks] def chunks_content(chunks): """ This function is used to extract content from chunks """ content = None for chunk in chunks: chunk_dict = chunk.dict() if len(chunk_dict["choices"]) > 0: delta = chunk_dict["choices"][0]["delta"] if delta.get("content", None): if content is None: content = "" content += delta["content"] return content def chunks_call(chunks): """ This function is used to extract tool calls from chunks """ tool_calls = [] for chunk in chunks: chunk = chunk.dict() if len(chunk["choices"]) > 0: delta = chunk["choices"][0]["delta"] if "tool_calls" in delta and delta["tool_calls"]: tool_call = delta["tool_calls"][0]["function"] if delta["tool_calls"][0].get("index", None) is not None: index = delta["tool_calls"][0]["index"] if index >= len(tool_calls): tool_calls.append({"name": None, "arguments": ""}) if tool_call.get("name", None): tool_calls[-1]["name"] = tool_call["name"] if tool_call.get("arguments", None): tool_calls[-1]["arguments"] += tool_call["arguments"] return tool_calls def content_to_json(content): """ This function is used to convert content to JSON """ try: content_no_block = _try_remove_markdown_block_flag(content) response_obj = json.loads(content_no_block, strict=False) return response_obj except json.JSONDecodeError as err: IDEService().ide_logging("debug", f"Receive content: {content}") IDEService().ide_logging("debug", f"in content_to_json: json decode error: {err}") raise RetryException(err) from err except Exception as err: IDEService().ide_logging("debug", f"in content_to_json: other error: {err}") raise err def to_dict_content_and_call(content, tool_calls=None): """ This function is used to convert content and tool calls to a dictionary """ if tool_calls is None: tool_calls = [] return { "content": content, "function_name": tool_calls[0]["name"] if tool_calls else None, "parameters": tool_calls[0]["arguments"] if tool_calls else "", "tool_calls": tool_calls, } # Define a pipeline function for chat completion content. # This pipeline first commits a chat completion stream, handles any timeout errors, # and then extracts the content from the chunks. # If any step in the pipeline fails, it will retry the entire pipeline up to 3 times. chat_completion_content = retry( pipeline(chat_completion_stream_commit, retry_timeout, chunks_content), times=3 ) # Define a pipeline function for chat completion stream content. # This pipeline first commits a chat completion stream, handles any timeout errors, # streams out the chunk, and then extracts the content from the chunks. # If any step in the pipeline fails, it will retry the entire pipeline up to 3 times. chat_completion_stream_content = retry( pipeline(chat_completion_stream_commit, retry_timeout, stream_out_chunk, chunks_content), times=3, ) # Define a pipeline function for chat completion call. # This pipeline first commits a chat completion stream, handles any timeout errors, # and then extracts the tool calls from the chunks. # If any step in the pipeline fails, it will retry the entire pipeline up to 3 times. chat_completion_call = retry( pipeline(chat_completion_stream_commit, retry_timeout, chunks_call), times=3 ) # Define a pipeline function for chat completion without streaming and return a JSON object. # This pipeline first commits a chat completion stream, handles any timeout errors, extracts # the content from the chunks and then converts the content to JSON. # If any step in the pipeline fails, it will retry the entire pipeline up to 3 times. # If a JSONDecodeError is encountered during the content to JSON conversion, it will log the # error and retry the pipeline. # If any other exception is encountered, it will log the error and raise it. chat_completion_no_stream_return_json_with_retry = exception_handle( retry( pipeline(chat_completion_stream_commit, retry_timeout, chunks_content, content_to_json), times=3, ), None, ) def chat_completion_no_stream_return_json(messages: List[Dict], llm_config: Dict): """ This function is used to get chat completion without streaming and return JSON object """ llm_config["response_format"] = {"type": "json_object"} return chat_completion_no_stream_return_json_with_retry( messages=messages, llm_config=llm_config ) # Define a pipeline function for chat completion stream. # This pipeline first commits a chat completion stream, handles any timeout errors, # extracts the content from the chunks, and then converts the content and tool calls # to a dictionary. # If any step in the pipeline fails, it will retry the entire pipeline up to 3 times. # If an exception is encountered, it will return a dictionary with None values and the error. chat_completion_stream = exception_handle( retry( pipeline( chat_completion_stream_commit, retry_timeout, chunks_content, to_dict_content_and_call, ), times=3, ), None, ) # Define a pipeline function for chat call completion stream. # This pipeline first commits a chat completion stream, handles any timeout errors, # converts the chunks to a list, extracts the content and tool calls from the chunks # in parallel, and then converts the content and tool calls to a dictionary. # If any step in the pipeline fails, it will retry the entire pipeline up to 3 times. # If an exception is encountered, it will return a dictionary with None values, an empty # tool calls list, and the error. chat_call_completion_stream = exception_handle( retry( pipeline( chat_completion_stream_commit, retry_timeout, chunk_list, parallel(chunks_content, chunks_call), to_dict_content_and_call, ), times=3, ), None, )