break down toolbox.py to multiple files

This commit is contained in:
binary-husky 2024-01-13 16:10:46 +08:00
parent d698b96209
commit 1714116a89
6 changed files with 509 additions and 473 deletions

View File

@ -244,6 +244,9 @@ def predict(inputs, llm_kwargs, plugin_kwargs, chatbot, history=[], system_promp
if has_choices and not choice_valid:
# 一些垃圾第三方接口的出现这样的错误
continue
if len(chunk_decoded) > 0 and (chunkjson is None):
# 传递进来一些奇怪的东西
raise ValueError(f'无法读取以下数据,请检查配置。\n\n{chunk_decoded}')
# 前者是API2D的结束条件后者是OPENAI的结束条件
if ('data: [DONE]' in chunk_decoded) or (len(chunkjson['choices'][0]["delta"]) == 0):
# 判定为数据流的结束gpt_replying_buffer也写完了

View File

@ -0,0 +1,198 @@
import markdown
import re
import os
import math
from latex2mathml.converter import convert as tex2mathml
from functools import wraps, lru_cache
from shared_utils.config_loader import get_conf as get_conf
pj = os.path.join
default_user_name = 'default_user'
def text_divide_paragraph(text):
"""
将文本按照段落分隔符分割开生成带有段落标签的HTML代码
"""
pre = '<div class="markdown-body">'
suf = '</div>'
if text.startswith(pre) and text.endswith(suf):
return text
if '```' in text:
# careful input
return text
elif '</div>' in text:
# careful input
return text
else:
# whatever input
lines = text.split("\n")
for i, line in enumerate(lines):
lines[i] = lines[i].replace(" ", "&nbsp;")
text = "</br>".join(lines)
return pre + text + suf
@lru_cache(maxsize=128) # 使用 lru缓存 加快转换速度
def markdown_convertion(txt):
"""
将Markdown格式的文本转换为HTML格式如果包含数学公式则先将公式转换为HTML格式
"""
pre = '<div class="markdown-body">'
suf = '</div>'
if txt.startswith(pre) and txt.endswith(suf):
# print('警告,输入了已经经过转化的字符串,二次转化可能出问题')
return txt # 已经被转化过,不需要再次转化
markdown_extension_configs = {
'mdx_math': {
'enable_dollar_delimiter': True,
'use_gitlab_delimiters': False,
},
}
find_equation_pattern = r'<script type="math/tex(?:.*?)>(.*?)</script>'
def tex2mathml_catch_exception(content, *args, **kwargs):
try:
content = tex2mathml(content, *args, **kwargs)
except:
content = content
return content
def replace_math_no_render(match):
content = match.group(1)
if 'mode=display' in match.group(0):
content = content.replace('\n', '</br>')
return f"<font color=\"#00FF00\">$$</font><font color=\"#FF00FF\">{content}</font><font color=\"#00FF00\">$$</font>"
else:
return f"<font color=\"#00FF00\">$</font><font color=\"#FF00FF\">{content}</font><font color=\"#00FF00\">$</font>"
def replace_math_render(match):
content = match.group(1)
if 'mode=display' in match.group(0):
if '\\begin{aligned}' in content:
content = content.replace('\\begin{aligned}', '\\begin{array}')
content = content.replace('\\end{aligned}', '\\end{array}')
content = content.replace('&', ' ')
content = tex2mathml_catch_exception(content, display="block")
return content
else:
return tex2mathml_catch_exception(content)
def markdown_bug_hunt(content):
"""
解决一个mdx_math的bug$包裹begin命令时多余<script>
"""
content = content.replace('<script type="math/tex">\n<script type="math/tex; mode=display">',
'<script type="math/tex; mode=display">')
content = content.replace('</script>\n</script>', '</script>')
return content
def is_equation(txt):
"""
判定是否为公式 | 测试1 写出洛伦兹定律使用tex格式公式 测试2 给出柯西不等式使用latex格式 测试3 写出麦克斯韦方程组
"""
if '```' in txt and '```reference' not in txt: return False
if '$' not in txt and '\\[' not in txt: return False
mathpatterns = {
r'(?<!\\|\$)(\$)([^\$]+)(\$)': {'allow_multi_lines': False}, #  $...$
r'(?<!\\)(\$\$)([^\$]+)(\$\$)': {'allow_multi_lines': True}, # $$...$$
r'(?<!\\)(\\\[)(.+?)(\\\])': {'allow_multi_lines': False}, # \[...\]
# r'(?<!\\)(\\\()(.+?)(\\\))': {'allow_multi_lines': False}, # \(...\)
# r'(?<!\\)(\\begin{([a-z]+?\*?)})(.+?)(\\end{\2})': {'allow_multi_lines': True}, # \begin...\end
# r'(?<!\\)(\$`)([^`]+)(`\$)': {'allow_multi_lines': False}, # $`...`$
}
matches = []
for pattern, property in mathpatterns.items():
flags = re.ASCII | re.DOTALL if property['allow_multi_lines'] else re.ASCII
matches.extend(re.findall(pattern, txt, flags))
if len(matches) == 0: return False
contain_any_eq = False
illegal_pattern = re.compile(r'[^\x00-\x7F]|echo')
for match in matches:
if len(match) != 3: return False
eq_canidate = match[1]
if illegal_pattern.search(eq_canidate):
return False
else:
contain_any_eq = True
return contain_any_eq
def fix_markdown_indent(txt):
# fix markdown indent
if (' - ' not in txt) or ('. ' not in txt):
return txt # do not need to fix, fast escape
# walk through the lines and fix non-standard indentation
lines = txt.split("\n")
pattern = re.compile(r'^\s+-')
activated = False
for i, line in enumerate(lines):
if line.startswith('- ') or line.startswith('1. '):
activated = True
if activated and pattern.match(line):
stripped_string = line.lstrip()
num_spaces = len(line) - len(stripped_string)
if (num_spaces % 4) == 3:
num_spaces_should_be = math.ceil(num_spaces / 4) * 4
lines[i] = ' ' * num_spaces_should_be + stripped_string
return '\n'.join(lines)
txt = fix_markdown_indent(txt)
if is_equation(txt): # 有$标识的公式符号,且没有代码段```的标识
# convert everything to html format
split = markdown.markdown(text='---')
convert_stage_1 = markdown.markdown(text=txt, extensions=['sane_lists', 'tables', 'mdx_math', 'fenced_code'],
extension_configs=markdown_extension_configs)
convert_stage_1 = markdown_bug_hunt(convert_stage_1)
# 1. convert to easy-to-copy tex (do not render math)
convert_stage_2_1, n = re.subn(find_equation_pattern, replace_math_no_render, convert_stage_1, flags=re.DOTALL)
# 2. convert to rendered equation
convert_stage_2_2, n = re.subn(find_equation_pattern, replace_math_render, convert_stage_1, flags=re.DOTALL)
# cat them together
return pre + convert_stage_2_1 + f'{split}' + convert_stage_2_2 + suf
else:
return pre + markdown.markdown(txt, extensions=['sane_lists', 'tables', 'fenced_code', 'codehilite']) + suf
def close_up_code_segment_during_stream(gpt_reply):
"""
在gpt输出代码的中途输出了前面的```但还没输出完后面的```补上后面的```
Args:
gpt_reply (str): GPT模型返回的回复字符串
Returns:
str: 返回一个新的字符串将输出代码片段的后面的```补上
"""
if '```' not in gpt_reply:
return gpt_reply
if gpt_reply.endswith('```'):
return gpt_reply
# 排除了以上两个情况,我们
segments = gpt_reply.split('```')
n_mark = len(segments) - 1
if n_mark % 2 == 1:
return gpt_reply + '\n```' # 输出代码片段中!
else:
return gpt_reply
def format_io(self, y):
"""
将输入和输出解析为HTML格式将y中最后一项的输入部分段落化并将输出部分的Markdown和数学公式转换为HTML格式
"""
if y is None or y == []:
return []
i_ask, gpt_reply = y[-1]
# 输入部分太自由,预处理一波
if i_ask is not None: i_ask = text_divide_paragraph(i_ask)
# 当代码输出半截的时候,试着补上后个```
if gpt_reply is not None: gpt_reply = close_up_code_segment_during_stream(gpt_reply)
# process
y[-1] = (
None if i_ask is None else markdown.markdown(i_ask, extensions=['fenced_code', 'tables']),
None if gpt_reply is None else markdown_convertion(gpt_reply)
)
return y

View File

@ -0,0 +1,131 @@
import importlib
import time
import os
from functools import lru_cache
from colorful import print亮红, print亮绿, print亮蓝
pj = os.path.join
default_user_name = 'default_user'
def read_env_variable(arg, default_value):
"""
环境变量可以是 `GPT_ACADEMIC_CONFIG`(优先)也可以直接是`CONFIG`
例如在windows cmd中既可以写
set USE_PROXY=True
set API_KEY=sk-j7caBpkRoxxxxxxxxxxxxxxxxxxxxxxxxxxxx
set proxies={"http":"http://127.0.0.1:10085", "https":"http://127.0.0.1:10085",}
set AVAIL_LLM_MODELS=["gpt-3.5-turbo", "chatglm"]
set AUTHENTICATION=[("username", "password"), ("username2", "password2")]
也可以写
set GPT_ACADEMIC_USE_PROXY=True
set GPT_ACADEMIC_API_KEY=sk-j7caBpkRoxxxxxxxxxxxxxxxxxxxxxxxxxxxx
set GPT_ACADEMIC_proxies={"http":"http://127.0.0.1:10085", "https":"http://127.0.0.1:10085",}
set GPT_ACADEMIC_AVAIL_LLM_MODELS=["gpt-3.5-turbo", "chatglm"]
set GPT_ACADEMIC_AUTHENTICATION=[("username", "password"), ("username2", "password2")]
"""
arg_with_prefix = "GPT_ACADEMIC_" + arg
if arg_with_prefix in os.environ:
env_arg = os.environ[arg_with_prefix]
elif arg in os.environ:
env_arg = os.environ[arg]
else:
raise KeyError
print(f"[ENV_VAR] 尝试加载{arg},默认值:{default_value} --> 修正值:{env_arg}")
try:
if isinstance(default_value, bool):
env_arg = env_arg.strip()
if env_arg == 'True': r = True
elif env_arg == 'False': r = False
else: print('Enter True or False, but have:', env_arg); r = default_value
elif isinstance(default_value, int):
r = int(env_arg)
elif isinstance(default_value, float):
r = float(env_arg)
elif isinstance(default_value, str):
r = env_arg.strip()
elif isinstance(default_value, dict):
r = eval(env_arg)
elif isinstance(default_value, list):
r = eval(env_arg)
elif default_value is None:
assert arg == "proxies"
r = eval(env_arg)
else:
print亮红(f"[ENV_VAR] 环境变量{arg}不支持通过环境变量设置! ")
raise KeyError
except:
print亮红(f"[ENV_VAR] 环境变量{arg}加载失败! ")
raise KeyError(f"[ENV_VAR] 环境变量{arg}加载失败! ")
print亮绿(f"[ENV_VAR] 成功读取环境变量{arg}")
return r
@lru_cache(maxsize=128)
def read_single_conf_with_lru_cache(arg):
from shared_utils.key_pattern_manager import is_any_api_key
try:
# 优先级1. 获取环境变量作为配置
default_ref = getattr(importlib.import_module('config'), arg) # 读取默认值作为数据类型转换的参考
r = read_env_variable(arg, default_ref)
except:
try:
# 优先级2. 获取config_private中的配置
r = getattr(importlib.import_module('config_private'), arg)
except:
# 优先级3. 获取config中的配置
r = getattr(importlib.import_module('config'), arg)
# 在读取API_KEY时检查一下是不是忘了改config
if arg == 'API_URL_REDIRECT':
oai_rd = r.get("https://api.openai.com/v1/chat/completions", None) # API_URL_REDIRECT填写格式是错误的请阅读`https://github.com/binary-husky/gpt_academic/wiki/项目配置说明`
if oai_rd and not oai_rd.endswith('/completions'):
print亮红("\n\n[API_URL_REDIRECT] API_URL_REDIRECT填错了。请阅读`https://github.com/binary-husky/gpt_academic/wiki/项目配置说明`。如果您确信自己没填错,无视此消息即可。")
time.sleep(5)
if arg == 'API_KEY':
print亮蓝(f"[API_KEY] 本项目现已支持OpenAI和Azure的api-key。也支持同时填写多个api-key如API_KEY=\"openai-key1,openai-key2,azure-key3\"")
print亮蓝(f"[API_KEY] 您既可以在config.py中修改api-key(s)也可以在问题输入区输入临时的api-key(s),然后回车键提交后即可生效。")
if is_any_api_key(r):
print亮绿(f"[API_KEY] 您的 API_KEY 是: {r[:15]}*** API_KEY 导入成功")
else:
print亮红("[API_KEY] 您的 API_KEY 不满足任何一种已知的密钥格式请在config文件中修改API密钥之后再运行。")
if arg == 'proxies':
if not read_single_conf_with_lru_cache('USE_PROXY'): r = None # 检查USE_PROXY防止proxies单独起作用
if r is None:
print亮红('[PROXY] 网络代理状态未配置。无代理状态下很可能无法访问OpenAI家族的模型。建议检查USE_PROXY选项是否修改。')
else:
print亮绿('[PROXY] 网络代理状态:已配置。配置信息如下:', r)
assert isinstance(r, dict), 'proxies格式错误请注意proxies选项的格式不要遗漏括号。'
return r
@lru_cache(maxsize=128)
def get_conf(*args):
"""
本项目的所有配置都集中在config.py中 修改配置有三种方法您只需要选择其中一种即可
- 直接修改config.py
- 创建并修改config_private.py
- 修改环境变量修改docker-compose.yml等价于修改容器内部的环境变量
注意如果您使用docker-compose部署请修改docker-compose等价于修改容器内部的环境变量
"""
res = []
for arg in args:
r = read_single_conf_with_lru_cache(arg)
res.append(r)
if len(res) == 1: return res[0]
return res
def set_conf(key, value):
from toolbox import read_single_conf_with_lru_cache
read_single_conf_with_lru_cache.cache_clear()
get_conf.cache_clear()
os.environ[key] = str(value)
altered = get_conf(key)
return altered
def set_multi_conf(dic):
for k, v in dic.items(): set_conf(k, v)
return

View File

@ -0,0 +1,85 @@
import os
"""
========================================================================
接驳void-terminal:
- set_conf: 在运行过程中动态地修改配置
- set_multi_conf: 在运行过程中动态地修改多个配置
- get_plugin_handle: 获取插件的句柄
- get_plugin_default_kwargs: 获取插件的默认参数
- get_chat_handle: 获取简单聊天的句柄
- get_chat_default_kwargs: 获取简单聊天的默认参数
========================================================================
"""
def get_plugin_handle(plugin_name):
"""
e.g. plugin_name = 'crazy_functions.批量Markdown翻译->Markdown翻译指定语言'
"""
import importlib
assert '->' in plugin_name, \
"Example of plugin_name: crazy_functions.批量Markdown翻译->Markdown翻译指定语言"
module, fn_name = plugin_name.split('->')
f_hot_reload = getattr(importlib.import_module(module, fn_name), fn_name)
return f_hot_reload
def get_chat_handle():
"""
Get chat function
"""
from request_llms.bridge_all import predict_no_ui_long_connection
return predict_no_ui_long_connection
def get_plugin_default_kwargs():
"""
Get Plugin Default Arguments
"""
from toolbox import ChatBotWithCookies, load_chat_cookies
cookies = load_chat_cookies()
llm_kwargs = {
'api_key': cookies['api_key'],
'llm_model': cookies['llm_model'],
'top_p': 1.0,
'max_length': None,
'temperature': 1.0,
}
chatbot = ChatBotWithCookies(llm_kwargs)
# txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, web_port
DEFAULT_FN_GROUPS_kwargs = {
"main_input": "./README.md",
"llm_kwargs": llm_kwargs,
"plugin_kwargs": {},
"chatbot_with_cookie": chatbot,
"history": [],
"system_prompt": "You are a good AI.",
"web_port": None
}
return DEFAULT_FN_GROUPS_kwargs
def get_chat_default_kwargs():
"""
Get Chat Default Arguments
"""
from toolbox import load_chat_cookies
cookies = load_chat_cookies()
llm_kwargs = {
'api_key': cookies['api_key'],
'llm_model': cookies['llm_model'],
'top_p': 1.0,
'max_length': None,
'temperature': 1.0,
}
default_chat_kwargs = {
"inputs": "Hello there, are you ready?",
"llm_kwargs": llm_kwargs,
"history": [],
"sys_prompt": "You are AI assistant",
"observe_window": None,
"console_slience": False,
}
return default_chat_kwargs

View File

@ -0,0 +1,81 @@
import re
import os
from functools import wraps, lru_cache
from shared_utils.advanced_markdown_format import format_io
from shared_utils.config_loader import get_conf as get_conf
pj = os.path.join
default_user_name = 'default_user'
def is_openai_api_key(key):
CUSTOM_API_KEY_PATTERN = get_conf('CUSTOM_API_KEY_PATTERN')
if len(CUSTOM_API_KEY_PATTERN) != 0:
API_MATCH_ORIGINAL = re.match(CUSTOM_API_KEY_PATTERN, key)
else:
API_MATCH_ORIGINAL = re.match(r"sk-[a-zA-Z0-9]{48}$", key)
return bool(API_MATCH_ORIGINAL)
def is_azure_api_key(key):
API_MATCH_AZURE = re.match(r"[a-zA-Z0-9]{32}$", key)
return bool(API_MATCH_AZURE)
def is_api2d_key(key):
API_MATCH_API2D = re.match(r"fk[a-zA-Z0-9]{6}-[a-zA-Z0-9]{32}$", key)
return bool(API_MATCH_API2D)
def is_any_api_key(key):
if ',' in key:
keys = key.split(',')
for k in keys:
if is_any_api_key(k): return True
return False
else:
return is_openai_api_key(key) or is_api2d_key(key) or is_azure_api_key(key)
def what_keys(keys):
avail_key_list = {'OpenAI Key': 0, "Azure Key": 0, "API2D Key": 0}
key_list = keys.split(',')
for k in key_list:
if is_openai_api_key(k):
avail_key_list['OpenAI Key'] += 1
for k in key_list:
if is_api2d_key(k):
avail_key_list['API2D Key'] += 1
for k in key_list:
if is_azure_api_key(k):
avail_key_list['Azure Key'] += 1
return f"检测到: OpenAI Key {avail_key_list['OpenAI Key']} 个, Azure Key {avail_key_list['Azure Key']} 个, API2D Key {avail_key_list['API2D Key']}"
def select_api_key(keys, llm_model):
import random
avail_key_list = []
key_list = keys.split(',')
if llm_model.startswith('gpt-'):
for k in key_list:
if is_openai_api_key(k): avail_key_list.append(k)
if llm_model.startswith('api2d-'):
for k in key_list:
if is_api2d_key(k): avail_key_list.append(k)
if llm_model.startswith('azure-'):
for k in key_list:
if is_azure_api_key(k): avail_key_list.append(k)
if len(avail_key_list) == 0:
raise RuntimeError(f"您提供的api-key不满足要求不包含任何可用于{llm_model}的api-key。您可能选择了错误的模型或请求源右下角更换模型菜单中可切换openai,azure,claude,api2d等请求源")
api_key = random.choice(avail_key_list) # 随机负载均衡
return api_key

View File

@ -1,4 +1,3 @@
import markdown
import importlib
import time
import inspect
@ -8,9 +7,17 @@ import base64
import gradio
import shutil
import glob
import math
from latex2mathml.converter import convert as tex2mathml
from functools import wraps, lru_cache
from functools import wraps
from shared_utils.config_loader import get_conf
from shared_utils.config_loader import set_conf
from shared_utils.advanced_markdown_format import format_io
from shared_utils.key_pattern_manager import select_api_key
from shared_utils.key_pattern_manager import is_any_api_key
from shared_utils.key_pattern_manager import what_keys
from shared_utils.connect_void_terminal import get_chat_handle
from shared_utils.connect_void_terminal import get_plugin_handle
from shared_utils.connect_void_terminal import get_plugin_default_kwargs
from shared_utils.connect_void_terminal import get_chat_default_kwargs
pj = os.path.join
default_user_name = 'default_user'
@ -284,195 +291,6 @@ def report_exception(chatbot, history, a, b):
history.extend([a, b])
def text_divide_paragraph(text):
"""
将文本按照段落分隔符分割开生成带有段落标签的HTML代码
"""
pre = '<div class="markdown-body">'
suf = '</div>'
if text.startswith(pre) and text.endswith(suf):
return text
if '```' in text:
# careful input
return text
elif '</div>' in text:
# careful input
return text
else:
# whatever input
lines = text.split("\n")
for i, line in enumerate(lines):
lines[i] = lines[i].replace(" ", "&nbsp;")
text = "</br>".join(lines)
return pre + text + suf
@lru_cache(maxsize=128) # 使用 lru缓存 加快转换速度
def markdown_convertion(txt):
"""
将Markdown格式的文本转换为HTML格式如果包含数学公式则先将公式转换为HTML格式
"""
pre = '<div class="markdown-body">'
suf = '</div>'
if txt.startswith(pre) and txt.endswith(suf):
# print('警告,输入了已经经过转化的字符串,二次转化可能出问题')
return txt # 已经被转化过,不需要再次转化
markdown_extension_configs = {
'mdx_math': {
'enable_dollar_delimiter': True,
'use_gitlab_delimiters': False,
},
}
find_equation_pattern = r'<script type="math/tex(?:.*?)>(.*?)</script>'
def tex2mathml_catch_exception(content, *args, **kwargs):
try:
content = tex2mathml(content, *args, **kwargs)
except:
content = content
return content
def replace_math_no_render(match):
content = match.group(1)
if 'mode=display' in match.group(0):
content = content.replace('\n', '</br>')
return f"<font color=\"#00FF00\">$$</font><font color=\"#FF00FF\">{content}</font><font color=\"#00FF00\">$$</font>"
else:
return f"<font color=\"#00FF00\">$</font><font color=\"#FF00FF\">{content}</font><font color=\"#00FF00\">$</font>"
def replace_math_render(match):
content = match.group(1)
if 'mode=display' in match.group(0):
if '\\begin{aligned}' in content:
content = content.replace('\\begin{aligned}', '\\begin{array}')
content = content.replace('\\end{aligned}', '\\end{array}')
content = content.replace('&', ' ')
content = tex2mathml_catch_exception(content, display="block")
return content
else:
return tex2mathml_catch_exception(content)
def markdown_bug_hunt(content):
"""
解决一个mdx_math的bug$包裹begin命令时多余<script>
"""
content = content.replace('<script type="math/tex">\n<script type="math/tex; mode=display">',
'<script type="math/tex; mode=display">')
content = content.replace('</script>\n</script>', '</script>')
return content
def is_equation(txt):
"""
判定是否为公式 | 测试1 写出洛伦兹定律使用tex格式公式 测试2 给出柯西不等式使用latex格式 测试3 写出麦克斯韦方程组
"""
if '```' in txt and '```reference' not in txt: return False
if '$' not in txt and '\\[' not in txt: return False
mathpatterns = {
r'(?<!\\|\$)(\$)([^\$]+)(\$)': {'allow_multi_lines': False}, #  $...$
r'(?<!\\)(\$\$)([^\$]+)(\$\$)': {'allow_multi_lines': True}, # $$...$$
r'(?<!\\)(\\\[)(.+?)(\\\])': {'allow_multi_lines': False}, # \[...\]
# r'(?<!\\)(\\\()(.+?)(\\\))': {'allow_multi_lines': False}, # \(...\)
# r'(?<!\\)(\\begin{([a-z]+?\*?)})(.+?)(\\end{\2})': {'allow_multi_lines': True}, # \begin...\end
# r'(?<!\\)(\$`)([^`]+)(`\$)': {'allow_multi_lines': False}, # $`...`$
}
matches = []
for pattern, property in mathpatterns.items():
flags = re.ASCII | re.DOTALL if property['allow_multi_lines'] else re.ASCII
matches.extend(re.findall(pattern, txt, flags))
if len(matches) == 0: return False
contain_any_eq = False
illegal_pattern = re.compile(r'[^\x00-\x7F]|echo')
for match in matches:
if len(match) != 3: return False
eq_canidate = match[1]
if illegal_pattern.search(eq_canidate):
return False
else:
contain_any_eq = True
return contain_any_eq
def fix_markdown_indent(txt):
# fix markdown indent
if (' - ' not in txt) or ('. ' not in txt):
return txt # do not need to fix, fast escape
# walk through the lines and fix non-standard indentation
lines = txt.split("\n")
pattern = re.compile(r'^\s+-')
activated = False
for i, line in enumerate(lines):
if line.startswith('- ') or line.startswith('1. '):
activated = True
if activated and pattern.match(line):
stripped_string = line.lstrip()
num_spaces = len(line) - len(stripped_string)
if (num_spaces % 4) == 3:
num_spaces_should_be = math.ceil(num_spaces / 4) * 4
lines[i] = ' ' * num_spaces_should_be + stripped_string
return '\n'.join(lines)
txt = fix_markdown_indent(txt)
if is_equation(txt): # 有$标识的公式符号,且没有代码段```的标识
# convert everything to html format
split = markdown.markdown(text='---')
convert_stage_1 = markdown.markdown(text=txt, extensions=['sane_lists', 'tables', 'mdx_math', 'fenced_code'],
extension_configs=markdown_extension_configs)
convert_stage_1 = markdown_bug_hunt(convert_stage_1)
# 1. convert to easy-to-copy tex (do not render math)
convert_stage_2_1, n = re.subn(find_equation_pattern, replace_math_no_render, convert_stage_1, flags=re.DOTALL)
# 2. convert to rendered equation
convert_stage_2_2, n = re.subn(find_equation_pattern, replace_math_render, convert_stage_1, flags=re.DOTALL)
# cat them together
return pre + convert_stage_2_1 + f'{split}' + convert_stage_2_2 + suf
else:
return pre + markdown.markdown(txt, extensions=['sane_lists', 'tables', 'fenced_code', 'codehilite']) + suf
def close_up_code_segment_during_stream(gpt_reply):
"""
在gpt输出代码的中途输出了前面的```但还没输出完后面的```补上后面的```
Args:
gpt_reply (str): GPT模型返回的回复字符串
Returns:
str: 返回一个新的字符串将输出代码片段的后面的```补上
"""
if '```' not in gpt_reply:
return gpt_reply
if gpt_reply.endswith('```'):
return gpt_reply
# 排除了以上两个情况,我们
segments = gpt_reply.split('```')
n_mark = len(segments) - 1
if n_mark % 2 == 1:
return gpt_reply + '\n```' # 输出代码片段中!
else:
return gpt_reply
def format_io(self, y):
"""
将输入和输出解析为HTML格式将y中最后一项的输入部分段落化并将输出部分的Markdown和数学公式转换为HTML格式
"""
if y is None or y == []:
return []
i_ask, gpt_reply = y[-1]
# 输入部分太自由,预处理一波
if i_ask is not None: i_ask = text_divide_paragraph(i_ask)
# 当代码输出半截的时候,试着补上后个```
if gpt_reply is not None: gpt_reply = close_up_code_segment_during_stream(gpt_reply)
# process
y[-1] = (
None if i_ask is None else markdown.markdown(i_ask, extensions=['fenced_code', 'tables']),
None if gpt_reply is None else markdown_convertion(gpt_reply)
)
return y
def find_free_port():
"""
返回当前系统中可用的未使用端口
@ -777,189 +595,6 @@ def load_chat_cookies():
return {'api_key': API_KEY, 'llm_model': LLM_MODEL, 'customize_fn_overwrite': customize_fn_overwrite_}
def is_openai_api_key(key):
CUSTOM_API_KEY_PATTERN = get_conf('CUSTOM_API_KEY_PATTERN')
if len(CUSTOM_API_KEY_PATTERN) != 0:
API_MATCH_ORIGINAL = re.match(CUSTOM_API_KEY_PATTERN, key)
else:
API_MATCH_ORIGINAL = re.match(r"sk-[a-zA-Z0-9]{48}$", key)
return bool(API_MATCH_ORIGINAL)
def is_azure_api_key(key):
API_MATCH_AZURE = re.match(r"[a-zA-Z0-9]{32}$", key)
return bool(API_MATCH_AZURE)
def is_api2d_key(key):
API_MATCH_API2D = re.match(r"fk[a-zA-Z0-9]{6}-[a-zA-Z0-9]{32}$", key)
return bool(API_MATCH_API2D)
def is_any_api_key(key):
if ',' in key:
keys = key.split(',')
for k in keys:
if is_any_api_key(k): return True
return False
else:
return is_openai_api_key(key) or is_api2d_key(key) or is_azure_api_key(key)
def what_keys(keys):
avail_key_list = {'OpenAI Key': 0, "Azure Key": 0, "API2D Key": 0}
key_list = keys.split(',')
for k in key_list:
if is_openai_api_key(k):
avail_key_list['OpenAI Key'] += 1
for k in key_list:
if is_api2d_key(k):
avail_key_list['API2D Key'] += 1
for k in key_list:
if is_azure_api_key(k):
avail_key_list['Azure Key'] += 1
return f"检测到: OpenAI Key {avail_key_list['OpenAI Key']} 个, Azure Key {avail_key_list['Azure Key']} 个, API2D Key {avail_key_list['API2D Key']}"
def select_api_key(keys, llm_model):
import random
avail_key_list = []
key_list = keys.split(',')
if llm_model.startswith('gpt-'):
for k in key_list:
if is_openai_api_key(k): avail_key_list.append(k)
if llm_model.startswith('api2d-'):
for k in key_list:
if is_api2d_key(k): avail_key_list.append(k)
if llm_model.startswith('azure-'):
for k in key_list:
if is_azure_api_key(k): avail_key_list.append(k)
if len(avail_key_list) == 0:
raise RuntimeError(f"您提供的api-key不满足要求不包含任何可用于{llm_model}的api-key。您可能选择了错误的模型或请求源右下角更换模型菜单中可切换openai,azure,claude,api2d等请求源")
api_key = random.choice(avail_key_list) # 随机负载均衡
return api_key
def read_env_variable(arg, default_value):
"""
环境变量可以是 `GPT_ACADEMIC_CONFIG`(优先)也可以直接是`CONFIG`
例如在windows cmd中既可以写
set USE_PROXY=True
set API_KEY=sk-j7caBpkRoxxxxxxxxxxxxxxxxxxxxxxxxxxxx
set proxies={"http":"http://127.0.0.1:10085", "https":"http://127.0.0.1:10085",}
set AVAIL_LLM_MODELS=["gpt-3.5-turbo", "chatglm"]
set AUTHENTICATION=[("username", "password"), ("username2", "password2")]
也可以写
set GPT_ACADEMIC_USE_PROXY=True
set GPT_ACADEMIC_API_KEY=sk-j7caBpkRoxxxxxxxxxxxxxxxxxxxxxxxxxxxx
set GPT_ACADEMIC_proxies={"http":"http://127.0.0.1:10085", "https":"http://127.0.0.1:10085",}
set GPT_ACADEMIC_AVAIL_LLM_MODELS=["gpt-3.5-turbo", "chatglm"]
set GPT_ACADEMIC_AUTHENTICATION=[("username", "password"), ("username2", "password2")]
"""
from colorful import print亮红, print亮绿
arg_with_prefix = "GPT_ACADEMIC_" + arg
if arg_with_prefix in os.environ:
env_arg = os.environ[arg_with_prefix]
elif arg in os.environ:
env_arg = os.environ[arg]
else:
raise KeyError
print(f"[ENV_VAR] 尝试加载{arg},默认值:{default_value} --> 修正值:{env_arg}")
try:
if isinstance(default_value, bool):
env_arg = env_arg.strip()
if env_arg == 'True': r = True
elif env_arg == 'False': r = False
else: print('Enter True or False, but have:', env_arg); r = default_value
elif isinstance(default_value, int):
r = int(env_arg)
elif isinstance(default_value, float):
r = float(env_arg)
elif isinstance(default_value, str):
r = env_arg.strip()
elif isinstance(default_value, dict):
r = eval(env_arg)
elif isinstance(default_value, list):
r = eval(env_arg)
elif default_value is None:
assert arg == "proxies"
r = eval(env_arg)
else:
print亮红(f"[ENV_VAR] 环境变量{arg}不支持通过环境变量设置! ")
raise KeyError
except:
print亮红(f"[ENV_VAR] 环境变量{arg}加载失败! ")
raise KeyError(f"[ENV_VAR] 环境变量{arg}加载失败! ")
print亮绿(f"[ENV_VAR] 成功读取环境变量{arg}")
return r
@lru_cache(maxsize=128)
def read_single_conf_with_lru_cache(arg):
from colorful import print亮红, print亮绿, print亮蓝
try:
# 优先级1. 获取环境变量作为配置
default_ref = getattr(importlib.import_module('config'), arg) # 读取默认值作为数据类型转换的参考
r = read_env_variable(arg, default_ref)
except:
try:
# 优先级2. 获取config_private中的配置
r = getattr(importlib.import_module('config_private'), arg)
except:
# 优先级3. 获取config中的配置
r = getattr(importlib.import_module('config'), arg)
# 在读取API_KEY时检查一下是不是忘了改config
if arg == 'API_URL_REDIRECT':
oai_rd = r.get("https://api.openai.com/v1/chat/completions", None) # API_URL_REDIRECT填写格式是错误的请阅读`https://github.com/binary-husky/gpt_academic/wiki/项目配置说明`
if oai_rd and not oai_rd.endswith('/completions'):
print亮红("\n\n[API_URL_REDIRECT] API_URL_REDIRECT填错了。请阅读`https://github.com/binary-husky/gpt_academic/wiki/项目配置说明`。如果您确信自己没填错,无视此消息即可。")
time.sleep(5)
if arg == 'API_KEY':
print亮蓝(f"[API_KEY] 本项目现已支持OpenAI和Azure的api-key。也支持同时填写多个api-key如API_KEY=\"openai-key1,openai-key2,azure-key3\"")
print亮蓝(f"[API_KEY] 您既可以在config.py中修改api-key(s)也可以在问题输入区输入临时的api-key(s),然后回车键提交后即可生效。")
if is_any_api_key(r):
print亮绿(f"[API_KEY] 您的 API_KEY 是: {r[:15]}*** API_KEY 导入成功")
else:
print亮红("[API_KEY] 您的 API_KEY 不满足任何一种已知的密钥格式请在config文件中修改API密钥之后再运行。")
if arg == 'proxies':
if not read_single_conf_with_lru_cache('USE_PROXY'): r = None # 检查USE_PROXY防止proxies单独起作用
if r is None:
print亮红('[PROXY] 网络代理状态未配置。无代理状态下很可能无法访问OpenAI家族的模型。建议检查USE_PROXY选项是否修改。')
else:
print亮绿('[PROXY] 网络代理状态:已配置。配置信息如下:', r)
assert isinstance(r, dict), 'proxies格式错误请注意proxies选项的格式不要遗漏括号。'
return r
@lru_cache(maxsize=128)
def get_conf(*args):
"""
本项目的所有配置都集中在config.py中 修改配置有三种方法您只需要选择其中一种即可
- 直接修改config.py
- 创建并修改config_private.py
- 修改环境变量修改docker-compose.yml等价于修改容器内部的环境变量
注意如果您使用docker-compose部署请修改docker-compose等价于修改容器内部的环境变量
"""
res = []
for arg in args:
r = read_single_conf_with_lru_cache(arg)
res.append(r)
if len(res) == 1: return res[0]
return res
def clear_line_break(txt):
txt = txt.replace('\n', ' ')
txt = txt.replace(' ', ' ')
@ -1227,103 +862,6 @@ def Singleton(cls):
return _singleton
"""
========================================================================
第四部分
接驳void-terminal:
- set_conf: 在运行过程中动态地修改配置
- set_multi_conf: 在运行过程中动态地修改多个配置
- get_plugin_handle: 获取插件的句柄
- get_plugin_default_kwargs: 获取插件的默认参数
- get_chat_handle: 获取简单聊天的句柄
- get_chat_default_kwargs: 获取简单聊天的默认参数
========================================================================
"""
def set_conf(key, value):
from toolbox import read_single_conf_with_lru_cache, get_conf
read_single_conf_with_lru_cache.cache_clear()
get_conf.cache_clear()
os.environ[key] = str(value)
altered = get_conf(key)
return altered
def set_multi_conf(dic):
for k, v in dic.items(): set_conf(k, v)
return
def get_plugin_handle(plugin_name):
"""
e.g. plugin_name = 'crazy_functions.批量Markdown翻译->Markdown翻译指定语言'
"""
import importlib
assert '->' in plugin_name, \
"Example of plugin_name: crazy_functions.批量Markdown翻译->Markdown翻译指定语言"
module, fn_name = plugin_name.split('->')
f_hot_reload = getattr(importlib.import_module(module, fn_name), fn_name)
return f_hot_reload
def get_chat_handle():
"""
"""
from request_llms.bridge_all import predict_no_ui_long_connection
return predict_no_ui_long_connection
def get_plugin_default_kwargs():
"""
"""
from toolbox import ChatBotWithCookies
cookies = load_chat_cookies()
llm_kwargs = {
'api_key': cookies['api_key'],
'llm_model': cookies['llm_model'],
'top_p': 1.0,
'max_length': None,
'temperature': 1.0,
}
chatbot = ChatBotWithCookies(llm_kwargs)
# txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, web_port
DEFAULT_FN_GROUPS_kwargs = {
"main_input": "./README.md",
"llm_kwargs": llm_kwargs,
"plugin_kwargs": {},
"chatbot_with_cookie": chatbot,
"history": [],
"system_prompt": "You are a good AI.",
"web_port": None
}
return DEFAULT_FN_GROUPS_kwargs
def get_chat_default_kwargs():
"""
"""
cookies = load_chat_cookies()
llm_kwargs = {
'api_key': cookies['api_key'],
'llm_model': cookies['llm_model'],
'top_p': 1.0,
'max_length': None,
'temperature': 1.0,
}
default_chat_kwargs = {
"inputs": "Hello there, are you ready?",
"llm_kwargs": llm_kwargs,
"history": [],
"sys_prompt": "You are AI assistant",
"observe_window": None,
"console_slience": False,
}
return default_chat_kwargs
def get_pictures_list(path):
file_manifest = [f for f in glob.glob(f'{path}/**/*.jpg', recursive=True)]
file_manifest += [f for f in glob.glob(f'{path}/**/*.jpeg', recursive=True)]