diff --git a/config.py b/config.py
index 55510fe..65f8fd2 100644
--- a/config.py
+++ b/config.py
@@ -74,7 +74,7 @@ MAX_RETRY = 2
# 插件分类默认选项
-DEFAULT_FN_GROUPS = ['对话', '编程', '学术']
+DEFAULT_FN_GROUPS = ['对话', '编程', '学术', '智能体']
# 模型选择是 (注意: LLM_MODEL是默认选中的模型, 它*必须*被包含在AVAIL_LLM_MODELS列表中 )
diff --git a/crazy_functional.py b/crazy_functional.py
index 4df53f5..a9f8cef 100644
--- a/crazy_functional.py
+++ b/crazy_functional.py
@@ -38,7 +38,7 @@ def get_crazy_functions():
function_plugins = {
"虚空终端": {
- "Group": "对话|编程|学术",
+ "Group": "对话|编程|学术|智能体",
"Color": "stop",
"AsButton": True,
"Function": HotReload(虚空终端)
@@ -513,6 +513,18 @@ def get_crazy_functions():
except:
print('Load function plugin failed')
+ try:
+ from crazy_functions.函数动态生成 import 函数动态生成
+ function_plugins.update({
+ "动态代码解释器(CodeInterpreter)": {
+ "Group": "智能体",
+ "Color": "stop",
+ "AsButton": True,
+ "Function": HotReload(函数动态生成)
+ }
+ })
+ except:
+ print('Load function plugin failed')
# try:
# from crazy_functions.CodeInterpreter import 虚空终端CodeInterpreter
diff --git a/crazy_functions/crazy_utils.py b/crazy_functions/crazy_utils.py
index ee1ab90..4bdd1fd 100644
--- a/crazy_functions/crazy_utils.py
+++ b/crazy_functions/crazy_utils.py
@@ -807,3 +807,10 @@ class construct_html():
with open(os.path.join(get_log_folder(), file_name), 'w', encoding='utf8') as f:
f.write(self.html_string.encode('utf-8', 'ignore').decode())
return os.path.join(get_log_folder(), file_name)
+
+
+def get_plugin_arg(plugin_kwargs, key, default):
+ # 如果参数是空的
+ if (key in plugin_kwargs) and (plugin_kwargs[key] == ""): plugin_kwargs.pop(key)
+ # 正常情况
+ return plugin_kwargs.get(key, default)
diff --git a/crazy_functions/gen_fns/gen_fns_shared.py b/crazy_functions/gen_fns/gen_fns_shared.py
new file mode 100644
index 0000000..8e73794
--- /dev/null
+++ b/crazy_functions/gen_fns/gen_fns_shared.py
@@ -0,0 +1,70 @@
+import time
+import importlib
+from toolbox import trimmed_format_exc, gen_time_str, get_log_folder
+from toolbox import CatchException, update_ui, gen_time_str, trimmed_format_exc, is_the_upload_folder
+from toolbox import promote_file_to_downloadzone, get_log_folder, update_ui_lastest_msg
+import multiprocessing
+
+def get_class_name(class_string):
+ import re
+ # Use regex to extract the class name
+ class_name = re.search(r'class (\w+)\(', class_string).group(1)
+ return class_name
+
+def try_make_module(code, chatbot):
+ module_file = 'gpt_fn_' + gen_time_str().replace('-','_')
+ fn_path = f'{get_log_folder(plugin_name="gen_plugin_verify")}/{module_file}.py'
+ with open(fn_path, 'w', encoding='utf8') as f: f.write(code)
+ promote_file_to_downloadzone(fn_path, chatbot=chatbot)
+ class_name = get_class_name(code)
+ manager = multiprocessing.Manager()
+ return_dict = manager.dict()
+ p = multiprocessing.Process(target=is_function_successfully_generated, args=(fn_path, class_name, return_dict))
+ # only has 10 seconds to run
+ p.start(); p.join(timeout=10)
+ if p.is_alive(): p.terminate(); p.join()
+ p.close()
+ return return_dict["success"], return_dict['traceback']
+
+# check is_function_successfully_generated
+def is_function_successfully_generated(fn_path, class_name, return_dict):
+ return_dict['success'] = False
+ return_dict['traceback'] = ""
+ try:
+ # Create a spec for the module
+ module_spec = importlib.util.spec_from_file_location('example_module', fn_path)
+ # Load the module
+ example_module = importlib.util.module_from_spec(module_spec)
+ module_spec.loader.exec_module(example_module)
+ # Now you can use the module
+ some_class = getattr(example_module, class_name)
+ # Now you can create an instance of the class
+ instance = some_class()
+ return_dict['success'] = True
+ return
+ except:
+ return_dict['traceback'] = trimmed_format_exc()
+ return
+
+def subprocess_worker(code, file_path, return_dict):
+ return_dict['result'] = None
+ return_dict['success'] = False
+ return_dict['traceback'] = ""
+ try:
+ module_file = 'gpt_fn_' + gen_time_str().replace('-','_')
+ fn_path = f'{get_log_folder(plugin_name="gen_plugin_run")}/{module_file}.py'
+ with open(fn_path, 'w', encoding='utf8') as f: f.write(code)
+ class_name = get_class_name(code)
+ # Create a spec for the module
+ module_spec = importlib.util.spec_from_file_location('example_module', fn_path)
+ # Load the module
+ example_module = importlib.util.module_from_spec(module_spec)
+ module_spec.loader.exec_module(example_module)
+ # Now you can use the module
+ some_class = getattr(example_module, class_name)
+ # Now you can create an instance of the class
+ instance = some_class()
+ return_dict['result'] = instance.run(file_path)
+ return_dict['success'] = True
+ except:
+ return_dict['traceback'] = trimmed_format_exc()
diff --git a/crazy_functions/函数动态生成.py b/crazy_functions/函数动态生成.py
new file mode 100644
index 0000000..d16ef88
--- /dev/null
+++ b/crazy_functions/函数动态生成.py
@@ -0,0 +1,252 @@
+# 本源代码中, ⭐ = 关键步骤
+"""
+测试:
+ - 裁剪图像,保留下半部分
+ - 交换图像的蓝色通道和红色通道
+ - 将图像转为灰度图像
+ - 将csv文件转excel表格
+
+Testing:
+ - Crop the image, keeping the bottom half.
+ - Swap the blue channel and red channel of the image.
+ - Convert the image to grayscale.
+ - Convert the CSV file to an Excel spreadsheet.
+"""
+
+
+from toolbox import CatchException, update_ui, gen_time_str, trimmed_format_exc, is_the_upload_folder
+from toolbox import promote_file_to_downloadzone, get_log_folder, update_ui_lastest_msg
+from .crazy_utils import request_gpt_model_in_new_thread_with_ui_alive, get_plugin_arg
+from .crazy_utils import input_clipping, try_install_deps
+from crazy_functions.gen_fns.gen_fns_shared import is_function_successfully_generated
+from crazy_functions.gen_fns.gen_fns_shared import get_class_name
+from crazy_functions.gen_fns.gen_fns_shared import subprocess_worker
+from crazy_functions.gen_fns.gen_fns_shared import try_make_module
+import os
+import time
+import glob
+import multiprocessing
+
+templete = """
+```python
+import ... # Put dependencies here, e.g. import numpy as np.
+
+class TerminalFunction(object): # Do not change the name of the class, The name of the class must be `TerminalFunction`
+
+ def run(self, path): # The name of the function must be `run`, it takes only a positional argument.
+ # rewrite the function you have just written here
+ ...
+ return generated_file_path
+```
+"""
+
+def inspect_dependency(chatbot, history):
+ yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
+ return True
+
+def get_code_block(reply):
+ import re
+ pattern = r"```([\s\S]*?)```" # regex pattern to match code blocks
+ matches = re.findall(pattern, reply) # find all code blocks in text
+ if len(matches) == 1:
+ return matches[0].strip('python') # code block
+ for match in matches:
+ if 'class TerminalFunction' in match:
+ return match.strip('python') # code block
+ raise RuntimeError("GPT is not generating proper code.")
+
+def gpt_interact_multi_step(txt, file_type, llm_kwargs, chatbot, history):
+ # 输入
+ prompt_compose = [
+ f'Your job:\n'
+ f'1. write a single Python function, which takes a path of a `{file_type}` file as the only argument and returns a `string` containing the result of analysis or the path of generated files. \n',
+ f"2. You should write this function to perform following task: " + txt + "\n",
+ f"3. Wrap the output python function with markdown codeblock."
+ ]
+ i_say = "".join(prompt_compose)
+ demo = []
+
+ # 第一步
+ gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
+ inputs=i_say, inputs_show_user=i_say,
+ llm_kwargs=llm_kwargs, chatbot=chatbot, history=demo,
+ sys_prompt= r"You are a world-class programmer."
+ )
+ history.extend([i_say, gpt_say])
+ yield from update_ui(chatbot=chatbot, history=history) # 刷新界面 # 界面更新
+
+ # 第二步
+ prompt_compose = [
+ "If previous stage is successful, rewrite the function you have just written to satisfy following templete: \n",
+ templete
+ ]
+ i_say = "".join(prompt_compose); inputs_show_user = "If previous stage is successful, rewrite the function you have just written to satisfy executable templete. "
+ gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
+ inputs=i_say, inputs_show_user=inputs_show_user,
+ llm_kwargs=llm_kwargs, chatbot=chatbot, history=history,
+ sys_prompt= r"You are a programmer. You need to replace `...` with valid packages, do not give `...` in your answer!"
+ )
+ code_to_return = gpt_say
+ history.extend([i_say, gpt_say])
+ yield from update_ui(chatbot=chatbot, history=history) # 刷新界面 # 界面更新
+
+ # # 第三步
+ # i_say = "Please list to packages to install to run the code above. Then show me how to use `try_install_deps` function to install them."
+ # i_say += 'For instance. `try_install_deps(["opencv-python", "scipy", "numpy"])`'
+ # installation_advance = yield from request_gpt_model_in_new_thread_with_ui_alive(
+ # inputs=i_say, inputs_show_user=inputs_show_user,
+ # llm_kwargs=llm_kwargs, chatbot=chatbot, history=history,
+ # sys_prompt= r"You are a programmer."
+ # )
+
+ # # # 第三步
+ # i_say = "Show me how to use `pip` to install packages to run the code above. "
+ # i_say += 'For instance. `pip install -r opencv-python scipy numpy`'
+ # installation_advance = yield from request_gpt_model_in_new_thread_with_ui_alive(
+ # inputs=i_say, inputs_show_user=i_say,
+ # llm_kwargs=llm_kwargs, chatbot=chatbot, history=history,
+ # sys_prompt= r"You are a programmer."
+ # )
+ installation_advance = ""
+
+ return code_to_return, installation_advance, txt, file_type, llm_kwargs, chatbot, history
+
+
+
+
+def for_immediate_show_off_when_possible(file_type, fp, chatbot):
+ if file_type in ['png', 'jpg']:
+ image_path = os.path.abspath(fp)
+ chatbot.append(['这是一张图片, 展示如下:',
+ f'本地文件地址:
`{image_path}`
'+
+ f'本地文件预览: