From 9648d784535cec9f93fb63f958cc965ff41266cd Mon Sep 17 00:00:00 2001
From: 505030475
Date: Mon, 3 Jul 2023 22:44:10 +0800
Subject: [PATCH] Refactor the async code to improve readability
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 crazy_functions/live_audio/aliyunASR.py |  30 ++-
 crazy_functions/辅助面试.py             | 100 ++++++++++--------
 2 files changed, 70 insertions(+), 60 deletions(-)

diff --git a/crazy_functions/live_audio/aliyunASR.py b/crazy_functions/live_audio/aliyunASR.py
index da9437a..3b56e49 100644
--- a/crazy_functions/live_audio/aliyunASR.py
+++ b/crazy_functions/live_audio/aliyunASR.py
@@ -1,8 +1,36 @@
-import time, threading
+import time, threading, json
 
 
 class AliyunASR():
 
+    def test_on_sentence_begin(self, message, *args):
+        print("test_on_sentence_begin:{}".format(message))
+
+    def test_on_sentence_end(self, message, *args):
+        print("test_on_sentence_end:{}".format(message))
+        message = json.loads(message)
+        self.parsed_sentence = message['payload']['result']
+        self.event_on_entence_end.set()
+
+    def test_on_start(self, message, *args):
+        print("test_on_start:{}".format(message))
+
+    def test_on_error(self, message, *args):
+        print("on_error args=>{}".format(args))
+
+    def test_on_close(self, *args):
+        print("on_close: args=>{}".format(args))
+
+    def test_on_result_chg(self, message, *args):
+        print("test_on_chg:{}".format(message))
+        message = json.loads(message)
+        self.parsed_text = message['payload']['result']
+        self.event_on_result_chg.set()
+
+    def test_on_completed(self, message, *args):
+        print("on_completed:args=>{} message=>{}".format(args, message))
+
+
     def audio_convertion_thread(self, uuid):  # capture audio in a separate async thread
         import nls  # pip install git+https://github.com/aliyun/alibabacloud-nls-python-sdk.git
diff --git a/crazy_functions/辅助面试.py b/crazy_functions/辅助面试.py
index 8ffb3be..d1d79e3 100644
--- a/crazy_functions/辅助面试.py
+++ b/crazy_functions/辅助面试.py
@@ -7,15 +7,38 @@ import numpy as np
 from .live_audio.aliyunASR import AliyunASR
 import json
 
-def gpt_thread_worker(i_say, llm_kwargs, history, sys_prompt, observe_window, index):
-    try:
-        gpt_say_partial = predict_no_ui_long_connection(inputs=i_say, llm_kwargs=llm_kwargs, history=[], sys_prompt=sys_prompt, observe_window=observe_window[index])
-    except ConnectionAbortedError as token_exceed_err:
-        print('至少一个线程任务Token溢出而失败', e)
-    except Exception as e:
-        print('至少一个线程任务意外失败', e)
 
+class AsyncGptTask():
+    def __init__(self) -> None:
+        self.observe_future = []
+        self.observe_future_chatbot_index = []
+
+    def gpt_thread_worker(self, i_say, llm_kwargs, history, sys_prompt, observe_window, index):
+        try:
+            gpt_say_partial = predict_no_ui_long_connection(inputs=i_say, llm_kwargs=llm_kwargs, history=[], sys_prompt=sys_prompt, observe_window=observe_window[index])
+        except ConnectionAbortedError as token_exceed_err:
+            print('至少一个线程任务Token溢出而失败', token_exceed_err)
+        except Exception as e:
+            print('至少一个线程任务意外失败', e)
+
+    def add_async_gpt_task(self, i_say, chatbot_index, llm_kwargs, history, system_prompt):
+        self.observe_future.append([""])
+        self.observe_future_chatbot_index.append(chatbot_index)
+        cur_index = len(self.observe_future)-1
+        th_new = threading.Thread(target=self.gpt_thread_worker, args=(i_say, llm_kwargs, history, system_prompt, self.observe_future, cur_index))
+        th_new.daemon = True
+        th_new.start()
+
+    def update_chatbot(self, chatbot):
+        for of, ofci in zip(self.observe_future, self.observe_future_chatbot_index):
+            try:
+                chatbot[ofci] = list(chatbot[ofci])
+                chatbot[ofci][1] = of[0]
+            except:
+                self.observe_future = []
+                self.observe_future_chatbot_index = []
+        return chatbot
 
 class InterviewAssistant(AliyunASR):
     def __init__(self):
@@ -38,33 +61,6 @@ class InterviewAssistant(AliyunASR):
         # th2.daemon = True
         # th2.start()
 
-    def test_on_sentence_begin(self, message, *args):
-        print("test_on_sentence_begin:{}".format(message))
-
-    def test_on_sentence_end(self, message, *args):
-        print("test_on_sentence_end:{}".format(message))
-        message = json.loads(message)
-        self.parsed_sentence = message['payload']['result']
-        self.event_on_entence_end.set()
-
-    def test_on_start(self, message, *args):
-        print("test_on_start:{}".format(message))
-
-    def test_on_error(self, message, *args):
-        print("on_error args=>{}".format(args))
-
-    def test_on_close(self, *args):
-        print("on_close: args=>{}".format(args))
-
-    def test_on_result_chg(self, message, *args):
-        print("test_on_chg:{}".format(message))
-        message = json.loads(message)
-        self.parsed_text = message['payload']['result']
-        self.event_on_result_chg.set()
-
-    def test_on_completed(self, message, *args):
-        print("on_completed:args=>{} message=>{}".format(args, message))
-
     def gpt_answer(self, text, chatbot, history, llm_kwargs):
         i_say = inputs_show_user = text
         gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
@@ -79,43 +75,29 @@ class InterviewAssistant(AliyunASR):
         # main plugin function
         self.init(chatbot)
         chatbot.append(["", ""])
-        observe_future = []
-        observe_future_chatbot_index = []
         yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
+        self.agt = AsyncGptTask()
+
         while True:
+            self.event_on_result_chg.wait(timeout=0.25)  # run once every 0.25 seconds
+            chatbot = self.agt.update_chatbot(chatbot)   # write the worker threads' gpt results into the chatbot
+            yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
 
-            self.event_on_result_chg.wait(timeout=0.5)
-            for of, ofci in zip(observe_future, observe_future_chatbot_index):
-                try:
-                    chatbot[ofci] = list(chatbot[ofci])
-                    chatbot[ofci][1] = of[0]
-                    yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
-                except:
-                    observe_future = []
-                    observe_future_chatbot_index = []
-
-            if self.event_on_result_chg.is_set():
-                self.event_on_result_chg.clear()
-
+            if self.event_on_result_chg.is_set():  # update audio decode result
+                self.event_on_result_chg.clear()
                 chatbot[-1] = list(chatbot[-1])
                 chatbot[-1][0] = self.parsed_text
                 yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
 
             if self.event_on_entence_end.is_set():
-                # called when a sentence is done
+                # called when a sentence has ended
                 self.event_on_entence_end.clear()
                 chatbot[-1] = list(chatbot[-1])
-                chatbot[-1][0] = self.parsed_sentence
-                chatbot[-1][1] = "[waiting gpt reply]"
+                chatbot[-1] = [self.parsed_sentence, "[waiting gpt reply]"]
                 yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
 
-                # add gpt task
-                observe_future.append([""])
-                observe_future_chatbot_index.append(len(chatbot)-1)
-                cur_index = len(observe_future)-1
-                th_new = threading.Thread(target=gpt_thread_worker, args=(self.parsed_sentence, llm_kwargs, history, system_prompt, observe_future, cur_index))
-                th_new.daemon = True
-                th_new.start()
+                # add gpt task: request gpt in a worker thread to avoid blocking
+                self.agt.add_async_gpt_task(self.parsed_sentence, len(chatbot)-1, llm_kwargs, history, system_prompt)
 
                 chatbot.append(["", ""])
                 yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
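
Note: the sketch below is not part of the patch; it is a minimal, self-contained illustration of the pattern the new AsyncGptTask class relies on. Each request runs in a daemon worker thread that streams partial text into a shared per-task slot, while the plugin loop polls the slots and copies the latest partials into the chatbot rows. AsyncTaskDemo and fake_llm_stream are hypothetical stand-ins for AsyncGptTask and predict_no_ui_long_connection; only the slot and index bookkeeping mirrors the patch.

# Hypothetical stand-in for predict_no_ui_long_connection: it streams a fake reply
# into observe_window[index][0], the same mutable one-element slot the patch uses.
import threading, time

def fake_llm_stream(prompt, observe_window, index):
    reply = "echo: " + prompt
    for i in range(1, len(reply) + 1):
        time.sleep(0.05)
        observe_window[index][0] = reply[:i]    # overwrite the slot with the latest partial text

class AsyncTaskDemo():
    def __init__(self):
        self.observe_future = []                # one [text] slot per pending task
        self.observe_future_chatbot_index = []  # which chatbot row each task writes to

    def add_task(self, prompt, chatbot_index):
        self.observe_future.append([""])
        self.observe_future_chatbot_index.append(chatbot_index)
        th = threading.Thread(target=fake_llm_stream,
                              args=(prompt, self.observe_future, len(self.observe_future) - 1))
        th.daemon = True                        # pending replies must not keep the process alive
        th.start()

    def update_chatbot(self, chatbot):
        # copy the latest partial text of every task into its chatbot row
        for of, ofci in zip(self.observe_future, self.observe_future_chatbot_index):
            chatbot[ofci][1] = of[0]
        return chatbot

if __name__ == "__main__":
    demo = AsyncTaskDemo()
    chatbot = [["what is ASR?", ""]]
    demo.add_task("what is ASR?", 0)
    for _ in range(8):                          # the plugin loop polls every 0.25 s
        time.sleep(0.25)
        chatbot = demo.update_chatbot(chatbot)
        print(chatbot[0])

Both the old and the new code spawn one daemon thread per request; the refactor moves the slot and index bookkeeping out of the plugin loop into a single class, which is what the commit message means by improving readability.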