diff --git a/request_llms/com_google.py b/request_llms/com_google.py index 211581a..e66d659 100644 --- a/request_llms/com_google.py +++ b/request_llms/com_google.py @@ -7,7 +7,7 @@ import os import re import requests from typing import List, Dict, Tuple -from toolbox import get_conf, encode_image, get_pictures_list +from toolbox import get_conf, encode_image, get_pictures_list, to_markdown_tabs proxies, TIMEOUT_SECONDS = get_conf("proxies", "TIMEOUT_SECONDS") @@ -112,34 +112,6 @@ def html_local_img(__file, layout="left", max_width=None, max_height=None, md=Tr return a -def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False): - """ - Args: - head: 表头:[] - tabs: 表值:[[列1], [列2], [列3], [列4]] - alignment: :--- 左对齐, :---: 居中对齐, ---: 右对齐 - column: True to keep data in columns, False to keep data in rows (default). - Returns: - A string representation of the markdown table. - """ - if column: - transposed_tabs = list(map(list, zip(*tabs))) - else: - transposed_tabs = tabs - # Find the maximum length among the columns - max_len = max(len(column) for column in transposed_tabs) - - tab_format = "| %s " - tabs_list = "".join([tab_format % i for i in head]) + "|\n" - tabs_list += "".join([tab_format % alignment for i in head]) + "|\n" - - for i in range(max_len): - row_data = [tab[i] if i < len(tab) else "" for tab in transposed_tabs] - row_data = file_manifest_filter_html(row_data, filter_=None) - tabs_list += "".join([tab_format % i for i in row_data]) + "|\n" - - return tabs_list - class GoogleChatInit: def __init__(self): diff --git a/shared_utils/handle_upload.py b/shared_utils/handle_upload.py new file mode 100644 index 0000000..ec0bcd9 --- /dev/null +++ b/shared_utils/handle_upload.py @@ -0,0 +1,137 @@ +import importlib +import time +import inspect +import re +import os +import base64 +import gradio +import shutil +import glob +from shared_utils.config_loader import get_conf + +def html_local_file(file): + base_path = os.path.dirname(__file__) # 项目目录 + if os.path.exists(str(file)): + file = f'file={file.replace(base_path, ".")}' + return file + + +def html_local_img(__file, layout="left", max_width=None, max_height=None, md=True): + style = "" + if max_width is not None: + style += f"max-width: {max_width};" + if max_height is not None: + style += f"max-height: {max_height};" + __file = html_local_file(__file) + a = f'
' + if md: + a = f"![{__file}]({__file})" + return a + + +def file_manifest_filter_type(file_list, filter_: list = None): + new_list = [] + if not filter_: + filter_ = ["png", "jpg", "jpeg"] + for file in file_list: + if str(os.path.basename(file)).split(".")[-1] in filter_: + new_list.append(html_local_img(file, md=False)) + else: + new_list.append(file) + return new_list + + +def zip_extract_member_new(self, member, targetpath, pwd): + # 修复中文乱码的问题 + """Extract the ZipInfo object 'member' to a physical + file on the path targetpath. + """ + import zipfile + if not isinstance(member, zipfile.ZipInfo): + member = self.getinfo(member) + + # build the destination pathname, replacing + # forward slashes to platform specific separators. + arcname = member.filename.replace('/', os.path.sep) + arcname = arcname.encode('cp437', errors='replace').decode('gbk', errors='replace') + + if os.path.altsep: + arcname = arcname.replace(os.path.altsep, os.path.sep) + # interpret absolute pathname as relative, remove drive letter or + # UNC path, redundant separators, "." and ".." components. + arcname = os.path.splitdrive(arcname)[1] + invalid_path_parts = ('', os.path.curdir, os.path.pardir) + arcname = os.path.sep.join(x for x in arcname.split(os.path.sep) + if x not in invalid_path_parts) + if os.path.sep == '\\': + # filter illegal characters on Windows + arcname = self._sanitize_windows_name(arcname, os.path.sep) + + targetpath = os.path.join(targetpath, arcname) + targetpath = os.path.normpath(targetpath) + + # Create all upper directories if necessary. + upperdirs = os.path.dirname(targetpath) + if upperdirs and not os.path.exists(upperdirs): + os.makedirs(upperdirs) + + if member.is_dir(): + if not os.path.isdir(targetpath): + os.mkdir(targetpath) + return targetpath + + with self.open(member, pwd=pwd) as source, \ + open(targetpath, "wb") as target: + shutil.copyfileobj(source, target) + + return targetpath + + +def extract_archive(file_path, dest_dir): + import zipfile + import tarfile + import os + + # Get the file extension of the input file + file_extension = os.path.splitext(file_path)[1] + + # Extract the archive based on its extension + if file_extension == ".zip": + with zipfile.ZipFile(file_path, "r") as zipobj: + zipobj._extract_member = lambda a,b,c: zip_extract_member_new(zipobj, a,b,c) # 修复中文乱码的问题 + zipobj.extractall(path=dest_dir) + print("Successfully extracted zip archive to {}".format(dest_dir)) + + elif file_extension in [".tar", ".gz", ".bz2"]: + with tarfile.open(file_path, "r:*") as tarobj: + tarobj.extractall(path=dest_dir) + print("Successfully extracted tar archive to {}".format(dest_dir)) + + # 第三方库,需要预先pip install rarfile + # 此外,Windows上还需要安装winrar软件,配置其Path环境变量,如"C:\Program Files\WinRAR"才可以 + elif file_extension == ".rar": + try: + import rarfile + + with rarfile.RarFile(file_path) as rf: + rf.extractall(path=dest_dir) + print("Successfully extracted rar archive to {}".format(dest_dir)) + except: + print("Rar format requires additional dependencies to install") + return "\n\n解压失败! 需要安装pip install rarfile来解压rar文件。建议:使用zip压缩格式。" + + # 第三方库,需要预先pip install py7zr + elif file_extension == ".7z": + try: + import py7zr + + with py7zr.SevenZipFile(file_path, mode="r") as f: + f.extractall(path=dest_dir) + print("Successfully extracted 7z archive to {}".format(dest_dir)) + except: + print("7z format requires additional dependencies to install") + return "\n\n解压失败! 需要安装pip install py7zr来解压7z文件" + else: + return "" + return "" + diff --git a/toolbox.py b/toolbox.py index 39f7051..77ceaec 100644 --- a/toolbox.py +++ b/toolbox.py @@ -25,6 +25,10 @@ from shared_utils.text_mask import apply_gpt_academic_string_mask from shared_utils.text_mask import build_gpt_academic_masked_string from shared_utils.text_mask import apply_gpt_academic_string_mask_langbased from shared_utils.text_mask import build_gpt_academic_masked_string_langbased +from shared_utils.handle_upload import html_local_file +from shared_utils.handle_upload import html_local_img +from shared_utils.handle_upload import file_manifest_filter_type +from shared_utils.handle_upload import extract_archive pj = os.path.join default_user_name = "default_user" @@ -329,54 +333,6 @@ def find_free_port(): return s.getsockname()[1] -def extract_archive(file_path, dest_dir): - import zipfile - import tarfile - import os - - # Get the file extension of the input file - file_extension = os.path.splitext(file_path)[1] - - # Extract the archive based on its extension - if file_extension == ".zip": - with zipfile.ZipFile(file_path, "r") as zipobj: - zipobj.extractall(path=dest_dir) - print("Successfully extracted zip archive to {}".format(dest_dir)) - - elif file_extension in [".tar", ".gz", ".bz2"]: - with tarfile.open(file_path, "r:*") as tarobj: - tarobj.extractall(path=dest_dir) - print("Successfully extracted tar archive to {}".format(dest_dir)) - - # 第三方库,需要预先pip install rarfile - # 此外,Windows上还需要安装winrar软件,配置其Path环境变量,如"C:\Program Files\WinRAR"才可以 - elif file_extension == ".rar": - try: - import rarfile - - with rarfile.RarFile(file_path) as rf: - rf.extractall(path=dest_dir) - print("Successfully extracted rar archive to {}".format(dest_dir)) - except: - print("Rar format requires additional dependencies to install") - return "\n\n解压失败! 需要安装pip install rarfile来解压rar文件。建议:使用zip压缩格式。" - - # 第三方库,需要预先pip install py7zr - elif file_extension == ".7z": - try: - import py7zr - - with py7zr.SevenZipFile(file_path, mode="r") as f: - f.extractall(path=dest_dir) - print("Successfully extracted 7z archive to {}".format(dest_dir)) - except: - print("7z format requires additional dependencies to install") - return "\n\n解压失败! 需要安装pip install py7zr来解压7z文件" - else: - return "" - return "" - - def find_recent_files(directory): """ me: find files that is created with in one minutes under a directory with python, write a function @@ -474,39 +430,8 @@ def del_outdated_uploads(outdate_time_seconds, target_path_base=None): return -def html_local_file(file): - base_path = os.path.dirname(__file__) # 项目目录 - if os.path.exists(str(file)): - file = f'file={file.replace(base_path, ".")}' - return file - -def html_local_img(__file, layout="left", max_width=None, max_height=None, md=True): - style = "" - if max_width is not None: - style += f"max-width: {max_width};" - if max_height is not None: - style += f"max-height: {max_height};" - __file = html_local_file(__file) - a = f'
' - if md: - a = f"![{__file}]({__file})" - return a - - -def file_manifest_filter_type(file_list, filter_: list = None): - new_list = [] - if not filter_: - filter_ = ["png", "jpg", "jpeg"] - for file in file_list: - if str(os.path.basename(file)).split(".")[-1] in filter_: - new_list.append(html_local_img(file, md=False)) - else: - new_list.append(file) - return new_list - - -def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False): +def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False, omit_path=None): """ Args: head: 表头:[] @@ -530,6 +455,9 @@ def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False): for i in range(max_len): row_data = [tab[i] if i < len(tab) else "" for tab in transposed_tabs] row_data = file_manifest_filter_type(row_data, filter_=None) + # for dat in row_data: + # if (omit_path is not None) and os.path.exists(dat): + # dat = os.path.relpath(dat, omit_path) tabs_list += "".join([tab_format % i for i in row_data]) + "|\n" return tabs_list @@ -565,15 +493,21 @@ def on_file_uploaded( ) # 整理文件集合 输出消息 - moved_files = [fp for fp in glob.glob(f"{target_path_base}/**/*", recursive=True)] - moved_files_str = to_markdown_tabs(head=["文件"], tabs=[moved_files]) + files = glob.glob(f"{target_path_base}/**/*", recursive=True) + moved_files = [fp for fp in files] + max_file_to_show = 10 + if len(moved_files) > max_file_to_show: + moved_files = moved_files[:max_file_to_show//2] + [f'... ( 📌省略{len(moved_files) - max_file_to_show}个文件的显示 ) ...'] + \ + moved_files[-max_file_to_show//2:] + moved_files_str = to_markdown_tabs(head=["文件"], tabs=[moved_files], omit_path=target_path_base) chatbot.append( [ "我上传了文件,请查收", - f"[Local Message] 收到以下文件: \n\n{moved_files_str}" - + f"\n\n调用路径参数已自动修正到: \n\n{txt}" - + f"\n\n现在您点击任意函数插件时,以上文件将被作为输入参数" - + upload_msg, + f"[Local Message] 收到以下文件 (上传到路径:{target_path_base}): " + + f"\n\n{moved_files_str}" + + f"\n\n调用路径参数已自动修正到: \n\n{txt}" + + f"\n\n现在您点击任意函数插件时,以上文件将被作为输入参数" + + upload_msg, ] )