This commit is contained in:
nick 2024-09-17 16:37:39 -07:00
commit 0e70db4013

View File

@ -1099,6 +1099,7 @@ async def send_json_override(self, event, data, sid=None):
if prompt_id in comfy_message_queues: if prompt_id in comfy_message_queues:
comfy_message_queues[prompt_id].put_nowait({"event": event, "data": data}) comfy_message_queues[prompt_id].put_nowait({"event": event, "data": data})
asyncio.create_task(update_run_ws_event(prompt_id, event, data))
# event_emitter.emit("send_json", { # event_emitter.emit("send_json", {
# "event": event, # "event": event,
# "data": data # "data": data
@ -1239,6 +1240,30 @@ async def update_run_live_status(prompt_id, live_status, calculated_progress: fl
await async_request_with_retry("POST", status_endpoint, token=token, json=body) await async_request_with_retry("POST", status_endpoint, token=token, json=body)
async def update_run_ws_event(prompt_id: str, event: str, data: dict):
if prompt_id not in prompt_metadata:
return
# print("update_run_ws_event", prompt_id, event, data)
status_endpoint = prompt_metadata[prompt_id].status_endpoint
if status_endpoint is None:
return
token = prompt_metadata[prompt_id].token
gpu_event_id = prompt_metadata[prompt_id].gpu_event_id or None
body = {
"run_id": prompt_id,
"ws_event": {
"event": event,
"data": data,
"gpu_event_id": gpu_event_id,
},
}
await async_request_with_retry("POST", status_endpoint, token=token, json=body)
async def update_run(prompt_id: str, status: Status): async def update_run(prompt_id: str, status: Status):
global last_read_line_number global last_read_line_number
@ -1539,7 +1564,7 @@ async def handle_upload(
prompt_id: str, data, key: str, content_type_key: str, default_content_type: str prompt_id: str, data, key: str, content_type_key: str, default_content_type: str
): ):
items = data.get(key, []) items = data.get(key, [])
upload_tasks = [] # upload_tasks = []
for item in items: for item in items:
# Skipping temp files # Skipping temp files
@ -1555,8 +1580,15 @@ async def handle_upload(
elif file_extension == ".webp": elif file_extension == ".webp":
file_type = "image/webp" file_type = "image/webp"
upload_tasks.append( # upload_tasks.append(upload_file(
upload_file( # prompt_id,
# item.get("filename"),
# subfolder=item.get("subfolder"),
# type=item.get("type"),
# content_type=file_type,
# item=item
# ))
await upload_file(
prompt_id, prompt_id,
item.get("filename"), item.get("filename"),
subfolder=item.get("subfolder"), subfolder=item.get("subfolder"),
@ -1564,26 +1596,21 @@ async def handle_upload(
content_type=file_type, content_type=file_type,
item=item, item=item,
) )
)
# Execute all upload tasks concurrently # Execute all upload tasks concurrently
await asyncio.gather(*upload_tasks) # await asyncio.gather(*upload_tasks)
async def upload_in_background( async def upload_in_background(
prompt_id: str, data, node_id=None, have_upload=True, node_meta=None prompt_id: str, data, node_id=None, have_upload=True, node_meta=None
): ):
try: try:
upload_tasks = [ await handle_upload(prompt_id, data, "images", "content_type", "image/png")
handle_upload(prompt_id, data, "images", "content_type", "image/png"), await handle_upload(prompt_id, data, "files", "content_type", "image/png")
handle_upload(prompt_id, data, "files", "content_type", "image/png"), await handle_upload(prompt_id, data, "gifs", "format", "image/gif")
handle_upload(prompt_id, data, "gifs", "format", "image/gif"), await handle_upload(
handle_upload(
prompt_id, data, "mesh", "format", "application/octet-stream" prompt_id, data, "mesh", "format", "application/octet-stream"
), )
]
await asyncio.gather(*upload_tasks)
status_endpoint = prompt_metadata[prompt_id].status_endpoint status_endpoint = prompt_metadata[prompt_id].status_endpoint
token = prompt_metadata[prompt_id].token token = prompt_metadata[prompt_id].token
@ -1622,6 +1649,7 @@ async def update_run_with_output(
"node_meta": node_meta, "node_meta": node_meta,
"gpu_event_id": gpu_event_id, "gpu_event_id": gpu_event_id,
} }
pprint(body)
have_upload_media = False have_upload_media = False
if data is not None: if data is not None:
have_upload_media = ( have_upload_media = (
@ -1640,15 +1668,14 @@ async def update_run_with_output(
if have_upload_media: if have_upload_media:
await update_file_status(prompt_id, data, True, node_id=node_id) await update_file_status(prompt_id, data, True, node_id=node_id)
asyncio.create_task( # asyncio.create_task(upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload_media, node_meta=node_meta))
upload_in_background( await upload_in_background(
prompt_id, prompt_id,
data, data,
node_id=node_id, node_id=node_id,
have_upload=have_upload_media, have_upload=have_upload_media,
node_meta=node_meta, node_meta=node_meta,
) )
)
# await upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload) # await upload_in_background(prompt_id, data, node_id=node_id, have_upload=have_upload)
except Exception as e: except Exception as e: