file IO
This commit is contained in:
		
							parent
							
								
									0f20ffeff4
								
							
						
					
					
						commit
						bc6d0926b1
					
				
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@ -136,3 +136,4 @@ ssr_conf
 | 
			
		||||
config_private.py
 | 
			
		||||
gpt_log
 | 
			
		||||
private.md
 | 
			
		||||
private_upload
 | 
			
		||||
@ -127,3 +127,23 @@ def 解析一个C项目的头文件(txt, top_p, temperature, chatbot, history, s
 | 
			
		||||
        return
 | 
			
		||||
    yield from 解析源代码(file_manifest, project_folder, top_p, temperature, chatbot, history, systemPromptTxt)
 | 
			
		||||
 | 
			
		||||
@CatchException
 | 
			
		||||
def 解析一个C项目(txt, top_p, temperature, chatbot, history, systemPromptTxt, WEB_PORT):
 | 
			
		||||
    history = []    # 清空历史,以免输入溢出
 | 
			
		||||
    import glob, os
 | 
			
		||||
    if os.path.exists(txt):
 | 
			
		||||
        project_folder = txt
 | 
			
		||||
    else:
 | 
			
		||||
        if txt == "": txt = '空空如也的输入栏'
 | 
			
		||||
        report_execption(chatbot, history, a = f"解析项目: {txt}", b = f"找不到本地项目或无权访问: {txt}")
 | 
			
		||||
        yield chatbot, history, '正常'
 | 
			
		||||
        return
 | 
			
		||||
    file_manifest = [f for f in glob.glob(f'{project_folder}/**/*.h', recursive=True)]  + \
 | 
			
		||||
                    [f for f in glob.glob(f'{project_folder}/**/*.cpp', recursive=True)] + \
 | 
			
		||||
                    [f for f in glob.glob(f'{project_folder}/**/*.c', recursive=True)]
 | 
			
		||||
    if len(file_manifest) == 0:
 | 
			
		||||
        report_execption(chatbot, history, a = f"解析项目: {txt}", b = f"找不到任何.h头文件: {txt}")
 | 
			
		||||
        yield chatbot, history, '正常'
 | 
			
		||||
        return
 | 
			
		||||
    yield from 解析源代码(file_manifest, project_folder, top_p, temperature, chatbot, history, systemPromptTxt)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -5,25 +5,30 @@ def get_crazy_functionals():
 | 
			
		||||
    from crazy_functions.解析项目源代码 import 解析项目本身
 | 
			
		||||
    from crazy_functions.解析项目源代码 import 解析一个Python项目
 | 
			
		||||
    from crazy_functions.解析项目源代码 import 解析一个C项目的头文件
 | 
			
		||||
    from crazy_functions.解析项目源代码 import 解析一个C项目
 | 
			
		||||
    from crazy_functions.高级功能函数模板 import 高阶功能模板函数
 | 
			
		||||
 | 
			
		||||
    return {
 | 
			
		||||
        "[实验] 请解析并解构此项目本身": {
 | 
			
		||||
            "Function": 解析项目本身
 | 
			
		||||
        },
 | 
			
		||||
        "[实验] 解析整个py项目(input输入项目根路径)": {
 | 
			
		||||
        "[实验] 解析整个py项目(配合input输入框)": {
 | 
			
		||||
            "Color": "stop",    # 按钮颜色
 | 
			
		||||
            "Function": 解析一个Python项目
 | 
			
		||||
        },
 | 
			
		||||
        "[实验] 解析整个C++项目(input输入项目根路径)": {
 | 
			
		||||
        "[实验] 解析整个C++项目头文件(配合input输入框)": {
 | 
			
		||||
            "Color": "stop",    # 按钮颜色
 | 
			
		||||
            "Function": 解析一个C项目的头文件
 | 
			
		||||
        },
 | 
			
		||||
        "[实验] 读tex论文写摘要(input输入项目根路径)": {
 | 
			
		||||
        "[实验] 解析整个C++项目(配合input输入框)": {
 | 
			
		||||
            "Color": "stop",    # 按钮颜色
 | 
			
		||||
            "Function": 解析一个C项目
 | 
			
		||||
        },
 | 
			
		||||
        "[实验] 读tex论文写摘要(配合input输入框)": {
 | 
			
		||||
            "Color": "stop",    # 按钮颜色
 | 
			
		||||
            "Function": 读文章写摘要
 | 
			
		||||
        },
 | 
			
		||||
        "[实验] 批量生成函数注释(input输入项目根路径)": {
 | 
			
		||||
        "[实验] 批量生成函数注释(配合input输入框)": {
 | 
			
		||||
            "Color": "stop",    # 按钮颜色
 | 
			
		||||
            "Function": 批量生成函数注释
 | 
			
		||||
        },
 | 
			
		||||
@ -33,8 +38,29 @@ def get_crazy_functionals():
 | 
			
		||||
        },
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
def on_file_uploaded(file):
 | 
			
		||||
    with open(file[0].name,'r') as f: 
 | 
			
		||||
        print(f.read())
 | 
			
		||||
    print('uploaded')
 | 
			
		||||
    pass
 | 
			
		||||
def on_file_uploaded(files, chatbot, txt):
 | 
			
		||||
    import shutil, os, time, glob
 | 
			
		||||
    from toolbox import extract_archive
 | 
			
		||||
    try: shutil.rmtree('./private_upload/')
 | 
			
		||||
    except: pass
 | 
			
		||||
    time_tag = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
 | 
			
		||||
    os.makedirs(f'private_upload/{time_tag}', exist_ok=True)
 | 
			
		||||
    for file in files:
 | 
			
		||||
        file_origin_name = os.path.basename(file.orig_name)
 | 
			
		||||
        shutil.copy(file.name, f'private_upload/{time_tag}/{file_origin_name}')
 | 
			
		||||
        extract_archive(f'private_upload/{time_tag}/{file_origin_name}', 
 | 
			
		||||
                        dest_dir=f'private_upload/{time_tag}/{file_origin_name}.extract')
 | 
			
		||||
    moved_files = [fp for fp in glob.glob('private_upload/**/*', recursive=True)]
 | 
			
		||||
    txt = f'private_upload/{time_tag}'
 | 
			
		||||
    moved_files_str = '\t\n\n'.join(moved_files)
 | 
			
		||||
    chatbot.append(['我上传了文件,请查收', 
 | 
			
		||||
                    f'[Local Message] 收到以下文件: \n\n{moved_files_str}\n\n调用路径参数已自动修正到: \n\n{txt}\n\n现在您可以直接选择任意实现性功能'])
 | 
			
		||||
    return chatbot, txt
 | 
			
		||||
 | 
			
		||||
def on_report_generated(files, chatbot):
 | 
			
		||||
    from toolbox import find_recent_files
 | 
			
		||||
    report_files = find_recent_files('gpt_log')
 | 
			
		||||
    # files.extend(report_files)
 | 
			
		||||
    chatbot.append(['汇总报告如何远程获取?', '汇总报告已经添加到右侧文件上传区,请查收。'])
 | 
			
		||||
    return report_files, chatbot
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										18
									
								
								main.py
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								main.py
									
									
									
									
									
								
							@ -25,7 +25,7 @@ from functional import get_functionals
 | 
			
		||||
functional = get_functionals()
 | 
			
		||||
 | 
			
		||||
# 对一些丧心病狂的实验性功能模块进行测试
 | 
			
		||||
from functional_crazy import get_crazy_functionals, on_file_uploaded
 | 
			
		||||
from functional_crazy import get_crazy_functionals, on_file_uploaded, on_report_generated
 | 
			
		||||
crazy_functional = get_crazy_functionals()
 | 
			
		||||
 | 
			
		||||
# 处理markdown文本格式的转变
 | 
			
		||||
@ -59,15 +59,15 @@ with gr.Blocks(theme=set_theme, analytics_enabled=False) as demo:
 | 
			
		||||
                    variant = functional[k]["Color"] if "Color" in functional[k] else "secondary"
 | 
			
		||||
                    functional[k]["Button"] = gr.Button(k, variant=variant)
 | 
			
		||||
            with gr.Row():
 | 
			
		||||
                gr.Markdown("Input Directory Functions.")
 | 
			
		||||
                gr.Markdown("以下部分实验性功能需读取路径.")
 | 
			
		||||
            with gr.Row():
 | 
			
		||||
                for k in crazy_functional:
 | 
			
		||||
                    variant = crazy_functional[k]["Color"] if "Color" in crazy_functional[k] else "secondary"
 | 
			
		||||
                    crazy_functional[k]["Button"] = gr.Button(k, variant=variant)
 | 
			
		||||
            with gr.Row():
 | 
			
		||||
                gr.Markdown("Upload Files Functions.")
 | 
			
		||||
                gr.Markdown("上传本地文件,调用实验函数.")
 | 
			
		||||
            with gr.Row():
 | 
			
		||||
                file_upload = gr.Files(file_count="multiple")
 | 
			
		||||
                file_upload = gr.Files(label='任何文件,但推荐上传压缩文件(zip, tar)', file_count="multiple")
 | 
			
		||||
 | 
			
		||||
            from check_proxy import check_proxy
 | 
			
		||||
            statusDisplay = gr.Markdown(f"{check_proxy(proxies)}")
 | 
			
		||||
@ -82,10 +82,14 @@ with gr.Blocks(theme=set_theme, analytics_enabled=False) as demo:
 | 
			
		||||
    for k in functional:
 | 
			
		||||
        functional[k]["Button"].click(predict, 
 | 
			
		||||
            [txt, top_p, temperature, chatbot, history, systemPromptTxt, TRUE, gr.State(k)], [chatbot, history, statusDisplay], show_progress=True)
 | 
			
		||||
    file_upload.upload(on_file_uploaded, [file_upload, chatbot, txt], [chatbot, txt])
 | 
			
		||||
    for k in crazy_functional:
 | 
			
		||||
        crazy_functional[k]["Button"].click(crazy_functional[k]["Function"], 
 | 
			
		||||
            [txt, top_p, temperature, chatbot, history, systemPromptTxt, gr.State(PORT)], [chatbot, history, statusDisplay])
 | 
			
		||||
    file_upload.upload(on_file_uploaded, [file_upload])
 | 
			
		||||
        click_handle = crazy_functional[k]["Button"].click(crazy_functional[k]["Function"], 
 | 
			
		||||
            [txt, top_p, temperature, chatbot, history, systemPromptTxt, gr.State(PORT)], [chatbot, history, statusDisplay]
 | 
			
		||||
        )
 | 
			
		||||
        try: click_handle.then(on_report_generated, [file_upload, chatbot], [file_upload, chatbot])
 | 
			
		||||
        except: pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# 延迟函数,做一些准备工作,最后尝试打开浏览器
 | 
			
		||||
def auto_opentab_delay():
 | 
			
		||||
 | 
			
		||||
@ -1,3 +1,3 @@
 | 
			
		||||
gradio
 | 
			
		||||
gradio>=3.23
 | 
			
		||||
requests[socks]
 | 
			
		||||
mdtex2html
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										40
									
								
								toolbox.py
									
									
									
									
									
								
							
							
						
						
									
										40
									
								
								toolbox.py
									
									
									
									
									
								
							@ -143,3 +143,43 @@ def find_free_port():
 | 
			
		||||
        s.bind(('', 0))
 | 
			
		||||
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
			
		||||
        return s.getsockname()[1]
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
def extract_archive(file_path, dest_dir):
 | 
			
		||||
    import zipfile
 | 
			
		||||
    import tarfile
 | 
			
		||||
    import os
 | 
			
		||||
    # Get the file extension of the input file
 | 
			
		||||
    file_extension = os.path.splitext(file_path)[1]
 | 
			
		||||
 | 
			
		||||
    # Extract the archive based on its extension
 | 
			
		||||
    if file_extension == '.zip':
 | 
			
		||||
        with zipfile.ZipFile(file_path, 'r') as zipobj:
 | 
			
		||||
            zipobj.extractall(path=dest_dir)
 | 
			
		||||
            print("Successfully extracted zip archive to {}".format(dest_dir))
 | 
			
		||||
 | 
			
		||||
    elif file_extension in ['.tar', '.gz', '.bz2']:
 | 
			
		||||
        with tarfile.open(file_path, 'r:*') as tarobj:
 | 
			
		||||
            tarobj.extractall(path=dest_dir)
 | 
			
		||||
            print("Successfully extracted tar archive to {}".format(dest_dir))
 | 
			
		||||
    else:
 | 
			
		||||
        return
 | 
			
		||||
    
 | 
			
		||||
def find_recent_files(directory):
 | 
			
		||||
    """
 | 
			
		||||
        me: find files that is created with in one minutes under a directory with python, write a function
 | 
			
		||||
        gpt: here it is!
 | 
			
		||||
    """
 | 
			
		||||
    import os
 | 
			
		||||
    import time
 | 
			
		||||
    current_time = time.time()
 | 
			
		||||
    one_minute_ago = current_time - 60
 | 
			
		||||
    recent_files = []
 | 
			
		||||
 | 
			
		||||
    for filename in os.listdir(directory):
 | 
			
		||||
        file_path = os.path.join(directory, filename)
 | 
			
		||||
        created_time = os.path.getctime(file_path)
 | 
			
		||||
        if created_time >= one_minute_ago:
 | 
			
		||||
            recent_files.append(file_path)
 | 
			
		||||
 | 
			
		||||
    return recent_files
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user