fix: modal logging with complete async io rewrite
This commit is contained in:
parent
7ae25aa162
commit
2d033570f4
@ -9,6 +9,7 @@ from fastapi import FastAPI, Request, HTTPException
|
|||||||
from fastapi.responses import HTMLResponse
|
from fastapi.responses import HTMLResponse
|
||||||
from volume_setup import volumes
|
from volume_setup import volumes
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
import aiohttp
|
||||||
# deploy_test = False
|
# deploy_test = False
|
||||||
|
|
||||||
import os
|
import os
|
||||||
@ -91,37 +92,22 @@ COMFY_POLLING_MAX_RETRIES = 1000
|
|||||||
COMFY_HOST = "127.0.0.1:8188"
|
COMFY_HOST = "127.0.0.1:8188"
|
||||||
|
|
||||||
|
|
||||||
def check_server(url, retries=50, delay=500):
|
async def check_server(url, retries=50, delay=500):
|
||||||
import requests
|
import aiohttp
|
||||||
import time
|
|
||||||
"""
|
|
||||||
Check if a server is reachable via HTTP GET request
|
|
||||||
|
|
||||||
Args:
|
|
||||||
- url (str): The URL to check
|
|
||||||
- retries (int, optional): The number of times to attempt connecting to the server. Default is 50
|
|
||||||
- delay (int, optional): The time in milliseconds to wait between retries. Default is 500
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: True if the server is reachable within the given number of retries, otherwise False
|
|
||||||
"""
|
|
||||||
|
|
||||||
for i in range(retries):
|
for i in range(retries):
|
||||||
try:
|
try:
|
||||||
response = requests.get(url)
|
async with aiohttp.ClientSession() as session:
|
||||||
|
async with session.get(url) as response:
|
||||||
# If the response status code is 200, the server is up and running
|
# If the response status code is 200, the server is up and running
|
||||||
if response.status_code == 200:
|
if response.status == 200:
|
||||||
print(f"comfy-modal - API is reachable")
|
print(f"comfy-modal - API is reachable")
|
||||||
return True
|
return True
|
||||||
except requests.RequestException as e:
|
except Exception as e:
|
||||||
# If an exception occurs, the server may not be ready
|
# If an exception occurs, the server may not be ready
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# print(f"runpod-worker-comfy - trying")
|
|
||||||
|
|
||||||
# Wait for the specified delay before retrying
|
# Wait for the specified delay before retrying
|
||||||
time.sleep(delay / 1000)
|
await asyncio.sleep(delay / 1000)
|
||||||
|
|
||||||
print(
|
print(
|
||||||
f"comfy-modal - Failed to connect to server at {url} after {retries} attempts."
|
f"comfy-modal - Failed to connect to server at {url} after {retries} attempts."
|
||||||
@ -129,10 +115,10 @@ def check_server(url, retries=50, delay=500):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def check_status(prompt_id):
|
async def check_status(prompt_id):
|
||||||
req = urllib.request.Request(
|
async with aiohttp.ClientSession() as session:
|
||||||
f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}")
|
async with session.get(f"http://{COMFY_HOST}/comfyui-deploy/check-status?prompt_id={prompt_id}") as response:
|
||||||
return json.loads(urllib.request.urlopen(req).read())
|
return await response.json()
|
||||||
|
|
||||||
|
|
||||||
class Input(BaseModel):
|
class Input(BaseModel):
|
||||||
@ -142,12 +128,12 @@ class Input(BaseModel):
|
|||||||
file_upload_endpoint: str
|
file_upload_endpoint: str
|
||||||
|
|
||||||
|
|
||||||
def queue_workflow_comfy_deploy(data: Input):
|
async def queue_workflow_comfy_deploy(data: Input):
|
||||||
data_str = data.json()
|
data_str = data.json()
|
||||||
data_bytes = data_str.encode('utf-8')
|
data_bytes = data_str.encode('utf-8')
|
||||||
req = urllib.request.Request(
|
async with aiohttp.ClientSession() as session:
|
||||||
f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes)
|
async with session.post(f"http://{COMFY_HOST}/comfyui-deploy/run", data=data_bytes) as response:
|
||||||
return json.loads(urllib.request.urlopen(req).read())
|
return await response.json()
|
||||||
|
|
||||||
|
|
||||||
class RequestInput(BaseModel):
|
class RequestInput(BaseModel):
|
||||||
@ -203,11 +189,6 @@ class ComfyDeployRunner:
|
|||||||
# Make sure that the ComfyUI API is available
|
# Make sure that the ComfyUI API is available
|
||||||
print(f"comfy-modal - check server")
|
print(f"comfy-modal - check server")
|
||||||
|
|
||||||
# command = ["python", "main.py",
|
|
||||||
# "--disable-auto-launch", "--disable-metadata"]
|
|
||||||
|
|
||||||
# self.server_process = subprocess.Popen(command, cwd="/comfyui")
|
|
||||||
|
|
||||||
self.server_process = await asyncio.subprocess.create_subprocess_shell(
|
self.server_process = await asyncio.subprocess.create_subprocess_shell(
|
||||||
f"python main.py --disable-auto-launch --disable-metadata",
|
f"python main.py --disable-auto-launch --disable-metadata",
|
||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
@ -216,36 +197,35 @@ class ComfyDeployRunner:
|
|||||||
# env={**os.environ, "COLUMNS": "10000"}
|
# env={**os.environ, "COLUMNS": "10000"}
|
||||||
)
|
)
|
||||||
|
|
||||||
self.stdout_task = asyncio.create_task(
|
stdout_task = asyncio.create_task(
|
||||||
self.read_stream(self.server_process.stdout, False))
|
self.read_stream(self.server_process.stdout, False))
|
||||||
self.stderr_task = asyncio.create_task(
|
stderr_task = asyncio.create_task(
|
||||||
self.read_stream(self.server_process.stderr, True))
|
self.read_stream(self.server_process.stderr, True))
|
||||||
|
|
||||||
check_server(
|
await check_server(
|
||||||
f"http://{COMFY_HOST}",
|
f"http://{COMFY_HOST}",
|
||||||
COMFY_API_AVAILABLE_MAX_RETRIES,
|
COMFY_API_AVAILABLE_MAX_RETRIES,
|
||||||
COMFY_API_AVAILABLE_INTERVAL_MS,
|
COMFY_API_AVAILABLE_INTERVAL_MS,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
stdout_task.cancel()
|
||||||
|
stderr_task.cancel()
|
||||||
|
|
||||||
@exit()
|
@exit()
|
||||||
async def cleanup(self, exc_type, exc_value, traceback):
|
async def cleanup(self, exc_type, exc_value, traceback):
|
||||||
print(f"comfy-modal - cleanup", exc_type, exc_value, traceback)
|
print(f"comfy-modal - cleanup", exc_type, exc_value, traceback)
|
||||||
self.stderr_task.cancel()
|
|
||||||
self.stdout_task.cancel()
|
|
||||||
# self.server_process.kill()
|
# self.server_process.kill()
|
||||||
|
|
||||||
@method()
|
@method()
|
||||||
async def run(self, input: Input):
|
async def run(self, input: Input):
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
# import asyncio
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
|
|
||||||
self.stderr_task.cancel()
|
stdout_task = asyncio.create_task(
|
||||||
self.stdout_task.cancel()
|
|
||||||
|
|
||||||
self.stdout_task = asyncio.create_task(
|
|
||||||
self.read_stream(self.server_process.stdout, False))
|
self.read_stream(self.server_process.stdout, False))
|
||||||
self.stderr_task = asyncio.create_task(
|
stderr_task = asyncio.create_task(
|
||||||
self.read_stream(self.server_process.stderr, True))
|
self.read_stream(self.server_process.stderr, True))
|
||||||
|
|
||||||
class TimeoutError(Exception):
|
class TimeoutError(Exception):
|
||||||
@ -272,13 +252,14 @@ class ComfyDeployRunner:
|
|||||||
"status": "started",
|
"status": "started",
|
||||||
"time": datetime.now().isoformat()
|
"time": datetime.now().isoformat()
|
||||||
}).encode('utf-8')
|
}).encode('utf-8')
|
||||||
req = urllib.request.Request(input.status_endpoint, data=data, method='POST')
|
async with aiohttp.ClientSession() as session:
|
||||||
urllib.request.urlopen(req)
|
async with session.post(input.status_endpoint, data=data) as response:
|
||||||
|
pass
|
||||||
|
|
||||||
job_input = input
|
job_input = input
|
||||||
|
|
||||||
try:
|
try:
|
||||||
queued_workflow = queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow)
|
queued_workflow = await queue_workflow_comfy_deploy(job_input) # queue_workflow(workflow)
|
||||||
prompt_id = queued_workflow["prompt_id"]
|
prompt_id = queued_workflow["prompt_id"]
|
||||||
print(f"comfy-modal - queued workflow with ID {prompt_id}")
|
print(f"comfy-modal - queued workflow with ID {prompt_id}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -293,14 +274,14 @@ class ComfyDeployRunner:
|
|||||||
try:
|
try:
|
||||||
print("getting request")
|
print("getting request")
|
||||||
while retries < COMFY_POLLING_MAX_RETRIES:
|
while retries < COMFY_POLLING_MAX_RETRIES:
|
||||||
status_result = check_status(prompt_id=prompt_id)
|
status_result = await check_status(prompt_id=prompt_id)
|
||||||
if 'status' in status_result and (status_result['status'] == 'success' or status_result['status'] == 'failed'):
|
if 'status' in status_result and (status_result['status'] == 'success' or status_result['status'] == 'failed'):
|
||||||
status = status_result['status']
|
status = status_result['status']
|
||||||
print(status)
|
print(status)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
# Wait before trying again
|
# Wait before trying again
|
||||||
time.sleep(COMFY_POLLING_INTERVAL_MS / 1000)
|
await asyncio.sleep(COMFY_POLLING_INTERVAL_MS / 1000)
|
||||||
retries += 1
|
retries += 1
|
||||||
else:
|
else:
|
||||||
return {"error": "Max retries reached while waiting for image generation"}
|
return {"error": "Max retries reached while waiting for image generation"}
|
||||||
@ -314,19 +295,26 @@ class ComfyDeployRunner:
|
|||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
print("Operation timed out")
|
print("Operation timed out")
|
||||||
return {"status": "failed"}
|
return {"status": "failed"}
|
||||||
|
finally:
|
||||||
|
signal.alarm(0)
|
||||||
|
|
||||||
print("uploading log_data")
|
print("uploading log_data")
|
||||||
data = json.dumps({
|
data = json.dumps({
|
||||||
"run_id": input.prompt_id,
|
"run_id": input.prompt_id,
|
||||||
"time": datetime.now().isoformat(),
|
"time": datetime.now().isoformat(),
|
||||||
"log_data": json.dumps(self.machine_logs)
|
"log_data": self.machine_logs
|
||||||
}).encode('utf-8')
|
}).encode('utf-8')
|
||||||
print("my logs", len(self.machine_logs))
|
print("my logs", len(self.machine_logs))
|
||||||
# Clear logs
|
# Clear logs
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
async with session.post(input.status_endpoint, data=data) as response:
|
||||||
|
print("response", response)
|
||||||
|
print("uploaded log_data")
|
||||||
|
# print(data)
|
||||||
self.machine_logs = []
|
self.machine_logs = []
|
||||||
req = urllib.request.Request(input.status_endpoint, data=data, method='POST')
|
|
||||||
urllib.request.urlopen(req)
|
stdout_task.cancel()
|
||||||
|
stderr_task.cancel()
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user