fix zip chinese file name error
This commit is contained in:
parent
b72289bfb0
commit
8bb3d84912
@ -7,7 +7,7 @@ import os
|
||||
import re
|
||||
import requests
|
||||
from typing import List, Dict, Tuple
|
||||
from toolbox import get_conf, encode_image, get_pictures_list
|
||||
from toolbox import get_conf, encode_image, get_pictures_list, to_markdown_tabs
|
||||
|
||||
proxies, TIMEOUT_SECONDS = get_conf("proxies", "TIMEOUT_SECONDS")
|
||||
|
||||
@ -112,34 +112,6 @@ def html_local_img(__file, layout="left", max_width=None, max_height=None, md=Tr
|
||||
return a
|
||||
|
||||
|
||||
def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False):
|
||||
"""
|
||||
Args:
|
||||
head: 表头:[]
|
||||
tabs: 表值:[[列1], [列2], [列3], [列4]]
|
||||
alignment: :--- 左对齐, :---: 居中对齐, ---: 右对齐
|
||||
column: True to keep data in columns, False to keep data in rows (default).
|
||||
Returns:
|
||||
A string representation of the markdown table.
|
||||
"""
|
||||
if column:
|
||||
transposed_tabs = list(map(list, zip(*tabs)))
|
||||
else:
|
||||
transposed_tabs = tabs
|
||||
# Find the maximum length among the columns
|
||||
max_len = max(len(column) for column in transposed_tabs)
|
||||
|
||||
tab_format = "| %s "
|
||||
tabs_list = "".join([tab_format % i for i in head]) + "|\n"
|
||||
tabs_list += "".join([tab_format % alignment for i in head]) + "|\n"
|
||||
|
||||
for i in range(max_len):
|
||||
row_data = [tab[i] if i < len(tab) else "" for tab in transposed_tabs]
|
||||
row_data = file_manifest_filter_html(row_data, filter_=None)
|
||||
tabs_list += "".join([tab_format % i for i in row_data]) + "|\n"
|
||||
|
||||
return tabs_list
|
||||
|
||||
|
||||
class GoogleChatInit:
|
||||
def __init__(self):
|
||||
|
137
shared_utils/handle_upload.py
Normal file
137
shared_utils/handle_upload.py
Normal file
@ -0,0 +1,137 @@
|
||||
import importlib
|
||||
import time
|
||||
import inspect
|
||||
import re
|
||||
import os
|
||||
import base64
|
||||
import gradio
|
||||
import shutil
|
||||
import glob
|
||||
from shared_utils.config_loader import get_conf
|
||||
|
||||
def html_local_file(file):
|
||||
base_path = os.path.dirname(__file__) # 项目目录
|
||||
if os.path.exists(str(file)):
|
||||
file = f'file={file.replace(base_path, ".")}'
|
||||
return file
|
||||
|
||||
|
||||
def html_local_img(__file, layout="left", max_width=None, max_height=None, md=True):
|
||||
style = ""
|
||||
if max_width is not None:
|
||||
style += f"max-width: {max_width};"
|
||||
if max_height is not None:
|
||||
style += f"max-height: {max_height};"
|
||||
__file = html_local_file(__file)
|
||||
a = f'<div align="{layout}"><img src="{__file}" style="{style}"></div>'
|
||||
if md:
|
||||
a = f""
|
||||
return a
|
||||
|
||||
|
||||
def file_manifest_filter_type(file_list, filter_: list = None):
|
||||
new_list = []
|
||||
if not filter_:
|
||||
filter_ = ["png", "jpg", "jpeg"]
|
||||
for file in file_list:
|
||||
if str(os.path.basename(file)).split(".")[-1] in filter_:
|
||||
new_list.append(html_local_img(file, md=False))
|
||||
else:
|
||||
new_list.append(file)
|
||||
return new_list
|
||||
|
||||
|
||||
def zip_extract_member_new(self, member, targetpath, pwd):
|
||||
# 修复中文乱码的问题
|
||||
"""Extract the ZipInfo object 'member' to a physical
|
||||
file on the path targetpath.
|
||||
"""
|
||||
import zipfile
|
||||
if not isinstance(member, zipfile.ZipInfo):
|
||||
member = self.getinfo(member)
|
||||
|
||||
# build the destination pathname, replacing
|
||||
# forward slashes to platform specific separators.
|
||||
arcname = member.filename.replace('/', os.path.sep)
|
||||
arcname = arcname.encode('cp437', errors='replace').decode('gbk', errors='replace')
|
||||
|
||||
if os.path.altsep:
|
||||
arcname = arcname.replace(os.path.altsep, os.path.sep)
|
||||
# interpret absolute pathname as relative, remove drive letter or
|
||||
# UNC path, redundant separators, "." and ".." components.
|
||||
arcname = os.path.splitdrive(arcname)[1]
|
||||
invalid_path_parts = ('', os.path.curdir, os.path.pardir)
|
||||
arcname = os.path.sep.join(x for x in arcname.split(os.path.sep)
|
||||
if x not in invalid_path_parts)
|
||||
if os.path.sep == '\\':
|
||||
# filter illegal characters on Windows
|
||||
arcname = self._sanitize_windows_name(arcname, os.path.sep)
|
||||
|
||||
targetpath = os.path.join(targetpath, arcname)
|
||||
targetpath = os.path.normpath(targetpath)
|
||||
|
||||
# Create all upper directories if necessary.
|
||||
upperdirs = os.path.dirname(targetpath)
|
||||
if upperdirs and not os.path.exists(upperdirs):
|
||||
os.makedirs(upperdirs)
|
||||
|
||||
if member.is_dir():
|
||||
if not os.path.isdir(targetpath):
|
||||
os.mkdir(targetpath)
|
||||
return targetpath
|
||||
|
||||
with self.open(member, pwd=pwd) as source, \
|
||||
open(targetpath, "wb") as target:
|
||||
shutil.copyfileobj(source, target)
|
||||
|
||||
return targetpath
|
||||
|
||||
|
||||
def extract_archive(file_path, dest_dir):
|
||||
import zipfile
|
||||
import tarfile
|
||||
import os
|
||||
|
||||
# Get the file extension of the input file
|
||||
file_extension = os.path.splitext(file_path)[1]
|
||||
|
||||
# Extract the archive based on its extension
|
||||
if file_extension == ".zip":
|
||||
with zipfile.ZipFile(file_path, "r") as zipobj:
|
||||
zipobj._extract_member = lambda a,b,c: zip_extract_member_new(zipobj, a,b,c) # 修复中文乱码的问题
|
||||
zipobj.extractall(path=dest_dir)
|
||||
print("Successfully extracted zip archive to {}".format(dest_dir))
|
||||
|
||||
elif file_extension in [".tar", ".gz", ".bz2"]:
|
||||
with tarfile.open(file_path, "r:*") as tarobj:
|
||||
tarobj.extractall(path=dest_dir)
|
||||
print("Successfully extracted tar archive to {}".format(dest_dir))
|
||||
|
||||
# 第三方库,需要预先pip install rarfile
|
||||
# 此外,Windows上还需要安装winrar软件,配置其Path环境变量,如"C:\Program Files\WinRAR"才可以
|
||||
elif file_extension == ".rar":
|
||||
try:
|
||||
import rarfile
|
||||
|
||||
with rarfile.RarFile(file_path) as rf:
|
||||
rf.extractall(path=dest_dir)
|
||||
print("Successfully extracted rar archive to {}".format(dest_dir))
|
||||
except:
|
||||
print("Rar format requires additional dependencies to install")
|
||||
return "\n\n解压失败! 需要安装pip install rarfile来解压rar文件。建议:使用zip压缩格式。"
|
||||
|
||||
# 第三方库,需要预先pip install py7zr
|
||||
elif file_extension == ".7z":
|
||||
try:
|
||||
import py7zr
|
||||
|
||||
with py7zr.SevenZipFile(file_path, mode="r") as f:
|
||||
f.extractall(path=dest_dir)
|
||||
print("Successfully extracted 7z archive to {}".format(dest_dir))
|
||||
except:
|
||||
print("7z format requires additional dependencies to install")
|
||||
return "\n\n解压失败! 需要安装pip install py7zr来解压7z文件"
|
||||
else:
|
||||
return ""
|
||||
return ""
|
||||
|
106
toolbox.py
106
toolbox.py
@ -25,6 +25,10 @@ from shared_utils.text_mask import apply_gpt_academic_string_mask
|
||||
from shared_utils.text_mask import build_gpt_academic_masked_string
|
||||
from shared_utils.text_mask import apply_gpt_academic_string_mask_langbased
|
||||
from shared_utils.text_mask import build_gpt_academic_masked_string_langbased
|
||||
from shared_utils.handle_upload import html_local_file
|
||||
from shared_utils.handle_upload import html_local_img
|
||||
from shared_utils.handle_upload import file_manifest_filter_type
|
||||
from shared_utils.handle_upload import extract_archive
|
||||
|
||||
pj = os.path.join
|
||||
default_user_name = "default_user"
|
||||
@ -329,54 +333,6 @@ def find_free_port():
|
||||
return s.getsockname()[1]
|
||||
|
||||
|
||||
def extract_archive(file_path, dest_dir):
|
||||
import zipfile
|
||||
import tarfile
|
||||
import os
|
||||
|
||||
# Get the file extension of the input file
|
||||
file_extension = os.path.splitext(file_path)[1]
|
||||
|
||||
# Extract the archive based on its extension
|
||||
if file_extension == ".zip":
|
||||
with zipfile.ZipFile(file_path, "r") as zipobj:
|
||||
zipobj.extractall(path=dest_dir)
|
||||
print("Successfully extracted zip archive to {}".format(dest_dir))
|
||||
|
||||
elif file_extension in [".tar", ".gz", ".bz2"]:
|
||||
with tarfile.open(file_path, "r:*") as tarobj:
|
||||
tarobj.extractall(path=dest_dir)
|
||||
print("Successfully extracted tar archive to {}".format(dest_dir))
|
||||
|
||||
# 第三方库,需要预先pip install rarfile
|
||||
# 此外,Windows上还需要安装winrar软件,配置其Path环境变量,如"C:\Program Files\WinRAR"才可以
|
||||
elif file_extension == ".rar":
|
||||
try:
|
||||
import rarfile
|
||||
|
||||
with rarfile.RarFile(file_path) as rf:
|
||||
rf.extractall(path=dest_dir)
|
||||
print("Successfully extracted rar archive to {}".format(dest_dir))
|
||||
except:
|
||||
print("Rar format requires additional dependencies to install")
|
||||
return "\n\n解压失败! 需要安装pip install rarfile来解压rar文件。建议:使用zip压缩格式。"
|
||||
|
||||
# 第三方库,需要预先pip install py7zr
|
||||
elif file_extension == ".7z":
|
||||
try:
|
||||
import py7zr
|
||||
|
||||
with py7zr.SevenZipFile(file_path, mode="r") as f:
|
||||
f.extractall(path=dest_dir)
|
||||
print("Successfully extracted 7z archive to {}".format(dest_dir))
|
||||
except:
|
||||
print("7z format requires additional dependencies to install")
|
||||
return "\n\n解压失败! 需要安装pip install py7zr来解压7z文件"
|
||||
else:
|
||||
return ""
|
||||
return ""
|
||||
|
||||
|
||||
def find_recent_files(directory):
|
||||
"""
|
||||
me: find files that is created with in one minutes under a directory with python, write a function
|
||||
@ -474,39 +430,8 @@ def del_outdated_uploads(outdate_time_seconds, target_path_base=None):
|
||||
return
|
||||
|
||||
|
||||
def html_local_file(file):
|
||||
base_path = os.path.dirname(__file__) # 项目目录
|
||||
if os.path.exists(str(file)):
|
||||
file = f'file={file.replace(base_path, ".")}'
|
||||
return file
|
||||
|
||||
|
||||
def html_local_img(__file, layout="left", max_width=None, max_height=None, md=True):
|
||||
style = ""
|
||||
if max_width is not None:
|
||||
style += f"max-width: {max_width};"
|
||||
if max_height is not None:
|
||||
style += f"max-height: {max_height};"
|
||||
__file = html_local_file(__file)
|
||||
a = f'<div align="{layout}"><img src="{__file}" style="{style}"></div>'
|
||||
if md:
|
||||
a = f""
|
||||
return a
|
||||
|
||||
|
||||
def file_manifest_filter_type(file_list, filter_: list = None):
|
||||
new_list = []
|
||||
if not filter_:
|
||||
filter_ = ["png", "jpg", "jpeg"]
|
||||
for file in file_list:
|
||||
if str(os.path.basename(file)).split(".")[-1] in filter_:
|
||||
new_list.append(html_local_img(file, md=False))
|
||||
else:
|
||||
new_list.append(file)
|
||||
return new_list
|
||||
|
||||
|
||||
def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False):
|
||||
def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False, omit_path=None):
|
||||
"""
|
||||
Args:
|
||||
head: 表头:[]
|
||||
@ -530,6 +455,9 @@ def to_markdown_tabs(head: list, tabs: list, alignment=":---:", column=False):
|
||||
for i in range(max_len):
|
||||
row_data = [tab[i] if i < len(tab) else "" for tab in transposed_tabs]
|
||||
row_data = file_manifest_filter_type(row_data, filter_=None)
|
||||
# for dat in row_data:
|
||||
# if (omit_path is not None) and os.path.exists(dat):
|
||||
# dat = os.path.relpath(dat, omit_path)
|
||||
tabs_list += "".join([tab_format % i for i in row_data]) + "|\n"
|
||||
|
||||
return tabs_list
|
||||
@ -565,15 +493,21 @@ def on_file_uploaded(
|
||||
)
|
||||
|
||||
# 整理文件集合 输出消息
|
||||
moved_files = [fp for fp in glob.glob(f"{target_path_base}/**/*", recursive=True)]
|
||||
moved_files_str = to_markdown_tabs(head=["文件"], tabs=[moved_files])
|
||||
files = glob.glob(f"{target_path_base}/**/*", recursive=True)
|
||||
moved_files = [fp for fp in files]
|
||||
max_file_to_show = 10
|
||||
if len(moved_files) > max_file_to_show:
|
||||
moved_files = moved_files[:max_file_to_show//2] + [f'... ( 📌省略{len(moved_files) - max_file_to_show}个文件的显示 ) ...'] + \
|
||||
moved_files[-max_file_to_show//2:]
|
||||
moved_files_str = to_markdown_tabs(head=["文件"], tabs=[moved_files], omit_path=target_path_base)
|
||||
chatbot.append(
|
||||
[
|
||||
"我上传了文件,请查收",
|
||||
f"[Local Message] 收到以下文件: \n\n{moved_files_str}"
|
||||
+ f"\n\n调用路径参数已自动修正到: \n\n{txt}"
|
||||
+ f"\n\n现在您点击任意函数插件时,以上文件将被作为输入参数"
|
||||
+ upload_msg,
|
||||
f"[Local Message] 收到以下文件 (上传到路径:{target_path_base}): " +
|
||||
f"\n\n{moved_files_str}" +
|
||||
f"\n\n调用路径参数已自动修正到: \n\n{txt}" +
|
||||
f"\n\n现在您点击任意函数插件时,以上文件将被作为输入参数" +
|
||||
upload_msg,
|
||||
]
|
||||
)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user