This commit is contained in:
nick 2024-09-24 23:16:49 -07:00
commit 79650f48d0
4 changed files with 354 additions and 129 deletions

View File

@ -0,0 +1,108 @@
from PIL import Image, ImageOps
import numpy as np
import torch
import folder_paths
class AnyType(str):
def __ne__(self, __value: object) -> bool:
return False
WILDCARD = AnyType("*")
class ComfyUIDeployExternalFaceModel:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"input_id": (
"STRING",
{"multiline": False, "default": "input_reactor_face_model"},
),
},
"optional": {
"default_face_model_name": (
"STRING",
{"multiline": False, "default": ""},
),
"face_model_save_name": ( # if `default_face_model_name` is a link to download a file, we will attempt to save it with this name
"STRING",
{"multiline": False, "default": ""},
),
"display_name": (
"STRING",
{"multiline": False, "default": ""},
),
"description": (
"STRING",
{"multiline": True, "default": ""},
),
"face_model_url": (
"STRING",
{"multiline": False, "default": ""},
),
},
}
RETURN_TYPES = (WILDCARD,)
RETURN_NAMES = ("path",)
FUNCTION = "run"
CATEGORY = "deploy"
def run(
self,
input_id,
default_face_model_name=None,
face_model_save_name=None,
display_name=None,
description=None,
face_model_url=None,
):
import requests
import os
import uuid
if face_model_url and face_model_url.startswith("http"):
if face_model_save_name:
existing_face_models = folder_paths.get_filename_list("reactor/faces")
# Check if face_model_save_name exists in the list
if face_model_save_name in existing_face_models:
print(f"using face model: {face_model_save_name}")
return (face_model_save_name,)
else:
face_model_save_name = str(uuid.uuid4()) + ".safetensors"
print(face_model_save_name)
print(folder_paths.folder_names_and_paths["reactor/faces"][0][0])
destination_path = os.path.join(
folder_paths.folder_names_and_paths["reactor/faces"][0][0],
face_model_save_name,
)
print(destination_path)
print(
"Downloading external face model - "
+ face_model_url
+ " to "
+ destination_path
)
response = requests.get(
face_model_url,
headers={"User-Agent": "Mozilla/5.0"},
allow_redirects=True,
)
with open(destination_path, "wb") as out_file:
out_file.write(response.content)
return (face_model_save_name,)
else:
print(f"using face model: {default_face_model_name}")
return (default_face_model_name,)
NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalFaceModel": ComfyUIDeployExternalFaceModel}
NODE_DISPLAY_NAME_MAPPINGS = {
"ComfyUIDeployExternalFaceModel": "External Face Model (ComfyUI Deploy)"
}

View File

@ -0,0 +1,46 @@
class AnyType(str):
def __ne__(self, __value: object) -> bool:
return False
WILDCARD = AnyType("*")
class ComfyUIDeployExternalTextAny:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"input_id": (
"STRING",
{"multiline": False, "default": "input_text"},
),
},
"optional": {
"default_value": (
"STRING",
{"multiline": True, "default": ""},
),
"display_name": (
"STRING",
{"multiline": False, "default": ""},
),
"description": (
"STRING",
{"multiline": True, "default": ""},
),
}
}
RETURN_TYPES = (WILDCARD,)
RETURN_NAMES = ("text",)
FUNCTION = "run"
CATEGORY = "text"
def run(self, input_id, default_value=None, display_name=None, description=None):
return [default_value]
NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalTextAny": ComfyUIDeployExternalTextAny}
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalTextAny": "External Text Any (ComfyUI Deploy)"}

View File

