From 86f14b5bce913425c2dbd6bdbf0e2af32544ff8f Mon Sep 17 00:00:00 2001 From: bennykok Date: Thu, 1 Feb 2024 22:11:17 +0800 Subject: [PATCH] fix: roll back to old app --- builder/modal-builder/src/template/app.py | 307 +++++++--------------- 1 file changed, 97 insertions(+), 210 deletions(-) diff --git a/builder/modal-builder/src/template/app.py b/builder/modal-builder/src/template/app.py index e332b3e..d76631f 100644 --- a/builder/modal-builder/src/template/app.py +++ b/builder/modal-builder/src/template/app.py @@ -9,7 +9,6 @@ from fastapi import FastAPI, Request, HTTPException from fastapi.responses import HTMLResponse from volume_setup import volumes from datetime import datetime -import aiohttp # deploy_test = False import os @@ -37,9 +36,7 @@ if not deploy_test: # dockerfile_image = Image.from_dockerfile(f"{current_directory}/Dockerfile", context_mount=Mount.from_local_dir(f"{current_directory}/data", remote_path="/data")) dockerfile_image = ( - modal.Image.debian_slim( - python_version="3.11", - ) + modal.Image.debian_slim() .apt_install("git", "wget") .pip_install( "git+https://github.com/modal-labs/asgiproxy.git", "httpx", "tqdm" @@ -52,7 +49,7 @@ if not deploy_test: # Install comfyui manager "cd /comfyui/custom_nodes && git clone https://github.com/ltdrdata/ComfyUI-Manager.git", - "cd /comfyui/custom_nodes/ComfyUI-Manager && git reset --hard 9c86f62b912f4625fe2b929c7fc61deb9d16f6d3", + "cd /comfyui/custom_nodes/ComfyUI-Manager && git reset --hard 8dd801435bb75aa1d24b7e382bac070a4c18bc51", "cd /comfyui/custom_nodes/ComfyUI-Manager && pip install -r requirements.txt", "cd /comfyui/custom_nodes/ComfyUI-Manager && mkdir startup-scripts", ) @@ -85,7 +82,7 @@ if not deploy_test: # Time to wait between API check attempts in milliseconds COMFY_API_AVAILABLE_INTERVAL_MS = 50 # Maximum number of API check attempts -COMFY_API_AVAILABLE_MAX_RETRIES = 1000 +COMFY_API_AVAILABLE_MAX_RETRIES = 5000 # Time to wait between poll attempts in milliseconds COMFY_POLLING_INTERVAL_MS = 250 # Maximum number of poll attempts @@ -94,34 +91,48 @@ COMFY_POLLING_MAX_RETRIES = 1000 COMFY_HOST = "127.0.0.1:8188" -async def check_server(url, retries=50, delay=500): - import aiohttp - # for i in range(retries): - while True: +def check_server(url, retries=50, delay=500): + import requests + import time + """ + Check if a server is reachable via HTTP GET request + + Args: + - url (str): The URL to check + - retries (int, optional): The number of times to attempt connecting to the server. Default is 50 + - delay (int, optional): The time in milliseconds to wait between retries. Default is 500 + + Returns: + bool: True if the server is reachable within the given number of retries, otherwise False + """ + + for i in range(retries): try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - # If the response status code is 200, the server is up and running - if response.status == 200: - print(f"comfy-modal - API is reachable") - return True - except Exception as e: + response = requests.get(url) + + # If the response status code is 200, the server is up and running + if response.status_code == 200: + print(f"modal-comfy - API is reachable") + return True + except requests.RequestException as e: # If an exception occurs, the server may not be ready pass + # print(f"runpod-worker-comfy - trying") + # Wait for the specified delay before retrying - await asyncio.sleep(delay / 1000) + time.sleep(delay / 1000) print( - f"comfy-modal - Failed to connect to server at {url} after {retries} attempts." + f"modal-comfy - Failed to connect to server at {url} after {retries} attempts." ) return False -async def check_status(prompt_id): - async with aiohttp.ClientSession() as session: - async with session.get(f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}") as response: - return await response.json() +def check_status(prompt_id): + req = urllib.request.Request( + f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}") + return json.loads(urllib.request.urlopen(req).read()) class Input(BaseModel): @@ -131,12 +142,12 @@ class Input(BaseModel): file_upload_endpoint: str -async def queue_workflow_comfy_deploy(data: Input): +def queue_workflow_comfy_deploy(data: Input): data_str = data.json() data_bytes = data_str.encode('utf-8') - async with aiohttp.ClientSession() as session: - async with session.post(f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes) as response: - return await response.json() + req = urllib.request.Request( + f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes) + return json.loads(urllib.request.urlopen(req).read()) class RequestInput(BaseModel): @@ -147,208 +158,86 @@ image = Image.debian_slim() target_image = image if deploy_test else dockerfile_image -run_timeout = config["run_timeout"] -idle_timeout = config["idle_timeout"] - -import asyncio - -@stub.cls(image=target_image, gpu=config["gpu"] ,volumes=volumes, timeout=60 * 10, container_idle_timeout=idle_timeout) +@stub.cls(image=target_image, gpu=config["gpu"] ,volumes=volumes, timeout=(config["run_timeout"] + 20), container_idle_timeout=config["idle_timeout"]) class ComfyDeployRunner: - machine_logs = [] - - async def read_stream(self, stream, isStderr): - import time - while True: - try: - line = await stream.readline() - if line: - l = line.decode('utf-8').strip() - - if l == "": - continue - - if not isStderr: - print(l, flush=True) - self.machine_logs.append({ - "logs": l, - "timestamp": time.time() - }) - - else: - # is error - # logger.error(l) - print(l, flush=True) - self.machine_logs.append({ - "logs": l, - "timestamp": time.time() - }) - else: - break - except asyncio.CancelledError: - # Handle the cancellation here if needed - break # Break out of the loop on cancellation - @enter() - async def setup(self): + def setup(self): import subprocess import time # Make sure that the ComfyUI API is available print(f"comfy-modal - check server") - self.server_process = await asyncio.subprocess.create_subprocess_shell( - f"python main.py --disable-auto-launch --disable-metadata", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd="/comfyui", - # env={**os.environ, "COLUMNS": "10000"} + command = ["python", "main.py", + "--disable-auto-launch", "--disable-metadata"] + + self.server_process = subprocess.Popen(command, cwd="/comfyui") + + check_server( + f"http://{COMFY_HOST}", + COMFY_API_AVAILABLE_MAX_RETRIES, + COMFY_API_AVAILABLE_INTERVAL_MS, ) @exit() - async def cleanup(self, exc_type, exc_value, traceback): - print(f"comfy-modal - cleanup", exc_type, exc_value, traceback) - # Get the current event loop - loop = asyncio.get_event_loop() - - # Check if the event loop is closed - if loop.is_closed(): - print("The event loop is closed.") - else: - try: - self.server_process.terminate() - await self.server_process.wait() - except Exception as e: - print("Issues when cleaning up", e) - print("The event loop is open.") + def cleanup(self, exc_type, exc_value, traceback): + self.server_process.terminate() @method() - async def run(self, input: Input): - import signal + def run(self, input: Input): import time - import asyncio - import aiohttp + data = json.dumps({ + "run_id": input.prompt_id, + "status": "started", + "time": datetime.now().isoformat() + }).encode('utf-8') + req = urllib.request.Request(input.status_endpoint, data=data, method='POST') + urllib.request.urlopen(req) + + job_input = input - stdout_task = asyncio.create_task( - self.read_stream(self.server_process.stdout, False)) - stderr_task = asyncio.create_task( - self.read_stream(self.server_process.stderr, True)) - try: - class TimeoutError(Exception): - pass + queued_workflow = queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow) + prompt_id = queued_workflow["prompt_id"] + print(f"comfy-modal - queued workflow with ID {prompt_id}") + except Exception as e: + import traceback + print(traceback.format_exc()) + return {"error": f"Error queuing workflow: {str(e)}"} - def timeout_handler(signum, frame): - data = json.dumps({ - "run_id": input.prompt_id, - "status": "timeout", - "time": datetime.now().isoformat() - }).encode('utf-8') - req = urllib.request.Request(input.status_endpoint, data=data, method='POST') - urllib.request.urlopen(req) - raise TimeoutError("Operation timed out") - - signal.signal(signal.SIGALRM, timeout_handler) + # Poll for completion + print(f"comfy-modal - wait until image generation is complete") + retries = 0 + status = "" + try: + print("getting request") + while retries < COMFY_POLLING_MAX_RETRIES: + status_result = check_status(prompt_id=prompt_id) + # history = get_history(prompt_id) - try: - signal.alarm(run_timeout) # 5 seconds timeout + # Exit the loop if we have found the history + # if prompt_id in history and history[prompt_id].get("outputs"): + # break - ok = await check_server( - f"http://{COMFY_HOST}", - COMFY_API_AVAILABLE_MAX_RETRIES, - COMFY_API_AVAILABLE_INTERVAL_MS, - ) + # Exit the loop if we have found the status both success or failed + if 'status' in status_result and (status_result['status'] == 'success' or status_result['status'] == 'failed'): + status = status_result['status'] + print(status) + break + else: + # Wait before trying again + time.sleep(COMFY_POLLING_INTERVAL_MS / 1000) + retries += 1 + else: + return {"error": "Max retries reached while waiting for image generation"} + except Exception as e: + return {"error": f"Error waiting for image generation: {str(e)}"} - if not ok: - raise Exception("ComfyUI API is not available") - # Set an alarm for some seconds in the future + print(f"comfy-modal - Finished run") - data = json.dumps({ - "run_id": input.prompt_id, - "status": "started", - "time": datetime.now().isoformat() - }).encode('utf-8') - async with aiohttp.ClientSession() as session: - async with session.post(input.status_endpoint, data=data) as response: - pass + result = {"status": status} - job_input = input - - try: - queued_workflow = await queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow) - prompt_id = queued_workflow["prompt_id"] - print(f"comfy-modal - queued workflow with ID {prompt_id}") - except Exception as e: - import traceback - print(traceback.format_exc()) - return {"error": f"Error queuing workflow: {str(e)}"} - - # Poll for completion - print(f"comfy-modal - wait until image generation is complete") - retries = 0 - status = "" - try: - print("getting request") - while retries < COMFY_POLLING_MAX_RETRIES: - status_result = await check_status(prompt_id=prompt_id) - if 'status' in status_result and (status_result['status'] == 'success' or status_result['status'] == 'failed'): - status = status_result['status'] - print(status) - break - else: - # Wait before trying again - await asyncio.sleep(COMFY_POLLING_INTERVAL_MS / 1000) - retries += 1 - else: - return {"error": "Max retries reached while waiting for image generation"} - except Exception as e: - return {"error": f"Error waiting for image generation: {str(e)}"} - - print(f"comfy-modal - Finished, turning off") - - result = {"status": status} - - except TimeoutError: - print("Operation timed out") - return {"status": "failed"} - except Exception as e: - print(f"Unexpected error occurred: {str(e)}") - data = json.dumps({ - "run_id": input.prompt_id, - "status": "failed", - "time": datetime.now().isoformat() - }).encode('utf-8') - async with aiohttp.ClientSession() as session: - async with session.post(input.status_endpoint, data=data) as response: - print("response", response) - self.machine_logs.append({ - "logs": str(e), - "timestamp": time.time() - }) - finally: - signal.alarm(0) - - print("uploading log_data") - data = json.dumps({ - "run_id": input.prompt_id, - "time": datetime.now().isoformat(), - "log_data": self.machine_logs - }).encode('utf-8') - print("my logs", len(self.machine_logs)) - # Clear logs - async with aiohttp.ClientSession() as session: - async with session.post(input.status_endpoint, data=data) as response: - print("response", response) - print("uploaded log_data") - # print(data) - self.machine_logs = [] - finally: - stdout_task.cancel() - stderr_task.cancel() - await stdout_task - await stderr_task - return result - @web_app.post("/run") @@ -364,12 +253,10 @@ async def post_run(request_input: RequestInput): urllib.request.urlopen(req) model = ComfyDeployRunner() - call = await model.run.spawn.aio(request_input.input) - - print("call", call) + call = model.run.spawn(request_input.input) # call = run.spawn() - return {"call_id": None} + return {"call_id": call.object_id} return {"call_id": None}