From 2d033570f4a8828610486e81ba5fb5cee6dbbcf2 Mon Sep 17 00:00:00 2001 From: bennykok Date: Wed, 31 Jan 2024 00:41:41 +0800 Subject: [PATCH] fix: modal logging with complete async io rewrite --- builder/modal-builder/src/template/app.py | 104 ++++++++++------------ 1 file changed, 46 insertions(+), 58 deletions(-) diff --git a/builder/modal-builder/src/template/app.py b/builder/modal-builder/src/template/app.py index 8ab86c2..ef88b61 100644 --- a/builder/modal-builder/src/template/app.py +++ b/builder/modal-builder/src/template/app.py @@ -9,6 +9,7 @@ from fastapi import FastAPI, Request, HTTPException from fastapi.responses import HTMLResponse from volume_setup import volumes from datetime import datetime +import aiohttp # deploy_test = False import os @@ -91,37 +92,22 @@ COMFY_POLLING_MAX_RETRIES = 1000 COMFY_HOST = "127.0.0.1:8188" -def check_server(url, retries=50, delay=500): - import requests - import time - """ - Check if a server is reachable via HTTP GET request - - Args: - - url (str): The URL to check - - retries (int, optional): The number of times to attempt connecting to the server. Default is 50 - - delay (int, optional): The time in milliseconds to wait between retries. Default is 500 - - Returns: - bool: True if the server is reachable within the given number of retries, otherwise False - """ - +async def check_server(url, retries=50, delay=500): + import aiohttp for i in range(retries): try: - response = requests.get(url) - - # If the response status code is 200, the server is up and running - if response.status_code == 200: - print(f"comfy-modal - API is reachable") - return True - except requests.RequestException as e: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + # If the response status code is 200, the server is up and running + if response.status == 200: + print(f"comfy-modal - API is reachable") + return True + except Exception as e: # If an exception occurs, the server may not be ready pass - # print(f"runpod-worker-comfy - trying") - # Wait for the specified delay before retrying - time.sleep(delay / 1000) + await asyncio.sleep(delay / 1000) print( f"comfy-modal - Failed to connect to server at {url} after {retries} attempts." @@ -129,10 +115,10 @@ def check_server(url, retries=50, delay=500): return False -def check_status(prompt_id): - req = urllib.request.Request( - f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}") - return json.loads(urllib.request.urlopen(req).read()) +async def check_status(prompt_id): + async with aiohttp.ClientSession() as session: + async with session.get(f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}") as response: + return await response.json() class Input(BaseModel): @@ -142,12 +128,12 @@ class Input(BaseModel): file_upload_endpoint: str -def queue_workflow_comfy_deploy(data: Input): +async def queue_workflow_comfy_deploy(data: Input): data_str = data.json() data_bytes = data_str.encode('utf-8') - req = urllib.request.Request( - f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes) - return json.loads(urllib.request.urlopen(req).read()) + async with aiohttp.ClientSession() as session: + async with session.post(f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes) as response: + return await response.json() class RequestInput(BaseModel): @@ -203,11 +189,6 @@ class ComfyDeployRunner: # Make sure that the ComfyUI API is available print(f"comfy-modal - check server") - # command = ["python", "main.py", - # "--disable-auto-launch", "--disable-metadata"] - - # self.server_process = subprocess.Popen(command, cwd="/comfyui") - self.server_process = await asyncio.subprocess.create_subprocess_shell( f"python main.py --disable-auto-launch --disable-metadata", stdout=asyncio.subprocess.PIPE, @@ -216,36 +197,35 @@ class ComfyDeployRunner: # env={**os.environ, "COLUMNS": "10000"} ) - self.stdout_task = asyncio.create_task( + stdout_task = asyncio.create_task( self.read_stream(self.server_process.stdout, False)) - self.stderr_task = asyncio.create_task( + stderr_task = asyncio.create_task( self.read_stream(self.server_process.stderr, True)) - check_server( + await check_server( f"http://{COMFY_HOST}", COMFY_API_AVAILABLE_MAX_RETRIES, COMFY_API_AVAILABLE_INTERVAL_MS, ) + stdout_task.cancel() + stderr_task.cancel() + @exit() async def cleanup(self, exc_type, exc_value, traceback): print(f"comfy-modal - cleanup", exc_type, exc_value, traceback) - self.stderr_task.cancel() - self.stdout_task.cancel() # self.server_process.kill() @method() async def run(self, input: Input): import signal import time - # import asyncio + import asyncio + import aiohttp - self.stderr_task.cancel() - self.stdout_task.cancel() - - self.stdout_task = asyncio.create_task( + stdout_task = asyncio.create_task( self.read_stream(self.server_process.stdout, False)) - self.stderr_task = asyncio.create_task( + stderr_task = asyncio.create_task( self.read_stream(self.server_process.stderr, True)) class TimeoutError(Exception): @@ -272,13 +252,14 @@ class ComfyDeployRunner: "status": "started", "time": datetime.now().isoformat() }).encode('utf-8') - req = urllib.request.Request(input.status_endpoint, data=data, method='POST') - urllib.request.urlopen(req) + async with aiohttp.ClientSession() as session: + async with session.post(input.status_endpoint, data=data) as response: + pass job_input = input try: - queued_workflow = queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow) + queued_workflow = await queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow) prompt_id = queued_workflow["prompt_id"] print(f"comfy-modal - queued workflow with ID {prompt_id}") except Exception as e: @@ -293,14 +274,14 @@ class ComfyDeployRunner: try: print("getting request") while retries < COMFY_POLLING_MAX_RETRIES: - status_result = check_status(prompt_id=prompt_id) + status_result = await check_status(prompt_id=prompt_id) if 'status' in status_result and (status_result['status'] == 'success' or status_result['status'] == 'failed'): status = status_result['status'] print(status) break else: # Wait before trying again - time.sleep(COMFY_POLLING_INTERVAL_MS / 1000) + await asyncio.sleep(COMFY_POLLING_INTERVAL_MS / 1000) retries += 1 else: return {"error": "Max retries reached while waiting for image generation"} @@ -314,19 +295,26 @@ class ComfyDeployRunner: except TimeoutError: print("Operation timed out") return {"status": "failed"} + finally: + signal.alarm(0) - print("uploading log_data") data = json.dumps({ "run_id": input.prompt_id, "time": datetime.now().isoformat(), - "log_data": json.dumps(self.machine_logs) + "log_data": self.machine_logs }).encode('utf-8') print("my logs", len(self.machine_logs)) # Clear logs + async with aiohttp.ClientSession() as session: + async with session.post(input.status_endpoint, data=data) as response: + print("response", response) + print("uploaded log_data") + # print(data) self.machine_logs = [] - req = urllib.request.Request(input.status_endpoint, data=data, method='POST') - urllib.request.urlopen(req) + + stdout_task.cancel() + stderr_task.cancel() return result