diff --git a/custom_routes.py b/custom_routes.py index 670d3a0..ba311ab 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1099,6 +1099,7 @@ async def send_json_override(self, event, data, sid=None): if prompt_id in comfy_message_queues: comfy_message_queues[prompt_id].put_nowait({"event": event, "data": data}) + asyncio.create_task(update_run_ws_event(prompt_id, event, data)) # event_emitter.emit("send_json", { # "event": event, # "data": data @@ -1239,6 +1240,30 @@ async def update_run_live_status(prompt_id, live_status, calculated_progress: fl await async_request_with_retry("POST", status_endpoint, token=token, json=body) +async def update_run_ws_event(prompt_id: str, event: str, data: dict): + if prompt_id not in prompt_metadata: + return + + # print("update_run_ws_event", prompt_id, event, data) + status_endpoint = prompt_metadata[prompt_id].status_endpoint + + if status_endpoint is None: + return + + token = prompt_metadata[prompt_id].token + gpu_event_id = prompt_metadata[prompt_id].gpu_event_id or None + + body = { + "run_id": prompt_id, + "ws_event": { + "event": event, + "data": data, + "gpu_event_id": gpu_event_id, + }, + } + await async_request_with_retry("POST", status_endpoint, token=token, json=body) + + async def update_run(prompt_id: str, status: Status): global last_read_line_number @@ -1539,7 +1564,7 @@ async def handle_upload( prompt_id: str, data, key: str, content_type_key: str, default_content_type: str ): items = data.get(key, []) - upload_tasks = [] + # upload_tasks = [] for item in items: # Skipping temp files @@ -1555,35 +1580,37 @@ async def handle_upload( elif file_extension == ".webp": file_type = "image/webp" - upload_tasks.append( - upload_file( - prompt_id, - item.get("filename"), - subfolder=item.get("subfolder"), - type=item.get("type"), - content_type=file_type, - item=item, - ) + # upload_tasks.append(upload_file( + # prompt_id, + # item.get("filename"), + # subfolder=item.get("subfolder"), + # type=item.get("type"), + # content_type=file_type, + # item=item + # )) + await upload_file( + prompt_id, + item.get("filename"), + subfolder=item.get("subfolder"), + type=item.get("type"), + content_type=file_type, + item=item, ) # Execute all upload tasks concurrently - await asyncio.gather(*upload_tasks) + # await asyncio.gather(*upload_tasks) async def upload_in_background( prompt_id: str, data, node_id=None, have_upload=True, node_meta=None ): try: - upload_tasks = [ - handle_upload(prompt_id, data, "images", "content_type", "image/png"), - handle_upload(prompt_id, data, "files", "content_type", "image/png"), - handle_upload(prompt_id, data, "gifs", "format", "image/gif"), - handle_upload( - prompt_id, data, "mesh", "format", "application/octet-stream" - ), - ] - - await asyncio.gather(*upload_tasks) + await handle_upload(prompt_id, data, "images", "content_type", "image/png") + await handle_upload(prompt_id, data, "files", "content_type", "image/png") + await handle_upload(prompt_id, data, "gifs", "format", "image/gif") + await handle_upload( + prompt_id, data, "mesh", "format", "application/octet-stream" + ) status_endpoint = prompt_metadata[prompt_id].status_endpoint token = prompt_metadata[prompt_id].token @@ -1622,6 +1649,7 @@ async def update_run_with_output( "node_meta": node_meta, "gpu_event_id": gpu_event_id, } + pprint(body) have_upload_media = False if data is not None: have_upload_media = ( @@ -1640,14 +1668,13 @@ async def update_run_with_output( if have_upload_media: await update_file_status(prompt_id, data, True, node_id=node_id) - asyncio.create_task( - upload_in_background( - prompt_id, - data, - node_id=node_id, - have_upload=have_upload_media, - node_meta=node_meta, - ) + # asyncio.create_task(upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload_media, node_meta=node_meta)) + await upload_in_background( + prompt_id, + data, + node_id=node_id, + have_upload=have_upload_media, + node_meta=node_meta, ) # await upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload)