diff --git a/builder/modal-builder/src/template/app.py b/builder/modal-builder/src/template/app.py index d76631f..6efbdda 100644 --- a/builder/modal-builder/src/template/app.py +++ b/builder/modal-builder/src/template/app.py @@ -9,6 +9,8 @@ from fastapi import FastAPI, Request, HTTPException from fastapi.responses import HTMLResponse from volume_setup import volumes from datetime import datetime +import aiohttp +from aiohttp import TCPConnector # deploy_test = False import os @@ -82,7 +84,7 @@ if not deploy_test: # Time to wait between API check attempts in milliseconds COMFY_API_AVAILABLE_INTERVAL_MS = 50 # Maximum number of API check attempts -COMFY_API_AVAILABLE_MAX_RETRIES = 5000 +COMFY_API_AVAILABLE_MAX_RETRIES = 1000 # Time to wait between poll attempts in milliseconds COMFY_POLLING_INTERVAL_MS = 250 # Maximum number of poll attempts @@ -91,48 +93,34 @@ COMFY_POLLING_MAX_RETRIES = 1000 COMFY_HOST = "127.0.0.1:8188" -def check_server(url, retries=50, delay=500): - import requests - import time - """ - Check if a server is reachable via HTTP GET request - - Args: - - url (str): The URL to check - - retries (int, optional): The number of times to attempt connecting to the server. Default is 50 - - delay (int, optional): The time in milliseconds to wait between retries. Default is 500 - - Returns: - bool: True if the server is reachable within the given number of retries, otherwise False - """ - - for i in range(retries): +async def check_server(url, retries=50, delay=500): + import aiohttp + # for i in range(retries): + while True: try: - response = requests.get(url) - - # If the response status code is 200, the server is up and running - if response.status_code == 200: - print(f"modal-comfy - API is reachable") - return True - except requests.RequestException as e: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + # If the response status code is 200, the server is up and running + if response.status == 200: + print(f"comfy-modal - API is reachable") + return True + except Exception as e: # If an exception occurs, the server may not be ready pass - # print(f"runpod-worker-comfy - trying") - # Wait for the specified delay before retrying - time.sleep(delay / 1000) + await asyncio.sleep(delay / 1000) print( - f"modal-comfy - Failed to connect to server at {url} after {retries} attempts." + f"comfy-modal - Failed to connect to server at {url} after {retries} attempts." ) return False -def check_status(prompt_id): - req = urllib.request.Request( - f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}") - return json.loads(urllib.request.urlopen(req).read()) +async def check_status(prompt_id): + async with aiohttp.ClientSession() as session: + async with session.get(f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}") as response: + return await response.json() class Input(BaseModel): @@ -142,12 +130,12 @@ class Input(BaseModel): file_upload_endpoint: str -def queue_workflow_comfy_deploy(data: Input): +async def queue_workflow_comfy_deploy(data: Input): data_str = data.json() data_bytes = data_str.encode('utf-8') - req = urllib.request.Request( - f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes) - return json.loads(urllib.request.urlopen(req).read()) + async with aiohttp.ClientSession() as session: + async with session.post(f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes) as response: + return await response.json() class RequestInput(BaseModel): @@ -158,86 +146,225 @@ image = Image.debian_slim() target_image = image if deploy_test else dockerfile_image -@stub.cls(image=target_image, gpu=config["gpu"] ,volumes=volumes, timeout=(config["run_timeout"] + 20), container_idle_timeout=config["idle_timeout"]) +run_timeout = config["run_timeout"] +idle_timeout = config["idle_timeout"] + +import asyncio + +@stub.cls( + image=target_image, + gpu=config["gpu"] , + volumes=volumes, + timeout=(config["run_timeout"] + 20), + container_idle_timeout=config["idle_timeout"], + allow_concurrent_inputs=1, +) class ComfyDeployRunner: + machine_logs = [] + + async def read_stream(self, stream, isStderr): + import time + while True: + try: + line = await stream.readline() + if line: + l = line.decode('utf-8').strip() + + if l == "": + continue + + if not isStderr: + print(l, flush=True) + self.machine_logs.append({ + "logs": l, + "timestamp": time.time() + }) + + else: + # is error + # logger.error(l) + print(l, flush=True) + self.machine_logs.append({ + "logs": l, + "timestamp": time.time() + }) + else: + break + except asyncio.CancelledError: + # Handle the cancellation here if needed + break # Break out of the loop on cancellation + @enter() - def setup(self): + async def setup(self): import subprocess import time + # Make sure that the ComfyUI API is available print(f"comfy-modal - check server") - command = ["python", "main.py", - "--disable-auto-launch", "--disable-metadata"] - - self.server_process = subprocess.Popen(command, cwd="/comfyui") - - check_server( - f"http://{COMFY_HOST}", - COMFY_API_AVAILABLE_MAX_RETRIES, - COMFY_API_AVAILABLE_INTERVAL_MS, + self.server_process = await asyncio.subprocess.create_subprocess_shell( + f"python main.py --disable-auto-launch --disable-metadata", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd="/comfyui", + # env={**os.environ, "COLUMNS": "10000"} ) @exit() - def cleanup(self, exc_type, exc_value, traceback): - self.server_process.terminate() + async def cleanup(self, exc_type, exc_value, traceback): + print(f"comfy-modal - cleanup", exc_type, exc_value, traceback) + # Get the current event loop + loop = asyncio.get_event_loop() + + # Check if the event loop is closed + if loop.is_closed(): + print("The event loop is closed.") + else: + try: + self.server_process.terminate() + await self.server_process.wait() + except Exception as e: + print("Issues when cleaning up", e) + print("The event loop is open.") @method() - def run(self, input: Input): + async def run(self, input: Input): + import signal import time - data = json.dumps({ - "run_id": input.prompt_id, - "status": "started", - "time": datetime.now().isoformat() - }).encode('utf-8') - req = urllib.request.Request(input.status_endpoint, data=data, method='POST') - urllib.request.urlopen(req) - - job_input = input + import aiohttp + stdout_task = asyncio.create_task( + self.read_stream(self.server_process.stdout, False)) + stderr_task = asyncio.create_task( + self.read_stream(self.server_process.stderr, True)) + try: - queued_workflow = queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow) - prompt_id = queued_workflow["prompt_id"] - print(f"comfy-modal - queued workflow with ID {prompt_id}") - except Exception as e: - import traceback - print(traceback.format_exc()) - return {"error": f"Error queuing workflow: {str(e)}"} + class TimeoutError(Exception): + pass - # Poll for completion - print(f"comfy-modal - wait until image generation is complete") - retries = 0 - status = "" - try: - print("getting request") - while retries < COMFY_POLLING_MAX_RETRIES: - status_result = check_status(prompt_id=prompt_id) - # history = get_history(prompt_id) + def timeout_handler(signum, frame): + data = json.dumps({ + "run_id": input.prompt_id, + "status": "timeout", + "time": datetime.now().isoformat() + }).encode('utf-8') + req = urllib.request.Request(input.status_endpoint, data=data, method='POST') + urllib.request.urlopen(req) + raise TimeoutError("Operation timed out") + + signal.signal(signal.SIGALRM, timeout_handler) - # Exit the loop if we have found the history - # if prompt_id in history and history[prompt_id].get("outputs"): - # break + try: + signal.alarm(run_timeout) # 5 seconds timeout - # Exit the loop if we have found the status both success or failed - if 'status' in status_result and (status_result['status'] == 'success' or status_result['status'] == 'failed'): - status = status_result['status'] - print(status) - break - else: - # Wait before trying again - time.sleep(COMFY_POLLING_INTERVAL_MS / 1000) - retries += 1 - else: - return {"error": "Max retries reached while waiting for image generation"} - except Exception as e: - return {"error": f"Error waiting for image generation: {str(e)}"} + ok = await check_server( + f"http://{COMFY_HOST}", + COMFY_API_AVAILABLE_MAX_RETRIES, + COMFY_API_AVAILABLE_INTERVAL_MS, + ) - print(f"comfy-modal - Finished run") + if not ok: + raise Exception("ComfyUI API is not available") + # Set an alarm for some seconds in the future - result = {"status": status} + data = json.dumps({ + "run_id": input.prompt_id, + "status": "started", + "time": datetime.now().isoformat() + }).encode('utf-8') + async with aiohttp.ClientSession() as session: + async with session.post(input.status_endpoint, data=data) as response: + pass + job_input = input + + try: + queued_workflow = await queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow) + prompt_id = queued_workflow["prompt_id"] + print(f"comfy-modal - queued workflow with ID {prompt_id}") + except Exception as e: + import traceback + print(traceback.format_exc()) + return {"error": f"Error queuing workflow: {str(e)}"} + + # Poll for completion + print(f"comfy-modal - wait until image generation is complete") + retries = 0 + status = "" + try: + print("getting request") + while retries < COMFY_POLLING_MAX_RETRIES: + status_result = await check_status(prompt_id=prompt_id) + if 'status' in status_result and (status_result['status'] == 'success' or status_result['status'] == 'failed'): + status = status_result['status'] + print(status) + break + else: + # Wait before trying again + await asyncio.sleep(COMFY_POLLING_INTERVAL_MS / 1000) + retries += 1 + else: + return {"error": "Max retries reached while waiting for image generation"} + except Exception as e: + return {"error": f"Error waiting for image generation: {str(e)}"} + + print(f"comfy-modal - Finished, turning off") + + result = {"status": status} + + except TimeoutError: + print("Operation timed out") + return {"status": "failed"} + except Exception as e: + print(f"Unexpected error occurred: {str(e)}") + data = json.dumps({ + "run_id": input.prompt_id, + "status": "failed", + "time": datetime.now().isoformat() + }).encode('utf-8') + async with aiohttp.ClientSession() as session: + async with session.post(input.status_endpoint, data=data) as response: + print("response", response) + self.machine_logs.append({ + "logs": str(e), + "timestamp": time.time() + }) + finally: + signal.alarm(0) + + print("uploading log_data") + data = json.dumps({ + "run_id": input.prompt_id, + "time": datetime.now().isoformat(), + "log_data": self.machine_logs + }).encode('utf-8') + print("my logs", len(self.machine_logs)) + # Clear logs + timeout = aiohttp.ClientTimeout(total=60) # 60 seconds total timeout + # Use HTTP/1.1 explicitly and increase the connection pool size + connector = TCPConnector(limit=100, force_close=True, enable_cleanup_closed=True) + + async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: + try: + async with session.post(input.status_endpoint, data=data) as response: + print("response", response) + # Process your response here + except asyncio.TimeoutError: + print("Request timed out") + except Exception as e: + print(f"An error occurred: {e}") + print("uploaded log_data") + # print(data) + self.machine_logs = [] + finally: + stdout_task.cancel() + stderr_task.cancel() + await stdout_task + await stderr_task + return result + @web_app.post("/run") @@ -253,10 +380,12 @@ async def post_run(request_input: RequestInput): urllib.request.urlopen(req) model = ComfyDeployRunner() - call = model.run.spawn(request_input.input) + call = await model.run.spawn.aio(request_input.input) + + print("call", call) # call = run.spawn() - return {"call_id": call.object_id} + return {"call_id": None} return {"call_id": None} diff --git a/builder/modal-builder/src/template/data/snapshot.json b/builder/modal-builder/src/template/data/snapshot.json index e99a2e8..f798981 100644 --- a/builder/modal-builder/src/template/data/snapshot.json +++ b/builder/modal-builder/src/template/data/snapshot.json @@ -2,7 +2,7 @@ "comfyui": "d0165d819afe76bd4e6bdd710eb5f3e571b6a804", "git_custom_nodes": { "https://github.com/BennyKok/comfyui-deploy.git": { - "hash": "a838cb7ad425e5652c3931fbafdc886b53c48a22", + "hash": "df46e3a0e5ad93fa71f5d216997e376af33b2a6d", "disabled": false } },