From c115c22a911ecbfeba8d46b4590ee68648657a3f Mon Sep 17 00:00:00 2001 From: bennykok Date: Fri, 4 Oct 2024 16:03:53 -0700 Subject: [PATCH] fix: send ws after cd logic --- custom_routes.py | 39 ++++++++++++++++----------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 047c5bc..f342e1a 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -395,7 +395,7 @@ async def comfy_deploy_run(request): # We proxy the request to Comfy Deploy, this is a native run if "is_native_run" in data: async with aiohttp.ClientSession() as session: - pprint(data) + # pprint(data) # headers = request.headers.copy() # headers['Content-Type'] = 'application/json' async with session.post(data.get("native_run_api_endpoint"), json=data, headers={ @@ -403,7 +403,7 @@ async def comfy_deploy_run(request): 'Authorization': request.headers.get('Authorization') }) as response: data = await response.json() - print(data) + # print(data) if "cd_token" in data: token = data["cd_token"] @@ -1002,24 +1002,14 @@ async def send_json_override(self, event, data, sid=None): target_sid = sid if target_sid == "comfy_deploy_instance": target_sid = None - + # now we send everything await asyncio.wait([ asyncio.create_task(send(event, data, sid=target_sid)), asyncio.create_task(self.send_json_original(event, data, sid)) ]) - if prompt_id in comfy_message_queues: - comfy_message_queues[prompt_id].put_nowait({ - "event": event, - "data": data - }) - asyncio.create_task(update_run_ws_event(prompt_id, event, data)) - # event_emitter.emit("send_json", { - # "event": event, - # "data": data - # }) if event == 'execution_start': await update_run(prompt_id, Status.RUNNING) @@ -1058,7 +1048,7 @@ async def send_json_override(self, event, data, sid=None): return prompt_metadata[prompt_id].last_updated_node = node class_type = prompt_metadata[prompt_id].workflow_api[node]['class_type'] - logger.info(f"At: {calculated_progress * 100}% - {class_type}") + logger.info(f"At: {round(calculated_progress * 100)}% - {class_type}") await send("live_status", { "prompt_id": prompt_id, "current_node": class_type, @@ -1087,7 +1077,6 @@ async def send_json_override(self, event, data, sid=None): if prompt_id in prompt_metadata: node = data.get('node') class_type = prompt_metadata[prompt_id].workflow_api[node]['class_type'] - logger.info(f"Executed {class_type} {data}") node_meta = { "node_id": node, "node_class": class_type, @@ -1095,12 +1084,16 @@ async def send_json_override(self, event, data, sid=None): if class_type == "PreviewImage": logger.info("Skipping preview image") return + await update_run_with_output(prompt_id, data.get('output'), node_id=data.get('node'), node_meta=node_meta) + logger.info(f"Executed {class_type} {data}") else: logger.info(f"Executed {data}") - await update_run_with_output(prompt_id, data.get('output'), node_id=data.get('node'), node_meta=node_meta) - # await update_run_with_output(prompt_id, data.get('output'), node_id=data.get('node')) - # update_run_with_output(prompt_id, data.get('output')) + if prompt_id in comfy_message_queues: + comfy_message_queues[prompt_id].put_nowait({ + "event": event, + "data": data + }) # Global variable to keep track of the last read line number last_read_line_number = 0 @@ -1262,7 +1255,7 @@ async def upload_with_retry(session, url, headers, data, max_retries=3, initial_ response.raise_for_status() # This will raise an exception for 4xx and 5xx status codes response_text = await response.text() - logger.info(f"Response body: {response_text[:1000]}...") + # logger.info(f"Response body: {response_text[:1000]}...") logger.info("Upload successful") return response # Successful upload, exit the retry loop @@ -1319,13 +1312,13 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2" start_time = time.time() # Start timing here - logger.info(f"Target URL: {target_url}") + # logger.info(f"Target URL: {target_url}") result = await async_request_with_retry("GET", target_url, disable_timeout=True, token=token) end_time = time.time() # End timing after the request is complete logger.info("Time taken for getting file upload endpoint: {:.2f} seconds".format(end_time - start_time)) ok = await result.json() - logger.info(f"Result: {ok}") + # logger.info(f"Result: {ok}") async with aiofiles.open(file, 'rb') as f: data = await f.read() @@ -1338,7 +1331,7 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p "Content-Length": size, } - logger.info(headers) + # logger.info(headers) if ok.get('include_acl') is True: headers["x-amz-acl"] = "public-read" @@ -1542,7 +1535,7 @@ async def update_run_with_output(prompt_id, data, node_id=None, node_meta=None): "output_data": data, "node_meta": node_meta, } - pprint(body) + # pprint(body) have_upload_media = False if data is not None: have_upload_media = 'images' in data or 'files' in data or 'gifs' in data or 'mesh' in data