diff --git a/comfy-nodes/external_face_model.py b/comfy-nodes/external_face_model.py new file mode 100644 index 0000000..3cc915e --- /dev/null +++ b/comfy-nodes/external_face_model.py @@ -0,0 +1,108 @@ +from PIL import Image, ImageOps +import numpy as np +import torch +import folder_paths + + +class AnyType(str): + def __ne__(self, __value: object) -> bool: + return False + + +WILDCARD = AnyType("*") + + +class ComfyUIDeployExternalFaceModel: + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "input_id": ( + "STRING", + {"multiline": False, "default": "input_reactor_face_model"}, + ), + }, + "optional": { + "default_face_model_name": ( + "STRING", + {"multiline": False, "default": ""}, + ), + "face_model_save_name": ( # if `default_face_model_name` is a link to download a file, we will attempt to save it with this name + "STRING", + {"multiline": False, "default": ""}, + ), + "display_name": ( + "STRING", + {"multiline": False, "default": ""}, + ), + "description": ( + "STRING", + {"multiline": True, "default": ""}, + ), + "face_model_url": ( + "STRING", + {"multiline": False, "default": ""}, + ), + }, + } + + RETURN_TYPES = (WILDCARD,) + RETURN_NAMES = ("path",) + + FUNCTION = "run" + + CATEGORY = "deploy" + + def run( + self, + input_id, + default_face_model_name=None, + face_model_save_name=None, + display_name=None, + description=None, + face_model_url=None, + ): + import requests + import os + import uuid + + if face_model_url and face_model_url.startswith("http"): + if face_model_save_name: + existing_face_models = folder_paths.get_filename_list("reactor/faces") + # Check if face_model_save_name exists in the list + if face_model_save_name in existing_face_models: + print(f"using face model: {face_model_save_name}") + return (face_model_save_name,) + else: + face_model_save_name = str(uuid.uuid4()) + ".safetensors" + print(face_model_save_name) + print(folder_paths.folder_names_and_paths["reactor/faces"][0][0]) + destination_path = os.path.join( + folder_paths.folder_names_and_paths["reactor/faces"][0][0], + face_model_save_name, + ) + + print(destination_path) + print( + "Downloading external face model - " + + face_model_url + + " to " + + destination_path + ) + response = requests.get( + face_model_url, + headers={"User-Agent": "Mozilla/5.0"}, + allow_redirects=True, + ) + with open(destination_path, "wb") as out_file: + out_file.write(response.content) + return (face_model_save_name,) + else: + print(f"using face model: {default_face_model_name}") + return (default_face_model_name,) + + +NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalFaceModel": ComfyUIDeployExternalFaceModel} +NODE_DISPLAY_NAME_MAPPINGS = { + "ComfyUIDeployExternalFaceModel": "External Face Model (ComfyUI Deploy)" +} diff --git a/comfy-nodes/external_text_any.py b/comfy-nodes/external_text_any.py new file mode 100644 index 0000000..e84be41 --- /dev/null +++ b/comfy-nodes/external_text_any.py @@ -0,0 +1,46 @@ +class AnyType(str): + def __ne__(self, __value: object) -> bool: + return False + + +WILDCARD = AnyType("*") + +class ComfyUIDeployExternalTextAny: + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "input_id": ( + "STRING", + {"multiline": False, "default": "input_text"}, + ), + }, + "optional": { + "default_value": ( + "STRING", + {"multiline": True, "default": ""}, + ), + "display_name": ( + "STRING", + {"multiline": False, "default": ""}, + ), + "description": ( + "STRING", + {"multiline": True, "default": ""}, + ), + } + } + + RETURN_TYPES = (WILDCARD,) + RETURN_NAMES = ("text",) + + FUNCTION = "run" + + CATEGORY = "text" + + def run(self, input_id, default_value=None, display_name=None, description=None): + return [default_value] + + +NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalTextAny": ComfyUIDeployExternalTextAny} +NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalTextAny": "External Text Any (ComfyUI Deploy)"} \ No newline at end of file diff --git a/custom_routes.py b/custom_routes.py index ba311ab..c8de436 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -24,32 +24,34 @@ from typing import Dict, List, Union, Any, Optional from PIL import Image import copy import struct -from aiohttp import web, ClientSession, ClientError, ClientTimeout +from aiohttp import web, ClientSession, ClientError, ClientTimeout, ClientResponseError import atexit # Global session -# client_session = None +client_session = None # def create_client_session(): # global client_session # if client_session is None: # client_session = aiohttp.ClientSession() -# async def ensure_client_session(): -# global client_session -# if client_session is None: -# client_session = aiohttp.ClientSession() -# async def cleanup(): -# global client_session -# if client_session: -# await client_session.close() +async def ensure_client_session(): + global client_session + if client_session is None: + client_session = aiohttp.ClientSession() + + +async def cleanup(): + global client_session + if client_session: + await client_session.close() def exit_handler(): print("Exiting the application. Initiating cleanup...") - # loop = asyncio.get_event_loop() - # loop.run_until_complete(cleanup()) + loop = asyncio.get_event_loop() + loop.run_until_complete(cleanup()) atexit.register(exit_handler) @@ -65,77 +67,68 @@ import time async def async_request_with_retry( method, url, disable_timeout=False, token=None, **kwargs ): - # global client_session - # await ensure_client_session() - async with aiohttp.ClientSession() as client_session: - retry_delay = 1 # Start with 1 second delay - initial_timeout = 5 # 5 seconds timeout for the initial connection + global client_session + await ensure_client_session() + retry_delay = 1 # Start with 1 second delay + initial_timeout = 5 # 5 seconds timeout for the initial connection - start_time = time.time() - for attempt in range(max_retries): - try: - if not disable_timeout: - timeout = ClientTimeout(total=None, connect=initial_timeout) - kwargs["timeout"] = timeout + start_time = time.time() + for attempt in range(max_retries): + try: + if not disable_timeout: + timeout = ClientTimeout(total=None, connect=initial_timeout) + kwargs["timeout"] = timeout - if token is not None: - if "headers" not in kwargs: - kwargs["headers"] = {} - kwargs["headers"]["Authorization"] = f"Bearer {token}" + if token is not None: + if "headers" not in kwargs: + kwargs["headers"] = {} + kwargs["headers"]["Authorization"] = f"Bearer {token}" - request_start = time.time() - async with client_session.request(method, url, **kwargs) as response: - request_end = time.time() - logger.info( - f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds" - ) + request_start = time.time() + async with client_session.request(method, url, **kwargs) as response: + request_end = time.time() + # logger.info(f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds") - if response.status != 200: - error_body = await response.text() - logger.error( - f"Request failed with status {response.status} and body {error_body}" - ) - # raise Exception(f"Request failed with status {response.status}") + if response.status != 200: + error_body = await response.text() + # logger.error(f"Request failed with status {response.status} and body {error_body}") + # raise Exception(f"Request failed with status {response.status}") - response.raise_for_status() - if method.upper() == "GET": - await response.read() + response.raise_for_status() + if method.upper() == "GET": + await response.read() - total_time = time.time() - start_time - logger.info( - f"Request succeeded after {total_time:.2f} seconds (attempt {attempt + 1}/{max_retries})" - ) - return response - except asyncio.TimeoutError: - logger.warning( - f"Request timed out after {initial_timeout} seconds (attempt {attempt + 1}/{max_retries})" - ) - except ClientError as e: - end_time = time.time() - logger.error( - f"Request failed (attempt {attempt + 1}/{max_retries}): {e}" - ) - logger.error( - f"Time taken for failed attempt: {end_time - request_start:.2f} seconds" - ) - logger.error(f"Total time elapsed: {end_time - start_time:.2f} seconds") + total_time = time.time() - start_time + # logger.info(f"Request succeeded after {total_time:.2f} seconds (attempt {attempt + 1}/{max_retries})") + return response + except asyncio.TimeoutError: + logger.warning( + f"Request timed out after {initial_timeout} seconds (attempt {attempt + 1}/{max_retries})" + ) + except ClientError as e: + end_time = time.time() + logger.error(f"Request failed (attempt {attempt + 1}/{max_retries}): {e}") + logger.error( + f"Time taken for failed attempt: {end_time - request_start:.2f} seconds" + ) + logger.error(f"Total time elapsed: {end_time - start_time:.2f} seconds") - # Log the response body for ClientError as well - if hasattr(e, "response") and e.response is not None: - error_body = await e.response.text() - logger.error(f"Error response body: {error_body}") + # Log the response body for ClientError as well + if hasattr(e, "response") and e.response is not None: + error_body = await e.response.text() + logger.error(f"Error response body: {error_body}") - if attempt == max_retries - 1: - logger.error(f"Request failed after {max_retries} attempts: {e}") - raise + if attempt == max_retries - 1: + logger.error(f"Request failed after {max_retries} attempts: {e}") + raise - await asyncio.sleep(retry_delay) - retry_delay *= retry_delay_multiplier + await asyncio.sleep(retry_delay) + retry_delay *= retry_delay_multiplier - total_time = time.time() - start_time - raise Exception( - f"Request failed after {max_retries} attempts and {total_time:.2f} seconds" - ) + total_time = time.time() - start_time + raise Exception( + f"Request failed after {max_retries} attempts and {total_time:.2f} seconds" + ) from logging import basicConfig, getLogger @@ -362,7 +355,10 @@ def apply_inputs_to_workflow(workflow_api: Any, inputs: Any, sid: str = None): value["inputs"]["input_id"] = new_value # Fix for external text default value - if value["class_type"] == "ComfyUIDeployExternalText": + if ( + value["class_type"] == "ComfyUIDeployExternalText" + or value["class_type"] == "ComfyUIDeployExternalTextAny" + ): value["inputs"]["default_value"] = new_value if value["class_type"] == "ComfyUIDeployExternalCheckpoint": @@ -380,6 +376,9 @@ def apply_inputs_to_workflow(workflow_api: Any, inputs: Any, sid: str = None): if value["class_type"] == "ComfyUIDeployExternalBoolean": value["inputs"]["default_value"] = new_value + if value["class_type"] == "ComfyUIDeployExternalFaceModel": + value["inputs"]["face_model_url"] = new_value + def send_prompt(sid: str, inputs: StreamingPrompt): # workflow_api = inputs.workflow_api @@ -422,15 +421,18 @@ def send_prompt(sid: str, inputs: StreamingPrompt): @server.PromptServer.instance.routes.post("/comfyui-deploy/run") async def comfy_deploy_run(request): # Extract the bearer token from the Authorization header - auth_header = request.headers.get("Authorization") - token = None - if auth_header: - parts = auth_header.split() - if len(parts) == 2 and parts[0].lower() == "bearer": - token = parts[1] - data = await request.json() + if "cd_token" in data: + token = data["cd_token"] + else: + auth_header = request.headers.get("Authorization") + token = None + if auth_header: + parts = auth_header.split() + if len(parts) == 2 and parts[0].lower() == "bearer": + token = parts[1] + # In older version, we use workflow_api, but this has inputs already swapped in nextjs frontend, which is tricky workflow_api = data.get("workflow_api_raw") # The prompt id generated from comfy deploy, can be None @@ -1357,6 +1359,53 @@ async def update_run(prompt_id: str, status: Status): ) +async def file_sender(file_object, chunk_size): + while True: + chunk = await file_object.read(chunk_size) + if not chunk: + break + yield chunk + + +chunk_size = 1024 * 1024 # 1MB chunks, adjust as needed + + +async def upload_with_retry( + session, url, headers, data, max_retries=3, initial_delay=1 +): + start_time = time.time() # Start timing here + for attempt in range(max_retries): + try: + async with session.put(url, headers=headers, data=data) as response: + upload_duration = time.time() - start_time + logger.info( + f"Upload attempt {attempt + 1} completed in {upload_duration:.2f} seconds" + ) + logger.info(f"Upload response status: {response.status}") + + response.raise_for_status() # This will raise an exception for 4xx and 5xx status codes + + response_text = await response.text() + logger.info(f"Response body: {response_text[:1000]}...") + + logger.info("Upload successful") + return response # Successful upload, exit the retry loop + + except (ClientError, ClientResponseError) as e: + logger.error(f"Upload attempt {attempt + 1} failed: {str(e)}") + if attempt < max_retries - 1: # If it's not the last attempt + delay = initial_delay * (2**attempt) # Exponential backoff + logger.info(f"Retrying in {delay} seconds...") + await asyncio.sleep(delay) + else: + logger.error("Max retries reached. Upload failed.") + raise # Re-raise the last exception if all retries are exhausted + except Exception as e: + logger.error(f"Unexpected error during upload: {str(e)}") + logger.error(traceback.format_exc()) + raise # Re-raise unexpected exceptions immediately + + async def upload_file( prompt_id, filename, @@ -1402,52 +1451,63 @@ async def upload_file( prompt_id = quote(prompt_id) content_type = quote(content_type) + target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2" + + start_time = time.time() # Start timing here + logger.info(f"Target URL: {target_url}") + result = await async_request_with_retry( + "GET", target_url, disable_timeout=True, token=token + ) + end_time = time.time() # End timing after the request is complete + logger.info( + "Time taken for getting file upload endpoint: {:.2f} seconds".format( + end_time - start_time + ) + ) + ok = await result.json() + + logger.info(f"Result: {ok}") + async with aiofiles.open(file, "rb") as f: data = await f.read() size = str(len(data)) - target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2" - - start_time = time.time() # Start timing here - logger.info(f"Target URL: {target_url}") - result = await async_request_with_retry( - "GET", target_url, disable_timeout=True, token=token - ) - end_time = time.time() # End timing after the request is complete - logger.info( - "Time taken for getting file upload endpoint: {:.2f} seconds".format( - end_time - start_time - ) - ) - ok = await result.json() - - logger.info(f"Result: {ok}") + # logger.info(f"Image size: {size}") start_time = time.time() # Start timing here headers = { "Content-Type": content_type, - # "Content-Length": size, + "Content-Length": size, } + logger.info(headers) + if ok.get("include_acl") is True: headers["x-amz-acl"] = "public-read" # response = requests.put(ok.get("url"), headers=headers, data=data) - response = await async_request_with_retry( - "PUT", ok.get("url"), headers=headers, data=data - ) - logger.info( - f"Upload file response status: {response.status}, status text: {response.reason}" - ) + # response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data) + # logger.info(f"Upload file response status: {response.status}, status text: {response.reason}") + + async with aiohttp.ClientSession() as session: + try: + response = await upload_with_retry( + session, ok.get("url"), headers, data + ) + # Process successful response... + except Exception as e: + # Handle final failure... + logger.error(f"Upload ultimately failed: {str(e)}") + end_time = time.time() # End timing after the request is complete logger.info("Upload time: {:.2f} seconds".format(end_time - start_time)) - if item is not None: - file_download_url = ok.get("download_url") - if file_download_url is not None: - item["url"] = file_download_url - item["upload_duration"] = end_time - start_time - if ok.get("is_public") is not None: - item["is_public"] = ok.get("is_public") + if item is not None: + file_download_url = ok.get("download_url") + if file_download_url is not None: + item["url"] = file_download_url + item["upload_duration"] = end_time - start_time + if ok.get("is_public") is not None: + item["is_public"] = ok.get("is_public") def have_pending_upload(prompt_id): @@ -1564,7 +1624,7 @@ async def handle_upload( prompt_id: str, data, key: str, content_type_key: str, default_content_type: str ): items = data.get(key, []) - # upload_tasks = [] + upload_tasks = [] for item in items: # Skipping temp files @@ -1580,37 +1640,47 @@ async def handle_upload( elif file_extension == ".webp": file_type = "image/webp" - # upload_tasks.append(upload_file( + upload_tasks.append( + upload_file( + prompt_id, + item.get("filename"), + subfolder=item.get("subfolder"), + type=item.get("type"), + content_type=file_type, + item=item, + ) + ) + # await upload_file( # prompt_id, # item.get("filename"), # subfolder=item.get("subfolder"), # type=item.get("type"), # content_type=file_type, # item=item - # )) - await upload_file( - prompt_id, - item.get("filename"), - subfolder=item.get("subfolder"), - type=item.get("type"), - content_type=file_type, - item=item, - ) + # ) # Execute all upload tasks concurrently - # await asyncio.gather(*upload_tasks) + await asyncio.gather(*upload_tasks) async def upload_in_background( prompt_id: str, data, node_id=None, have_upload=True, node_meta=None ): try: - await handle_upload(prompt_id, data, "images", "content_type", "image/png") - await handle_upload(prompt_id, data, "files", "content_type", "image/png") - await handle_upload(prompt_id, data, "gifs", "format", "image/gif") - await handle_upload( - prompt_id, data, "mesh", "format", "application/octet-stream" - ) + # await handle_upload(prompt_id, data, 'images', "content_type", "image/png") + # await handle_upload(prompt_id, data, 'files', "content_type", "image/png") + # await handle_upload(prompt_id, data, 'gifs', "format", "image/gif") + # await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream") + upload_tasks = [ + handle_upload(prompt_id, data, "images", "content_type", "image/png"), + handle_upload(prompt_id, data, "files", "content_type", "image/png"), + handle_upload(prompt_id, data, "gifs", "format", "image/gif"), + handle_upload( + prompt_id, data, "mesh", "format", "application/octet-stream" + ), + ] + + await asyncio.gather(*upload_tasks) status_endpoint = prompt_metadata[prompt_id].status_endpoint token = prompt_metadata[prompt_id].token diff --git a/web/src/components/customInputNodes.tsx b/web/src/components/customInputNodes.tsx index d6f9d68..7b2b7e7 100644 --- a/web/src/components/customInputNodes.tsx +++ b/web/src/components/customInputNodes.tsx @@ -6,4 +6,5 @@ export const customInputNodes: Record = { ComfyUIDeployExternalNumberInt: "integer", ComfyUIDeployExternalLora: "string - (public lora download url)", ComfyUIDeployExternalCheckpoint: "string - (public checkpoints download url)", + ComfyUIDeployExternalFaceModel: "string - (public face model download url)", };