From 4927d81e73d73577f76f923506490f55e20287c0 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 18:57:15 -0700 Subject: [PATCH 01/14] chore: accept cd_token --- custom_routes.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 86caee0..2c0a55a 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -368,14 +368,17 @@ def send_prompt(sid: str, inputs: StreamingPrompt): @server.PromptServer.instance.routes.post("/comfyui-deploy/run") async def comfy_deploy_run(request): # Extract the bearer token from the Authorization header - auth_header = request.headers.get('Authorization') - token = None - if auth_header: - parts = auth_header.split() - if len(parts) == 2 and parts[0].lower() == 'bearer': - token = parts[1] - data = await request.json() + + if "cd_token" in data: + token = data["cd_token"] + else: + auth_header = request.headers.get('Authorization') + token = None + if auth_header: + parts = auth_header.split() + if len(parts) == 2 and parts[0].lower() == 'bearer': + token = parts[1] # In older version, we use workflow_api, but this has inputs already swapped in nextjs frontend, which is tricky workflow_api = data.get("workflow_api_raw") From b8dded153546fbf129eaf1dac8699c19ef25172b Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 20:26:39 -0700 Subject: [PATCH 02/14] Revert "fix: roll back to unique session per request" This reverts commit 5a78ca97bd1eeeb8de6d96a18aa9f1a2d51869b6. --- custom_routes.py | 121 +++++++++++++++++++++++------------------------ 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 2c0a55a..4d7414e 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -28,27 +28,27 @@ from aiohttp import web, ClientSession, ClientError, ClientTimeout import atexit # Global session -# client_session = None +client_session = None # def create_client_session(): # global client_session # if client_session is None: # client_session = aiohttp.ClientSession() -# async def ensure_client_session(): -# global client_session -# if client_session is None: -# client_session = aiohttp.ClientSession() +async def ensure_client_session(): + global client_session + if client_session is None: + client_session = aiohttp.ClientSession() -# async def cleanup(): -# global client_session -# if client_session: -# await client_session.close() +async def cleanup(): + global client_session + if client_session: + await client_session.close() def exit_handler(): print("Exiting the application. 
Initiating cleanup...") - # loop = asyncio.get_event_loop() - # loop.run_until_complete(cleanup()) + loop = asyncio.get_event_loop() + loop.run_until_complete(cleanup()) atexit.register(exit_handler) @@ -60,63 +60,62 @@ print(f"max_retries: {max_retries}, retry_delay_multiplier: {retry_delay_multipl import time async def async_request_with_retry(method, url, disable_timeout=False, token=None, **kwargs): - # global client_session - # await ensure_client_session() - async with aiohttp.ClientSession() as client_session: - retry_delay = 1 # Start with 1 second delay - initial_timeout = 5 # 5 seconds timeout for the initial connection + global client_session + await ensure_client_session() + retry_delay = 1 # Start with 1 second delay + initial_timeout = 5 # 5 seconds timeout for the initial connection - start_time = time.time() - for attempt in range(max_retries): - try: - if not disable_timeout: - timeout = ClientTimeout(total=None, connect=initial_timeout) - kwargs['timeout'] = timeout + start_time = time.time() + for attempt in range(max_retries): + try: + if not disable_timeout: + timeout = ClientTimeout(total=None, connect=initial_timeout) + kwargs['timeout'] = timeout - if token is not None: - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = f"Bearer {token}" + if token is not None: + if 'headers' not in kwargs: + kwargs['headers'] = {} + kwargs['headers']['Authorization'] = f"Bearer {token}" - request_start = time.time() - async with client_session.request(method, url, **kwargs) as response: - request_end = time.time() - logger.info(f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds") - - if response.status != 200: - error_body = await response.text() - logger.error(f"Request failed with status {response.status} and body {error_body}") - # raise Exception(f"Request failed with status {response.status}") - - response.raise_for_status() - if method.upper() == 'GET': - await response.read() - - total_time = time.time() - start_time - logger.info(f"Request succeeded after {total_time:.2f} seconds (attempt {attempt + 1}/{max_retries})") - return response - except asyncio.TimeoutError: - logger.warning(f"Request timed out after {initial_timeout} seconds (attempt {attempt + 1}/{max_retries})") - except ClientError as e: - end_time = time.time() - logger.error(f"Request failed (attempt {attempt + 1}/{max_retries}): {e}") - logger.error(f"Time taken for failed attempt: {end_time - request_start:.2f} seconds") - logger.error(f"Total time elapsed: {end_time - start_time:.2f} seconds") + request_start = time.time() + async with client_session.request(method, url, **kwargs) as response: + request_end = time.time() + logger.info(f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds") - # Log the response body for ClientError as well - if hasattr(e, 'response') and e.response is not None: - error_body = await e.response.text() - logger.error(f"Error response body: {error_body}") + if response.status != 200: + error_body = await response.text() + logger.error(f"Request failed with status {response.status} and body {error_body}") + # raise Exception(f"Request failed with status {response.status}") - if attempt == max_retries - 1: - logger.error(f"Request failed after {max_retries} attempts: {e}") - raise + response.raise_for_status() + if method.upper() == 'GET': + await response.read() + + total_time = time.time() - start_time + logger.info(f"Request succeeded after {total_time:.2f} seconds (attempt {attempt 
+ 1}/{max_retries})") + return response + except asyncio.TimeoutError: + logger.warning(f"Request timed out after {initial_timeout} seconds (attempt {attempt + 1}/{max_retries})") + except ClientError as e: + end_time = time.time() + logger.error(f"Request failed (attempt {attempt + 1}/{max_retries}): {e}") + logger.error(f"Time taken for failed attempt: {end_time - request_start:.2f} seconds") + logger.error(f"Total time elapsed: {end_time - start_time:.2f} seconds") - await asyncio.sleep(retry_delay) - retry_delay *= retry_delay_multiplier + # Log the response body for ClientError as well + if hasattr(e, 'response') and e.response is not None: + error_body = await e.response.text() + logger.error(f"Error response body: {error_body}") + + if attempt == max_retries - 1: + logger.error(f"Request failed after {max_retries} attempts: {e}") + raise + + await asyncio.sleep(retry_delay) + retry_delay *= retry_delay_multiplier - total_time = time.time() - start_time - raise Exception(f"Request failed after {max_retries} attempts and {total_time:.2f} seconds") + total_time = time.time() - start_time + raise Exception(f"Request failed after {max_retries} attempts and {total_time:.2f} seconds") from logging import basicConfig, getLogger From 0083b38dcc53f073aeaf998349849dac73038b54 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 20:44:44 -0700 Subject: [PATCH 03/14] chore: log image size --- custom_routes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/custom_routes.py b/custom_routes.py index 4d7414e..7c98e41 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1240,6 +1240,7 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2" start_time = time.time() # Start timing here + logger.info(f"Image size: {size}") logger.info(f"Target URL: {target_url}") result = await async_request_with_retry("GET", target_url, disable_timeout=True, token=token) end_time = time.time() # End timing after the request is complete From 0d1537963c1b7546695cac3a13abd018e6fa9d0d Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 21:48:42 -0700 Subject: [PATCH 04/14] fix --- custom_routes.py | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 7c98e41..7a229be 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1363,7 +1363,7 @@ async def update_file_status(prompt_id: str, data, uploading, have_error=False, async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, default_content_type: str): items = data.get(key, []) - # upload_tasks = [] + upload_tasks = [] for item in items: # Skipping temp files @@ -1379,33 +1379,43 @@ async def handle_upload(prompt_id: str, data, key: str, content_type_key: str, d elif file_extension == '.webp': file_type = 'image/webp' - # upload_tasks.append(upload_file( - # prompt_id, - # item.get("filename"), - # subfolder=item.get("subfolder"), - # type=item.get("type"), - # content_type=file_type, - # item=item - # )) - await upload_file( + upload_tasks.append(upload_file( prompt_id, item.get("filename"), subfolder=item.get("subfolder"), type=item.get("type"), content_type=file_type, item=item - ) + )) + # await upload_file( + # prompt_id, + # item.get("filename"), + # subfolder=item.get("subfolder"), + # type=item.get("type"), + # content_type=file_type, + # item=item + # ) # Execute all upload tasks 
concurrently - # await asyncio.gather(*upload_tasks) + await asyncio.gather(*upload_tasks) # Upload files in the background async def upload_in_background(prompt_id: str, data, node_id=None, have_upload=True, node_meta=None): try: - await handle_upload(prompt_id, data, 'images', "content_type", "image/png") - await handle_upload(prompt_id, data, 'files', "content_type", "image/png") - await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") - await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") + # await handle_upload(prompt_id, data, 'images', "content_type", "image/png") + # await handle_upload(prompt_id, data, 'files', "content_type", "image/png") + # await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") + # await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") + upload_tasks = [ + handle_upload(prompt_id, data, "images", "content_type", "image/png"), + handle_upload(prompt_id, data, "files", "content_type", "image/png"), + handle_upload(prompt_id, data, "gifs", "format", "image/gif"), + handle_upload( + prompt_id, data, "mesh", "format", "application/octet-stream" + ), + ] + + await asyncio.gather(*upload_tasks) status_endpoint = prompt_metadata[prompt_id].status_endpoint token = prompt_metadata[prompt_id].token From 1243fa4e58d6c2453ab8b4c0ce2645a4d5adfe20 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 22:55:08 -0700 Subject: [PATCH 05/14] fix --- custom_routes.py | 70 +++++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 7a229be..067f959 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1234,43 +1234,45 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p prompt_id = quote(prompt_id) content_type = quote(content_type) + target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2" + + start_time = time.time() # Start timing here + logger.info(f"Target URL: {target_url}") + result = await async_request_with_retry("GET", target_url, disable_timeout=True, token=token) + end_time = time.time() # End timing after the request is complete + logger.info("Time taken for getting file upload endpoint: {:.2f} seconds".format(end_time - start_time)) + ok = await result.json() + + logger.info(f"Result: {ok}") + async with aiofiles.open(file, 'rb') as f: data = await f.read() - size = str(len(data)) - target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2" - - start_time = time.time() # Start timing here - logger.info(f"Image size: {size}") - logger.info(f"Target URL: {target_url}") - result = await async_request_with_retry("GET", target_url, disable_timeout=True, token=token) - end_time = time.time() # End timing after the request is complete - logger.info("Time taken for getting file upload endpoint: {:.2f} seconds".format(end_time - start_time)) - ok = await result.json() - logger.info(f"Result: {ok}") - - start_time = time.time() # Start timing here - headers = { - "Content-Type": content_type, - # "Content-Length": size, - } - - if ok.get('include_acl') is True: - headers["x-amz-acl"] = "public-read" - - # response = requests.put(ok.get("url"), headers=headers, data=data) - response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) - logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") 
- end_time = time.time() # End timing after the request is complete - logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) - - if item is not None: - file_download_url = ok.get("download_url") - if file_download_url is not None: - item["url"] = file_download_url - item["upload_duration"] = end_time - start_time - if ok.get("is_public") is not None: - item["is_public"] = ok.get("is_public") + size = str(len(data)) + logger.info(f"Image size: {size}") + + start_time = time.time() # Start timing here + headers = { + "Content-Type": content_type, + # "Content-Length": size, + } + + if ok.get('include_acl') is True: + headers["x-amz-acl"] = "public-read" + + # response = requests.put(ok.get("url"), headers=headers, data=data) + response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) + logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") + end_time = time.time() # End timing after the request is complete + logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) + + if item is not None: + file_download_url = ok.get("download_url") + if file_download_url is not None: + item["url"] = file_download_url + item["upload_duration"] = end_time - start_time + if ok.get("is_public") is not None: + item["is_public"] = ok.get("is_public") def have_pending_upload(prompt_id): if prompt_id in prompt_metadata and len(prompt_metadata[prompt_id].uploading_nodes) > 0: From ba9b187dcc844d7a72e64c1b7dc63889dabce654 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 22:59:27 -0700 Subject: [PATCH 06/14] fix --- custom_routes.py | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 067f959..7f4e905 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1199,6 +1199,16 @@ async def update_run(prompt_id: str, status: Status): }) +async def file_sender(file_object, chunk_size): + while True: + chunk = await file_object.read(chunk_size) + if not chunk: + break + yield chunk + + +chunk_size = 1024 * 1024 # 1MB chunks, adjust as needed + async def upload_file(prompt_id, filename, subfolder=None, content_type="image/png", type="output", item=None): """ Uploads file to S3 bucket using S3 client object @@ -1246,25 +1256,25 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p logger.info(f"Result: {ok}") async with aiofiles.open(file, 'rb') as f: - data = await f.read() + # data = await f.read() - size = str(len(data)) - logger.info(f"Image size: {size}") - - start_time = time.time() # Start timing here - headers = { - "Content-Type": content_type, - # "Content-Length": size, - } - - if ok.get('include_acl') is True: - headers["x-amz-acl"] = "public-read" - - # response = requests.put(ok.get("url"), headers=headers, data=data) - response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) - logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") - end_time = time.time() # End timing after the request is complete - logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) + # size = str(len(data)) + # logger.info(f"Image size: {size}") + + start_time = time.time() # Start timing here + headers = { + "Content-Type": content_type, + # "Content-Length": size, + } + + if ok.get('include_acl') is True: + headers["x-amz-acl"] = "public-read" + + # response = requests.put(ok.get("url"), 
headers=headers, data=data) + response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=file_sender(f, chunk_size)) + logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") + end_time = time.time() # End timing after the request is complete + logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) if item is not None: file_download_url = ok.get("download_url") From a403daa314dc0f8773882f0092106c07f50eb6ad Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 23:09:42 -0700 Subject: [PATCH 07/14] fix --- custom_routes.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 7f4e905..a7de81b 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1256,22 +1256,21 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p logger.info(f"Result: {ok}") async with aiofiles.open(file, 'rb') as f: - # data = await f.read() - - # size = str(len(data)) + data = await f.read() + size = str(len(data)) # logger.info(f"Image size: {size}") start_time = time.time() # Start timing here headers = { "Content-Type": content_type, - # "Content-Length": size, + "Content-Length": size, } if ok.get('include_acl') is True: headers["x-amz-acl"] = "public-read" # response = requests.put(ok.get("url"), headers=headers, data=data) - response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=file_sender(f, chunk_size)) + response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") end_time = time.time() # End timing after the request is complete logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) From 7ed4284a6f1fac84e4998f68f3a1a7888fd918f8 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 23:25:19 -0700 Subject: [PATCH 08/14] fix --- custom_routes.py | 55 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index a7de81b..c47e97e 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1260,20 +1260,47 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p size = str(len(data)) # logger.info(f"Image size: {size}") - start_time = time.time() # Start timing here - headers = { - "Content-Type": content_type, - "Content-Length": size, - } - - if ok.get('include_acl') is True: - headers["x-amz-acl"] = "public-read" - - # response = requests.put(ok.get("url"), headers=headers, data=data) - response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) - logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") - end_time = time.time() # End timing after the request is complete - logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) + start_time = time.time() # Start timing here + headers = { + "Content-Type": content_type, + "Content-Length": size, + } + + if ok.get('include_acl') is True: + headers["x-amz-acl"] = "public-read" + + # response = requests.put(ok.get("url"), headers=headers, data=data) + # response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) + # logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") + + async with aiohttp.ClientSession() as session: + try: + 
async with session.put(ok.get("url"), headers=headers, data=data) as response: + upload_duration = time.time() - start_time + logger.info(f"Upload completed in {upload_duration:.2f} seconds") + logger.info(f"Upload response status: {response.status}") + logger.info(f"Upload response reason: {response.reason}") + + response_headers = response.headers + logger.info(f"Response headers: {dict(response_headers)}") + + response_text = await response.text() + logger.info(f"Response body: {response_text[:1000]}...") # Log first 1000 characters of response body + + if response.status not in [200, 201, 204]: + logger.error(f"Upload failed with status {response.status}") + logger.error(f"Full response body: {response_text}") + else: + logger.info("Upload successful") + + except aiohttp.ClientError as e: + logger.error(f"Client error during upload: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error during upload: {str(e)}") + logger.error(traceback.format_exc()) + + end_time = time.time() # End timing after the request is complete + logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) if item is not None: file_download_url = ok.get("download_url") From aa47f3523f32a632cb0f40c80a5dfbc32caa18d0 Mon Sep 17 00:00:00 2001 From: bennykok Date: Tue, 17 Sep 2024 23:36:24 -0700 Subject: [PATCH 09/14] fix --- custom_routes.py | 82 +++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index c47e97e..d747ea4 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -1260,47 +1260,49 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p size = str(len(data)) # logger.info(f"Image size: {size}") - start_time = time.time() # Start timing here - headers = { - "Content-Type": content_type, - "Content-Length": size, - } - - if ok.get('include_acl') is True: - headers["x-amz-acl"] = "public-read" - - # response = requests.put(ok.get("url"), headers=headers, data=data) - # response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) - # logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") - - async with aiohttp.ClientSession() as session: - try: - async with session.put(ok.get("url"), headers=headers, data=data) as response: - upload_duration = time.time() - start_time - logger.info(f"Upload completed in {upload_duration:.2f} seconds") - logger.info(f"Upload response status: {response.status}") - logger.info(f"Upload response reason: {response.reason}") - - response_headers = response.headers - logger.info(f"Response headers: {dict(response_headers)}") - - response_text = await response.text() - logger.info(f"Response body: {response_text[:1000]}...") # Log first 1000 characters of response body - - if response.status not in [200, 201, 204]: - logger.error(f"Upload failed with status {response.status}") - logger.error(f"Full response body: {response_text}") - else: - logger.info("Upload successful") + start_time = time.time() # Start timing here + headers = { + "Content-Type": content_type, + "Content-Length": size, + } - except aiohttp.ClientError as e: - logger.error(f"Client error during upload: {str(e)}") - except Exception as e: - logger.error(f"Unexpected error during upload: {str(e)}") - logger.error(traceback.format_exc()) - - end_time = time.time() # End timing after the request is complete - logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) + 
logger.info(headers) + + if ok.get('include_acl') is True: + headers["x-amz-acl"] = "public-read" + + # response = requests.put(ok.get("url"), headers=headers, data=data) + # response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) + # logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") + + async with aiohttp.ClientSession() as session: + try: + async with session.put(ok.get("url"), headers=headers, data=data) as response: + upload_duration = time.time() - start_time + logger.info(f"Upload completed in {upload_duration:.2f} seconds") + logger.info(f"Upload response status: {response.status}") + logger.info(f"Upload response reason: {response.reason}") + + response_headers = response.headers + logger.info(f"Response headers: {dict(response_headers)}") + + response_text = await response.text() + logger.info(f"Response body: {response_text[:1000]}...") # Log first 1000 characters of response body + + if response.status not in [200, 201, 204]: + logger.error(f"Upload failed with status {response.status}") + logger.error(f"Full response body: {response_text}") + else: + logger.info("Upload successful") + + except aiohttp.ClientError as e: + logger.error(f"Client error during upload: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error during upload: {str(e)}") + logger.error(traceback.format_exc()) + + end_time = time.time() # End timing after the request is complete + logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) if item is not None: file_download_url = ok.get("download_url") From 61acffd355aeab6cc225cde039a12e3e6d7b6916 Mon Sep 17 00:00:00 2001 From: bennykok Date: Wed, 18 Sep 2024 08:20:35 -0700 Subject: [PATCH 10/14] fix --- custom_routes.py | 59 +++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index d747ea4..1ca3832 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -24,7 +24,7 @@ from typing import Dict, List, Union, Any, Optional from PIL import Image import copy import struct -from aiohttp import web, ClientSession, ClientError, ClientTimeout +from aiohttp import web, ClientSession, ClientError, ClientTimeout, ClientResponseError import atexit # Global session @@ -1209,6 +1209,37 @@ async def file_sender(file_object, chunk_size): chunk_size = 1024 * 1024 # 1MB chunks, adjust as needed +async def upload_with_retry(session, url, headers, data, max_retries=3, initial_delay=1): + start_time = time.time() # Start timing here + for attempt in range(max_retries): + try: + async with session.put(url, headers=headers, data=data) as response: + upload_duration = time.time() - start_time + logger.info(f"Upload attempt {attempt + 1} completed in {upload_duration:.2f} seconds") + logger.info(f"Upload response status: {response.status}") + + response.raise_for_status() # This will raise an exception for 4xx and 5xx status codes + + response_text = await response.text() + logger.info(f"Response body: {response_text[:1000]}...") + + logger.info("Upload successful") + return response # Successful upload, exit the retry loop + + except (ClientError, ClientResponseError) as e: + logger.error(f"Upload attempt {attempt + 1} failed: {str(e)}") + if attempt < max_retries - 1: # If it's not the last attempt + delay = initial_delay * (2 ** attempt) # Exponential backoff + logger.info(f"Retrying in {delay} seconds...") + await asyncio.sleep(delay) + else: + logger.error("Max retries 
reached. Upload failed.") + raise # Re-raise the last exception if all retries are exhausted + except Exception as e: + logger.error(f"Unexpected error during upload: {str(e)}") + logger.error(traceback.format_exc()) + raise # Re-raise unexpected exceptions immediately + async def upload_file(prompt_id, filename, subfolder=None, content_type="image/png", type="output", item=None): """ Uploads file to S3 bucket using S3 client object @@ -1277,29 +1308,11 @@ async def upload_file(prompt_id, filename, subfolder=None, content_type="image/p async with aiohttp.ClientSession() as session: try: - async with session.put(ok.get("url"), headers=headers, data=data) as response: - upload_duration = time.time() - start_time - logger.info(f"Upload completed in {upload_duration:.2f} seconds") - logger.info(f"Upload response status: {response.status}") - logger.info(f"Upload response reason: {response.reason}") - - response_headers = response.headers - logger.info(f"Response headers: {dict(response_headers)}") - - response_text = await response.text() - logger.info(f"Response body: {response_text[:1000]}...") # Log first 1000 characters of response body - - if response.status not in [200, 201, 204]: - logger.error(f"Upload failed with status {response.status}") - logger.error(f"Full response body: {response_text}") - else: - logger.info("Upload successful") - - except aiohttp.ClientError as e: - logger.error(f"Client error during upload: {str(e)}") + response = await upload_with_retry(session, ok.get("url"), headers, data) + # Process successful response... except Exception as e: - logger.error(f"Unexpected error during upload: {str(e)}") - logger.error(traceback.format_exc()) + # Handle final failure... + logger.error(f"Upload ultimately failed: {str(e)}") end_time = time.time() # End timing after the request is complete logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) From 764a8fee82bdb0591225982e912ec4b11e7be340 Mon Sep 17 00:00:00 2001 From: Fawaz Kadem Date: Wed, 18 Sep 2024 17:00:51 -0700 Subject: [PATCH 11/14] Add new external deploy node for face models (#66) --- comfy-nodes/external_face_model.py | 107 ++++++++++++++++++++++++ custom_routes.py | 3 + web/src/components/customInputNodes.tsx | 1 + 3 files changed, 111 insertions(+) create mode 100644 comfy-nodes/external_face_model.py diff --git a/comfy-nodes/external_face_model.py b/comfy-nodes/external_face_model.py new file mode 100644 index 0000000..237daca --- /dev/null +++ b/comfy-nodes/external_face_model.py @@ -0,0 +1,107 @@ +from PIL import Image, ImageOps +import numpy as np +import torch +import folder_paths + + +class AnyType(str): + def __ne__(self, __value: object) -> bool: + return False + + +WILDCARD = AnyType("*") + + +class ComfyUIDeployExternalFaceModel: + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "input_id": ( + "STRING", + {"multiline": False, "default": "input_reactor_face_model"}, + ), + }, + "optional": { + "default_face_model_name": ( + folder_paths.get_filename_list("reactor/faces"), + ), + "face_model_save_name": ( # if `default_face_model_name` is a link to download a file, we will attempt to save it with this name + "STRING", + {"multiline": False, "default": ""}, + ), + "display_name": ( + "STRING", + {"multiline": False, "default": ""}, + ), + "description": ( + "STRING", + {"multiline": True, "default": ""}, + ), + "face_model_url": ( + "STRING", + {"multiline": False, "default": ""}, + ), + }, + } + + RETURN_TYPES = (WILDCARD,) + RETURN_NAMES = ("path",) + + FUNCTION = 
"run" + + CATEGORY = "deploy" + + def run( + self, + input_id, + default_face_model_name=None, + face_model_save_name=None, + display_name=None, + description=None, + face_model_url=None, + ): + import requests + import os + import uuid + + if face_model_url and face_model_url.startswith("http"): + if face_model_save_name: + existing_face_models = folder_paths.get_filename_list("reactor/faces") + # Check if face_model_save_name exists in the list + if face_model_save_name in existing_face_models: + print(f"using face model: {face_model_save_name}") + return (face_model_save_name,) + else: + face_model_save_name = str(uuid.uuid4()) + ".safetensors" + print(face_model_save_name) + print(folder_paths.folder_names_and_paths["reactor/faces"][0][0]) + destination_path = os.path.join( + folder_paths.folder_names_and_paths["reactor/faces"][0][0], + face_model_save_name, + ) + + print(destination_path) + print( + "Downloading external face model - " + + face_model_url + + " to " + + destination_path + ) + response = requests.get( + face_model_url, + headers={"User-Agent": "Mozilla/5.0"}, + allow_redirects=True, + ) + with open(destination_path, "wb") as out_file: + out_file.write(response.content) + return (face_model_save_name,) + else: + print(f"using face model: {default_face_model_name}") + return (default_face_model_name,) + + +NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalFaceModel": ComfyUIDeployExternalFaceModel} +NODE_DISPLAY_NAME_MAPPINGS = { + "ComfyUIDeployExternalFaceModel": "External Face Model (ComfyUI Deploy)" +} diff --git a/custom_routes.py b/custom_routes.py index 1ca3832..4e3b5b0 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -327,6 +327,9 @@ def apply_inputs_to_workflow(workflow_api: Any, inputs: Any, sid: str = None): if value["class_type"] == "ComfyUIDeployExternalBoolean": value["inputs"]["default_value"] = new_value + if value["class_type"] == "ComfyUIDeployExternalFaceModel": + value["inputs"]["face_model_url"] = new_value + def send_prompt(sid: str, inputs: StreamingPrompt): # workflow_api = inputs.workflow_api workflow_api = copy.deepcopy(inputs.workflow_api) diff --git a/web/src/components/customInputNodes.tsx b/web/src/components/customInputNodes.tsx index d6f9d68..7b2b7e7 100644 --- a/web/src/components/customInputNodes.tsx +++ b/web/src/components/customInputNodes.tsx @@ -6,4 +6,5 @@ export const customInputNodes: Record = { ComfyUIDeployExternalNumberInt: "integer", ComfyUIDeployExternalLora: "string - (public lora download url)", ComfyUIDeployExternalCheckpoint: "string - (public checkpoints download url)", + ComfyUIDeployExternalFaceModel: "string - (public face model download url)", }; From 02430ee62d3681221f9d94c11dbae91afb08710f Mon Sep 17 00:00:00 2001 From: bennykok Date: Fri, 20 Sep 2024 18:10:04 -0700 Subject: [PATCH 12/14] remove some logs --- custom_routes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/custom_routes.py b/custom_routes.py index 4e3b5b0..3794cf1 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -80,11 +80,11 @@ async def async_request_with_retry(method, url, disable_timeout=False, token=Non request_start = time.time() async with client_session.request(method, url, **kwargs) as response: request_end = time.time() - logger.info(f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds") + # logger.info(f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds") if response.status != 200: error_body = await response.text() - logger.error(f"Request failed with 
status {response.status} and body {error_body}") + # logger.error(f"Request failed with status {response.status} and body {error_body}") # raise Exception(f"Request failed with status {response.status}") response.raise_for_status() @@ -92,7 +92,7 @@ async def async_request_with_retry(method, url, disable_timeout=False, token=Non await response.read() total_time = time.time() - start_time - logger.info(f"Request succeeded after {total_time:.2f} seconds (attempt {attempt + 1}/{max_retries})") + # logger.info(f"Request succeeded after {total_time:.2f} seconds (attempt {attempt + 1}/{max_retries})") return response except asyncio.TimeoutError: logger.warning(f"Request timed out after {initial_timeout} seconds (attempt {attempt + 1}/{max_retries})") From 8c5e5c4277c6e33f2d9b4be4038435871101d7f9 Mon Sep 17 00:00:00 2001 From: bennykok Date: Sat, 21 Sep 2024 10:39:34 -0700 Subject: [PATCH 13/14] feat: add ComfyUIDeployExternalTextAny --- comfy-nodes/external_text_any.py | 46 ++++++++++++++++++++++++++++++++ custom_routes.py | 2 +- 2 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 comfy-nodes/external_text_any.py diff --git a/comfy-nodes/external_text_any.py b/comfy-nodes/external_text_any.py new file mode 100644 index 0000000..e84be41 --- /dev/null +++ b/comfy-nodes/external_text_any.py @@ -0,0 +1,46 @@ +class AnyType(str): + def __ne__(self, __value: object) -> bool: + return False + + +WILDCARD = AnyType("*") + +class ComfyUIDeployExternalTextAny: + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "input_id": ( + "STRING", + {"multiline": False, "default": "input_text"}, + ), + }, + "optional": { + "default_value": ( + "STRING", + {"multiline": True, "default": ""}, + ), + "display_name": ( + "STRING", + {"multiline": False, "default": ""}, + ), + "description": ( + "STRING", + {"multiline": True, "default": ""}, + ), + } + } + + RETURN_TYPES = (WILDCARD,) + RETURN_NAMES = ("text",) + + FUNCTION = "run" + + CATEGORY = "text" + + def run(self, input_id, default_value=None, display_name=None, description=None): + return [default_value] + + +NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalTextAny": ComfyUIDeployExternalTextAny} +NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalTextAny": "External Text Any (ComfyUI Deploy)"} \ No newline at end of file diff --git a/custom_routes.py b/custom_routes.py index 3794cf1..68cbe7e 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -309,7 +309,7 @@ def apply_inputs_to_workflow(workflow_api: Any, inputs: Any, sid: str = None): value['inputs']["input_id"] = new_value # Fix for external text default value - if (value["class_type"] == "ComfyUIDeployExternalText"): + if (value["class_type"] == "ComfyUIDeployExternalText" or value["class_type"] == "ComfyUIDeployExternalTextAny"): value['inputs']["default_value"] = new_value if (value["class_type"] == "ComfyUIDeployExternalCheckpoint"): From aea456cba9f500d6191e09e60f1c70238c5ef939 Mon Sep 17 00:00:00 2001 From: bennykok Date: Sat, 21 Sep 2024 10:51:51 -0700 Subject: [PATCH 14/14] fix face loader extenal load --- comfy-nodes/external_face_model.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/comfy-nodes/external_face_model.py b/comfy-nodes/external_face_model.py index 237daca..3cc915e 100644 --- a/comfy-nodes/external_face_model.py +++ b/comfy-nodes/external_face_model.py @@ -24,7 +24,8 @@ class ComfyUIDeployExternalFaceModel: }, "optional": { "default_face_model_name": ( - folder_paths.get_filename_list("reactor/faces"), + "STRING", + {"multiline": 
False, "default": ""}, ), "face_model_save_name": ( # if `default_face_model_name` is a link to download a file, we will attempt to save it with this name "STRING",