重构异步代码,增强可读性
This commit is contained in:
parent
2dc8718041
commit
9648d78453
@ -1,8 +1,36 @@
|
||||
import time, threading
|
||||
import time, threading, json
|
||||
|
||||
|
||||
class AliyunASR():
|
||||
|
||||
def test_on_sentence_begin(self, message, *args):
|
||||
print("test_on_sentence_begin:{}".format(message))
|
||||
|
||||
def test_on_sentence_end(self, message, *args):
|
||||
print("test_on_sentence_end:{}".format(message))
|
||||
message = json.loads(message)
|
||||
self.parsed_sentence = message['payload']['result']
|
||||
self.event_on_entence_end.set()
|
||||
|
||||
def test_on_start(self, message, *args):
|
||||
print("test_on_start:{}".format(message))
|
||||
|
||||
def test_on_error(self, message, *args):
|
||||
print("on_error args=>{}".format(args))
|
||||
|
||||
def test_on_close(self, *args):
|
||||
print("on_close: args=>{}".format(args))
|
||||
|
||||
def test_on_result_chg(self, message, *args):
|
||||
print("test_on_chg:{}".format(message))
|
||||
message = json.loads(message)
|
||||
self.parsed_text = message['payload']['result']
|
||||
self.event_on_result_chg.set()
|
||||
|
||||
def test_on_completed(self, message, *args):
|
||||
print("on_completed:args=>{} message=>{}".format(args, message))
|
||||
|
||||
|
||||
def audio_convertion_thread(self, uuid):
|
||||
# 在一个异步线程中采集音频
|
||||
import nls # pip install git+https://github.com/aliyun/alibabacloud-nls-python-sdk.git
|
||||
|
@ -7,15 +7,38 @@ import numpy as np
|
||||
from .live_audio.aliyunASR import AliyunASR
|
||||
import json
|
||||
|
||||
def gpt_thread_worker(i_say, llm_kwargs, history, sys_prompt, observe_window, index):
|
||||
try:
|
||||
gpt_say_partial = predict_no_ui_long_connection(inputs=i_say, llm_kwargs=llm_kwargs, history=[], sys_prompt=sys_prompt, observe_window=observe_window[index])
|
||||
except ConnectionAbortedError as token_exceed_err:
|
||||
print('至少一个线程任务Token溢出而失败', e)
|
||||
except Exception as e:
|
||||
print('至少一个线程任务意外失败', e)
|
||||
|
||||
|
||||
class AsyncGptTask():
|
||||
def __init__(self) -> None:
|
||||
self.observe_future = []
|
||||
self.observe_future_chatbot_index = []
|
||||
|
||||
def gpt_thread_worker(self, i_say, llm_kwargs, history, sys_prompt, observe_window, index):
|
||||
try:
|
||||
gpt_say_partial = predict_no_ui_long_connection(inputs=i_say, llm_kwargs=llm_kwargs, history=[], sys_prompt=sys_prompt, observe_window=observe_window[index])
|
||||
except ConnectionAbortedError as token_exceed_err:
|
||||
print('至少一个线程任务Token溢出而失败', e)
|
||||
except Exception as e:
|
||||
print('至少一个线程任务意外失败', e)
|
||||
|
||||
def add_async_gpt_task(self, i_say, chatbot_index, llm_kwargs, history, system_prompt):
|
||||
self.observe_future.append([""])
|
||||
self.observe_future_chatbot_index.append(chatbot_index)
|
||||
cur_index = len(self.observe_future)-1
|
||||
th_new = threading.Thread(target=self.gpt_thread_worker, args=(i_say, llm_kwargs, history, system_prompt, self.observe_future, cur_index))
|
||||
th_new.daemon = True
|
||||
th_new.start()
|
||||
|
||||
def update_chatbot(self, chatbot):
|
||||
for of, ofci in zip(self.observe_future, self.observe_future_chatbot_index):
|
||||
try:
|
||||
chatbot[ofci] = list(chatbot[ofci])
|
||||
chatbot[ofci][1] = of[0]
|
||||
except:
|
||||
self.observe_future = []
|
||||
self.observe_future_chatbot_index = []
|
||||
return chatbot
|
||||
|
||||
class InterviewAssistant(AliyunASR):
|
||||
def __init__(self):
|
||||
@ -38,33 +61,6 @@ class InterviewAssistant(AliyunASR):
|
||||
# th2.daemon = True
|
||||
# th2.start()
|
||||
|
||||
def test_on_sentence_begin(self, message, *args):
|
||||
print("test_on_sentence_begin:{}".format(message))
|
||||
|
||||
def test_on_sentence_end(self, message, *args):
|
||||
print("test_on_sentence_end:{}".format(message))
|
||||
message = json.loads(message)
|
||||
self.parsed_sentence = message['payload']['result']
|
||||
self.event_on_entence_end.set()
|
||||
|
||||
def test_on_start(self, message, *args):
|
||||
print("test_on_start:{}".format(message))
|
||||
|
||||
def test_on_error(self, message, *args):
|
||||
print("on_error args=>{}".format(args))
|
||||
|
||||
def test_on_close(self, *args):
|
||||
print("on_close: args=>{}".format(args))
|
||||
|
||||
def test_on_result_chg(self, message, *args):
|
||||
print("test_on_chg:{}".format(message))
|
||||
message = json.loads(message)
|
||||
self.parsed_text = message['payload']['result']
|
||||
self.event_on_result_chg.set()
|
||||
|
||||
def test_on_completed(self, message, *args):
|
||||
print("on_completed:args=>{} message=>{}".format(args, message))
|
||||
|
||||
def gpt_answer(self, text, chatbot, history, llm_kwargs):
|
||||
i_say = inputs_show_user = text
|
||||
gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
|
||||
@ -79,43 +75,29 @@ class InterviewAssistant(AliyunASR):
|
||||
# main plugin function
|
||||
self.init(chatbot)
|
||||
chatbot.append(["", ""])
|
||||
observe_future = []
|
||||
observe_future_chatbot_index = []
|
||||
yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
|
||||
self.agt = AsyncGptTask()
|
||||
|
||||
while True:
|
||||
self.event_on_result_chg.wait(timeout=0.25) # run once every 0.25 second
|
||||
chatbot = self.agt.update_chatbot(chatbot) # 将子线程的gpt结果写入chatbot
|
||||
yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
|
||||
|
||||
self.event_on_result_chg.wait(timeout=0.5)
|
||||
for of, ofci in zip(observe_future, observe_future_chatbot_index):
|
||||
try:
|
||||
chatbot[ofci] = list(chatbot[ofci])
|
||||
chatbot[ofci][1] = of[0]
|
||||
yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
|
||||
except:
|
||||
observe_future = []
|
||||
observe_future_chatbot_index = []
|
||||
|
||||
if self.event_on_result_chg.is_set():
|
||||
self.event_on_result_chg.clear()
|
||||
|
||||
if self.event_on_result_chg.is_set():
|
||||
# update audio decode result
|
||||
self.event_on_result_chg.clear()
|
||||
chatbot[-1] = list(chatbot[-1])
|
||||
chatbot[-1][0] = self.parsed_text
|
||||
yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
|
||||
|
||||
if self.event_on_entence_end.is_set():
|
||||
# called when a sentence is done
|
||||
# called when a sentence has ended
|
||||
self.event_on_entence_end.clear()
|
||||
chatbot[-1] = list(chatbot[-1])
|
||||
chatbot[-1][0] = self.parsed_sentence
|
||||
chatbot[-1][1] = "[waiting gpt reply]"
|
||||
chatbot[-1] = [self.parsed_sentence, "[waiting gpt reply]"]
|
||||
yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
|
||||
# add gpt task
|
||||
observe_future.append([""])
|
||||
observe_future_chatbot_index.append(len(chatbot)-1)
|
||||
cur_index = len(observe_future)-1
|
||||
th_new = threading.Thread(target=gpt_thread_worker, args=(self.parsed_sentence, llm_kwargs, history, system_prompt, observe_future, cur_index))
|
||||
th_new.daemon = True
|
||||
th_new.start()
|
||||
# add gpt task 创建子线程请求gpt,避免线程阻塞
|
||||
self.agt.add_async_gpt_task(self.parsed_sentence, len(chatbot)-1, llm_kwargs, history, system_prompt)
|
||||
chatbot.append(["", ""])
|
||||
yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user