@ -24,32 +24,34 @@ from typing import Dict, List, Union, Any, Optional
from PIL import Image
import copy
import struct
from aiohttp import web, ClientSession, ClientError, ClientTimeout
from aiohttp import web, ClientSession, ClientError, ClientTimeout, ClientResponseError
import atexit
# Global session
# client_session = None
client_session = None
# def create_client_session():
# global client_session
# if client_session is None:
# client_session = aiohttp.ClientSession()
# async def ensure_client_session():
# global client_session
# if client_session is None:
# client_session = aiohttp.ClientSession()
# async def cleanup():
# global client_session
# if client_session:
# await client_session.close()
async def ensure_client_session():
global client_session
if client_session is None:
client_session = aiohttp.ClientSession()
async def cleanup():
global client_session
if client_session:
await client_session.close()
def exit_handler():
print("Exiting the application. Initiating cleanup...")
# loop = asyncio.get_event_loop()
# loop.run_until_complete(cleanup())
loop = asyncio.get_event_loop()
loop.run_until_complete(cleanup())
atexit.register(exit_handler)
@ -65,77 +67,68 @@ import time
async def async_request_with_retry(
method, url, disable_timeout=False, token=None, **kwargs
):
# global client_session
# await ensure_client_session()
async with aiohttp.ClientSession() as client_session:
retry_delay = 1 # Start with 1 second delay
initial_timeout = 5 # 5 seconds timeout for the initial connection
global client_session
await ensure_client_session()
retry_delay = 1 # Start with 1 second delay
initial_timeout = 5 # 5 seconds timeout for the initial connection
start_time = time.time()
for attempt in range(max_retries):
try:
if not disable_timeout:
timeout = ClientTimeout(total=None, connect=initial_timeout)
kwargs["timeout"] = timeout
start_time = time.time()
for attempt in range(max_retries):
try:
if not disable_timeout:
timeout = ClientTimeout(total=None, connect=initial_timeout)
kwargs["timeout"] = timeout
if token is not None:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {token}"
if token is not None:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {token}"
request_start = time.time()
async with client_session.request(method, url, **kwargs) as response:
request_end = time.time()
logger.info(
f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds"
)
request_start = time.time()
async with client_session.request(method, url, **kwargs) as response:
request_end = time.time()
# logger.info(f"Request attempt {attempt + 1} took {request_end - request_start:.2f} seconds")
if response.status != 200:
error_body = await response.text()
logger.error(
f"Request failed with status {response.status} and body {error_body}"
)
# raise Exception(f"Request failed with status {response.status}")
if response.status != 200:
error_body = await response.text()
# logger.error(f"Request failed with status {response.status} and body {error_body}")
# raise Exception(f"Request failed with status {response.status}")
response.raise_for_status()
if method.upper() == "GET":
await response.read()
response.raise_for_status()
if method.upper() == "GET":
await response.read()
total_time = time.time() - start_time
logger.info(
f"Request succeeded after {total_time:.2f} seconds (attempt {attempt + 1}/{max_retries})"
)
return response
except asyncio.TimeoutError:
logger.warning(
f"Request timed out after {initial_timeout} seconds (attempt {attempt + 1}/{max_retries})"
)
except ClientError as e:
end_time = time.time()
logger.error(
f"Request failed (attempt {attempt + 1}/{max_retries}): {e}"
)
logger.error(
f"Time taken for failed attempt: {end_time - request_start:.2f} seconds"
)
logger.error(f"Total time elapsed: {end_time - start_time:.2f} seconds")
total_time = time.time() - start_time
# logger.info(f"Request succeeded after {total_time:.2f} seconds (attempt {attempt + 1}/{max_retries})")
return response
except asyncio.TimeoutError:
logger.warning(
f"Request timed out after {initial_timeout} seconds (attempt {attempt + 1}/{max_retries})"
)
except ClientError as e:
end_time = time.time()
logger.error(f"Request failed (attempt {attempt + 1}/{max_retries}): {e}")
logger.error(
f"Time taken for failed attempt: {end_time - request_start:.2f} seconds"
)
logger.error(f"Total time elapsed: {end_time - start_time:.2f} seconds")
# Log the response body for ClientError as well
if hasattr(e, "response") and e.response is not None:
error_body = await e.response.text()
logger.error(f"Error response body: {error_body}")
# Log the response body for ClientError as well
if hasattr(e, "response") and e.response is not None:
error_body = await e.response.text()
logger.error(f"Error response body: {error_body}")
if attempt == max_retries - 1:
logger.error(f"Request failed after {max_retries} attempts: {e}")
raise
if attempt == max_retries - 1:
logger.error(f"Request failed after {max_retries} attempts: {e}")
raise
await asyncio.sleep(retry_delay)
retry_delay *= retry_delay_multiplier
await asyncio.sleep(retry_delay)
retry_delay *= retry_delay_multiplier
total_time = time.time() - start_time
raise Exception(
f"Request failed after {max_retries} attempts and {total_time:.2f} seconds"
)
total_time = time.time() - start_time
raise Exception(
f"Request failed after {max_retries} attempts and {total_time:.2f} seconds"
)
from logging import basicConfig, getLogger
@ -362,7 +355,10 @@ def apply_inputs_to_workflow(workflow_api: Any, inputs: Any, sid: str = None):
value["inputs"]["input_id"] = new_value
# Fix for external text default value
if value["class_type"] == "ComfyUIDeployExternalText":
if (
value["class_type"] == "ComfyUIDeployExternalText"
or value["class_type"] == "ComfyUIDeployExternalTextAny"
):
value["inputs"]["default_value"] = new_value
if value["class_type"] == "ComfyUIDeployExternalCheckpoint":
@ -380,6 +376,9 @@ def apply_inputs_to_workflow(workflow_api: Any, inputs: Any, sid: str = None):
if value["class_type"] == "ComfyUIDeployExternalBoolean":
value["inputs"]["default_value"] = new_value
if value["class_type"] == "ComfyUIDeployExternalFaceModel":
value["inputs"]["face_model_url"] = new_value
def send_prompt(sid: str, inputs: StreamingPrompt):
# workflow_api = inputs.workflow_api
@ -422,15 +421,18 @@ def send_prompt(sid: str, inputs: StreamingPrompt):
@server.PromptServer.instance.routes.post("/comfyui-deploy/run")
async def comfy_deploy_run(request):
# Extract the bearer token from the Authorization header
auth_header = request.headers.get("Authorization")
token = None
if auth_header:
parts = auth_header.split()
if len(parts) == 2 and parts[0].lower() == "bearer":
token = parts[1]
data = await request.json()
if "cd_token" in data:
token = data["cd_token"]
else:
auth_header = request.headers.get("Authorization")
token = None
if auth_header:
parts = auth_header.split()
if len(parts) == 2 and parts[0].lower() == "bearer":
token = parts[1]
# In older version, we use workflow_api, but this has inputs already swapped in nextjs frontend, which is tricky
workflow_api = data.get("workflow_api_raw")
# The prompt id generated from comfy deploy, can be None
@ -1357,6 +1359,53 @@ async def update_run(prompt_id: str, status: Status):
)
async def file_sender(file_object, chunk_size):
while True:
chunk = await file_object.read(chunk_size)
if not chunk:
break
yield chunk
chunk_size = 1024 * 1024 # 1MB chunks, adjust as needed
async def upload_with_retry(
session, url, headers, data, max_retries=3, initial_delay=1
):
start_time = time.time() # Start timing here
for attempt in range(max_retries):
try:
async with session.put(url, headers=headers, data=data) as response:
upload_duration = time.time() - start_time
logger.info(
f"Upload attempt {attempt + 1} completed in {upload_duration:.2f} seconds"
)
logger.info(f"Upload response status: {response.status}")
response.raise_for_status() # This will raise an exception for 4xx and 5xx status codes
response_text = await response.text()
logger.info(f"Response body: {response_text[:1000]}...")
logger.info("Upload successful")
return response # Successful upload, exit the retry loop
except (ClientError, ClientResponseError) as e:
logger.error(f"Upload attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1: # If it's not the last attempt
delay = initial_delay * (2**attempt) # Exponential backoff
logger.info(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
else:
logger.error("Max retries reached. Upload failed.")
raise # Re-raise the last exception if all retries are exhausted
except Exception as e:
logger.error(f"Unexpected error during upload: {str(e)}")
logger.error(traceback.format_exc())
raise # Re-raise unexpected exceptions immediately
async def upload_file(
prompt_id,
filename,
@ -1402,52 +1451,63 @@ async def upload_file(
prompt_id = quote(prompt_id)
content_type = quote(content_type)
target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2"
start_time = time.time() # Start timing here
logger.info(f"Target URL: {target_url}")
result = await async_request_with_retry(
"GET", target_url, disable_timeout=True, token=token
)
end_time = time.time() # End timing after the request is complete
logger.info(
"Time taken for getting file upload endpoint: {:.2f} seconds".format(
end_time - start_time
)
)
ok = await result.json()
logger.info(f"Result: {ok}")
async with aiofiles.open(file, "rb") as f:
data = await f.read()
size = str(len(data))
target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2"
start_time = time.time() # Start timing here
logger.info(f"Target URL: {target_url}")
result = await async_request_with_retry(
"GET", target_url, disable_timeout=True, token=token
)
end_time = time.time() # End timing after the request is complete
logger.info(
"Time taken for getting file upload endpoint: {:.2f} seconds".format(
end_time - start_time
)
)
ok = await result.json()
logger.info(f"Result: {ok}")
# logger.info(f"Image size: {size}")
start_time = time.time() # Start timing here
headers = {
"Content-Type": content_type,
# "Content-Length": size,
"Content-Length": size,
}
logger.info(headers)
if ok.get("include_acl") is True:
headers["x-amz-acl"] = "public-read"
# response = requests.put(ok.get("url"), headers=headers, data=data)
response = await async_request_with_retry(
"PUT", ok.get("url"), headers=headers, data=data
)
logger.info(
f"Upload file response status: {response.status}, status text: {response.reason}"
)
# response = await async_request_with_retry('PUT', ok.get("url"), headers=headers, data=data)
# logger.info(f"Upload file response status: {response.status}, status text: {response.reason}")
async with aiohttp.ClientSession() as session:
try:
response = await upload_with_retry(
session, ok.get("url"), headers, data
)
# Process successful response...
except Exception as e:
# Handle final failure...
logger.error(f"Upload ultimately failed: {str(e)}")
end_time = time.time() # End timing after the request is complete
logger.info("Upload time: {:.2f} seconds".format(end_time - start_time))
if item is not None:
file_download_url = ok.get("download_url")
if file_download_url is not None:
item["url"] = file_download_url
item["upload_duration"] = end_time - start_time
if ok.get("is_public") is not None:
item["is_public"] = ok.get("is_public")
if item is not None:
file_download_url = ok.get("download_url")
if file_download_url is not None:
item["url"] = file_download_url
item["upload_duration"] = end_time - start_time
if ok.get("is_public") is not None:
item["is_public"] = ok.get("is_public")
def have_pending_upload(prompt_id):
@ -1564,7 +1624,7 @@ async def handle_upload(
prompt_id: str, data, key: str, content_type_key: str, default_content_type: str
):
items = data.get(key, [])
# upload_tasks = []
upload_tasks = []
for item in items:
# Skipping temp files
@ -1580,37 +1640,47 @@ async def handle_upload(
elif file_extension == ".webp":
file_type = "image/webp"
# upload_tasks.append(upload_file(
upload_tasks.append(
upload_file(
prompt_id,
item.get("filename"),
subfolder=item.get("subfolder"),
type=item.get("type"),
content_type=file_type,
item=item,
)
)
# await upload_file(
# prompt_id,
# item.get("filename"),
# subfolder=item.get("subfolder"),
# type=item.get("type"),
# content_type=file_type,
# item=item
# ))
await upload_file(
prompt_id,
item.get("filename"),
subfolder=item.get("subfolder"),
type=item.get("type"),
content_type=file_type,
item=item,
)
# )
# Execute all upload tasks concurrently
# await asyncio.gather(*upload_tasks)
await asyncio.gather(*upload_tasks)
async def upload_in_background(
prompt_id: str, data, node_id=None, have_upload=True, node_meta=None
):
try:
await handle_upload(prompt_id, data, "images", "content_type", "image/png")
await handle_upload(prompt_id, data, "files", "content_type", "image/png")
await handle_upload(prompt_id, data, "gifs", "format", "image/gif")
await handle_upload(
prompt_id, data, "mesh", "format", "application/octet-stream"
)
# await handle_upload(prompt_id, data, 'images', "content_type", "image/png")
# await handle_upload(prompt_id, data, 'files', "content_type", "image/png")
# await handle_upload(prompt_id, data, 'gifs', "format", "image/gif")
# await handle_upload(prompt_id, data, 'mesh', "format", "application/octet-stream")
upload_tasks = [
handle_upload(prompt_id, data, "images", "content_type", "image/png"),
handle_upload(prompt_id, data, "files", "content_type", "image/png"),
handle_upload(prompt_id, data, "gifs", "format", "image/gif"),
handle_upload(
prompt_id, data, "mesh", "format", "application/octet-stream"
),
]
await asyncio.gather(*upload_tasks)
status_endpoint = prompt_metadata[prompt_id].status_endpoint
token = prompt_metadata[prompt_id].token

View File

@ -6,4 +6,5 @@ export const customInputNodes: Record<string, string> = {
ComfyUIDeployExternalNumberInt: "integer",
ComfyUIDeployExternalLora: "string - (public lora download url)",
ComfyUIDeployExternalCheckpoint: "string - (public checkpoints download url)",
ComfyUIDeployExternalFaceModel: "string - (public face model download url)",
};