From e692beb00928e202aa40d85c22beb1f52f6a289f Mon Sep 17 00:00:00 2001 From: bennykok Date: Mon, 16 Sep 2024 15:34:20 -0700 Subject: [PATCH 1/5] feat: realtime log sync --- custom_routes.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/custom_routes.py b/custom_routes.py index 979b298..65369ff 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -971,6 +971,7 @@ async def send_json_override(self, event, data, sid=None): "data": data }) + asyncio.create_task(update_run_ws_event(prompt_id, event, data)) # event_emitter.emit("send_json", { # "event": event, # "data": data @@ -1094,6 +1095,26 @@ async def update_run_live_status(prompt_id, live_status, calculated_progress: fl # requests.post(status_endpoint, json=body) await async_request_with_retry('POST', status_endpoint, token=token, json=body) +async def update_run_ws_event(prompt_id: str, event: str, data: dict): + if prompt_id not in prompt_metadata: + return + + # print("update_run_ws_event", prompt_id, event, data) + status_endpoint = prompt_metadata[prompt_id].status_endpoint + + if status_endpoint is None: + return + + token = prompt_metadata[prompt_id].token + body = { + "run_id": prompt_id, + "ws_event": { + "event": event, + "data": data, + }, + } + await async_request_with_retry('POST', status_endpoint, token=token, json=body) + async def update_run(prompt_id: str, status: Status): global last_read_line_number From 946571e32e4640147e8b45cc61a5e27070bde6d8 Mon Sep 17 00:00:00 2001 From: bennykok Date: Mon, 16 Sep 2024 18:54:05 -0700 Subject: [PATCH 2/5] fix: await --- custom_routes.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 65369ff..b20b2e8 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1398,14 +1398,10 @@ async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, d # Upload files in the background async def upload_in_background(prompt_id: str, data, node_id=None, have_upload=True, node_meta=None): try: - upload_tasks = [ - handle_upload(prompt_id, data, 'images', "content_type", "image/png"), - handle_upload(prompt_id, data, 'files', "content_type", "image/png"), - handle_upload(prompt_id, data, 'gifs', "format", "image/gif"), - handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") - ] - - await asyncio.gather(*upload_tasks) + await handle_upload(prompt_id, data, 'images', "content_type", "image/png") + await handle_upload(prompt_id, data, 'files', "content_type", "image/png") + await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") + await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") status_endpoint = prompt_metadata[prompt_id].status_endpoint token = prompt_metadata[prompt_id].token @@ -1451,7 +1447,8 @@ async def update_run_with_output(prompt_id, data, node_id=None, node_meta=None): if have_upload_media: await update_file_status(prompt_id, data, True, node_id=node_id) - asyncio.create_task(upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload_media, node_meta=node_meta)) + # asyncio.create_task(upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload_media, node_meta=node_meta)) + await upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload_media, node_meta=node_meta) # await upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload) except Exception as e: From c08b68c41f8f0bae587675f7592dcdde28d09627 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 12:56:42 -0700 Subject: [PATCH 3/5] feat: experiment with await + asyncio.gather for multi file in same node --- custom_routes.py | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index b20b2e8..1ebd064 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1359,7 +1359,7 @@ async def update_file_status(prompt_id: str, data, uploading, have_error=False, async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, default_content_type: str): items = data.get(key, []) - # upload_tasks = [] + upload_tasks = [] for item in items: # Skipping temp files @@ -1375,33 +1375,41 @@ async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, d elif file_extension == '.webp': file_type = 'image/webp' - # upload_tasks.append(upload_file( - # prompt_id, - # item.get("filename"), - # subfolder=item.get("subfolder"), - # type=item.get("type"), - # content_type=file_type, - # item=item - # )) - await upload_file( + upload_tasks.append(upload_file( prompt_id, item.get("filename"), subfolder=item.get("subfolder"), type=item.get("type"), content_type=file_type, item=item - ) + )) + # await upload_file( + # prompt_id, + # item.get("filename"), + # subfolder=item.get("subfolder"), + # type=item.get("type"), + # content_type=file_type, + # item=item + # ) # Execute all upload tasks concurrently - # await asyncio.gather(*upload_tasks) + await asyncio.gather(*upload_tasks) # Upload files in the background async def upload_in_background(prompt_id: str, data, node_id=None, have_upload=True, node_meta=None): try: - await handle_upload(prompt_id, data, 'images', "content_type", "image/png") - await handle_upload(prompt_id, data, 'files', "content_type", "image/png") - await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") - await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") + # await handle_upload(prompt_id, data, 'images', "content_type", "image/png") + # await handle_upload(prompt_id, data, 'files', "content_type", "image/png") + # await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") + # await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") + upload_tasks = [ + handle_upload(prompt_id, data, 'images', "content_type", "image/png"), + handle_upload(prompt_id, data, 'files', "content_type", "image/png"), + handle_upload(prompt_id, data, 'gifs', "format", "image/gif"), + handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") + ] + + await asyncio.gather(*upload_tasks) status_endpoint = prompt_metadata[prompt_id].status_endpoint token = prompt_metadata[prompt_id].token From 212daa838c1d86b3cf6f57bdbd8a0ad9f52d1448 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 14:25:13 -0700 Subject: [PATCH 4/5] Revert "feat: experiment with await + asyncio.gather for multi file in same node" This reverts commit c08b68c41f8f0bae587675f7592dcdde28d09627. --- custom_routes.py | 40 ++++++++++++++++------------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 1ebd064..b20b2e8 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1359,7 +1359,7 @@ async def update_file_status(prompt_id: str, data, uploading, have_error=False, async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, default_content_type: str): items = data.get(key, []) - upload_tasks = [] + # upload_tasks = [] for item in items: # Skipping temp files @@ -1375,41 +1375,33 @@ async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, d elif file_extension == '.webp': file_type = 'image/webp' - upload_tasks.append(upload_file( - prompt_id, - item.get("filename"), - subfolder=item.get("subfolder"), - type=item.get("type"), - content_type=file_type, - item=item - )) - # await upload_file( + # upload_tasks.append(upload_file( # prompt_id, # item.get("filename"), # subfolder=item.get("subfolder"), # type=item.get("type"), # content_type=file_type, # item=item - # ) + # )) + await upload_file( + prompt_id, + item.get("filename"), + subfolder=item.get("subfolder"), + type=item.get("type"), + content_type=file_type, + item=item + ) # Execute all upload tasks concurrently - await asyncio.gather(*upload_tasks) + # await asyncio.gather(*upload_tasks) # Upload files in the background async def upload_in_background(prompt_id: str, data, node_id=None, have_upload=True, node_meta=None): try: - # await handle_upload(prompt_id, data, 'images', "content_type", "image/png") - # await handle_upload(prompt_id, data, 'files', "content_type", "image/png") - # await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") - # await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") - upload_tasks = [ - handle_upload(prompt_id, data, 'images', "content_type", "image/png"), - handle_upload(prompt_id, data, 'files', "content_type", "image/png"), - handle_upload(prompt_id, data, 'gifs', "format", "image/gif"), - handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") - ] - - await asyncio.gather(*upload_tasks) + await handle_upload(prompt_id, data, 'images', "content_type", "image/png") + await handle_upload(prompt_id, data, 'files', "content_type", "image/png") + await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") + await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") status_endpoint = prompt_metadata[prompt_id].status_endpoint token = prompt_metadata[prompt_id].token From fb6bb2357a1c7e9f4a23e36be731ee953a219528 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 14:28:56 -0700 Subject: [PATCH 5/5] Reapply "fix: back to sequential file upload" This reverts commit 1f5a88b88805f8f01ba1803b0ecec2e796937417. --- custom_routes.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 190e8b4..86caee0 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1360,7 +1360,7 @@ async def update_file_status(prompt_id: str, data, uploading, have_error=False, async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, default_content_type: str): items = data.get(key, []) - upload_tasks = [] + # upload_tasks = [] for item in items: # Skipping temp files @@ -1376,17 +1376,25 @@ async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, d elif file_extension == '.webp': file_type = 'image/webp' - upload_tasks.append(upload_file( + # upload_tasks.append(upload_file( + # prompt_id, + # item.get("filename"), + # subfolder=item.get("subfolder"), + # type=item.get("type"), + # content_type=file_type, + # item=item + # )) + await upload_file( prompt_id, item.get("filename"), subfolder=item.get("subfolder"), type=item.get("type"), content_type=file_type, item=item - )) + ) # Execute all upload tasks concurrently - await asyncio.gather(*upload_tasks) + # await asyncio.gather(*upload_tasks) # Upload files in the background async def upload_in_background(prompt_id: str, data, node_id=None, have_upload=True, node_meta=None): @@ -1425,6 +1433,7 @@ async def update_run_with_output(prompt_id, data, node_id=None, node_meta=None): "output_data": data, "node_meta": node_meta, } + pprint(body) have_upload_media = False if data is not None: have_upload_media = 'images' in data or 'files' in data or 'gifs' in data or 'mesh' in data