diff --git a/custom_routes.py b/custom_routes.py index 0400ce5..25542ca 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -94,19 +94,24 @@ async def comfy_deploy_run(request): "prompt_id": prompt_id } + prompt_metadata[prompt_id] = { + 'status_endpoint': data.get('status_endpoint'), + 'file_upload_endpoint': data.get('file_upload_endpoint'), + } + try: res = post_prompt(prompt) except Exception as e: error_type = type(e).__name__ - stack_trace = traceback.format_exc().strip().split('\n')[-2] + stack_trace_short = traceback.format_exc().strip().split('\n')[-2] + stack_trace = traceback.format_exc().strip() print(f"error: {error_type}, {e}") - print(f"stack trace: {stack_trace}") - return web.Response(status=500, reason=f"{error_type}: {e}, {stack_trace}") - - prompt_metadata[res['prompt_id']] = { - 'status_endpoint': data.get('status_endpoint'), - 'file_upload_endpoint': data.get('file_upload_endpoint'), - } + print(f"stack trace: {stack_trace_short}") + asyncio.create_task(update_run_with_output(prompt_id, { + "error_type": error_type, + "stack_trace": stack_trace + })) + return web.Response(status=500, reason=f"{error_type}: {e}, {stack_trace_short}") status = 200 if "error" in res: diff --git a/prestartup_script.py b/prestartup_script.py index bad440e..5e6ea3b 100644 --- a/prestartup_script.py +++ b/prestartup_script.py @@ -7,33 +7,42 @@ import threading import logging from logging.handlers import RotatingFileHandler -class StreamToLogger(object): - def __init__(self, original, logger, log_level): - self.original_stdout = original - self.logger = logger - self.log_level = log_level - - def write(self, buf): - self.original_stdout.write(buf) - self.original_stdout.flush() - for line in buf.rstrip().splitlines(): - self.logger.log(self.log_level, line.rstrip()) - - def flush(self): - self.original_stdout.flush() - - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - -# Create a handler that rotates log files every 500KB handler = RotatingFileHandler('comfy-deploy.log', maxBytes=500000, backupCount=5) -logger.addHandler(handler) -# Store original streams original_stdout = sys.stdout original_stderr = sys.stderr +class StreamToLogger(): + def __init__(self, log_level): + self.log_level = log_level + + def write(self, buf): + if (self.log_level == logging.INFO): + original_stdout.write(buf) + original_stdout.flush() + elif (self.log_level == logging.ERROR): + original_stderr.write(buf) + original_stderr.flush() + + for line in buf.rstrip().splitlines(): + handler.handle( + logging.LogRecord( + name="comfy-deploy", + level=self.log_level, + pathname="prestartup_script.py", + lineno=1, + msg=line.rstrip(), + args=None, + exc_info=None + ) + ) + + def flush(self): + if (self.log_level == logging.INFO): + original_stdout.flush() + elif (self.log_level == logging.ERROR): + original_stderr.flush() + # Redirect stdout and stderr to the logger -sys.stdout = StreamToLogger(original_stdout, logger, logging.INFO) -sys.stderr = StreamToLogger(original_stderr, logger, logging.ERROR) \ No newline at end of file +sys.stdout = StreamToLogger(logging.INFO) +sys.stderr = StreamToLogger(logging.ERROR) \ No newline at end of file diff --git a/web/src/server/createRun.ts b/web/src/server/createRun.ts index 2158054..76826f6 100644 --- a/web/src/server/createRun.ts +++ b/web/src/server/createRun.ts @@ -58,49 +58,7 @@ export const createRun = withServerPromise( file_upload_endpoint: `${origin}/api/file-upload`, }; - switch (machine.type) { - case "runpod-serverless": - prompt_id = v4(); - const data = { - input: { - ...shareData, - prompt_id: prompt_id, - }, - }; - console.log(data); - - if (!machine.auth_token) { - throw new Error("Machine auth token not found"); - } - - const __result = await fetch(`${machine.endpoint}/run`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${machine.auth_token}`, - }, - body: JSON.stringify(data), - cache: "no-store", - }); - console.log(__result); - if (!__result.ok) - throw new Error(`Error creating run, ${__result.statusText}`); - console.log(data, __result); - break; - case "classic": - const body = shareData; - const comfyui_endpoint = `${machine.endpoint}/comfyui-deploy/run`; - const _result = await fetch(comfyui_endpoint, { - method: "POST", - body: JSON.stringify(body), - cache: "no-store", - }); - if (!_result.ok) - throw new Error(`Error creating run, ${_result.statusText}`); - const result = await ComfyAPI_Run.parseAsync(await _result.json()); - prompt_id = result.prompt_id; - break; - } + prompt_id = v4(); // Add to our db const workflow_run = await db @@ -117,6 +75,76 @@ export const createRun = withServerPromise( revalidatePath(`/${workflow_version_data.workflow_id}`); + try { + switch (machine.type) { + case "runpod-serverless": + const data = { + input: { + ...shareData, + prompt_id: prompt_id, + }, + }; + + if ( + !machine.auth_token && + !machine.endpoint.includes("localhost") && + !machine.endpoint.includes("127.0.0.1") + ) { + throw new Error("Machine auth token not found"); + } + + const __result = await fetch(`${machine.endpoint}/run`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${machine.auth_token}`, + }, + body: JSON.stringify(data), + cache: "no-store", + }); + console.log(__result); + if (!__result.ok) + throw new Error(`Error creating run, ${__result.statusText}`); + console.log(data, __result); + break; + case "classic": + const body = { + ...shareData, + prompt_id: prompt_id, + }; + // console.log(body); + const comfyui_endpoint = `${machine.endpoint}/comfyui-deploy/run`; + const _result = await fetch(comfyui_endpoint, { + method: "POST", + body: JSON.stringify(body), + cache: "no-store", + }); + // console.log(_result); + + if (!_result.ok) { + let message = `Error creating run, ${_result.statusText}`; + try { + const result = await ComfyAPI_Run.parseAsync( + await _result.json() + ); + message += ` ${result.node_errors}`; + } catch (error) {} + throw new Error(message); + } + // prompt_id = result.prompt_id; + break; + } + } catch (e) { + console.error(e); + await db + .update(workflowRunsTable) + .set({ + status: "failed", + }) + .where(eq(workflowRunsTable.id, workflow_run[0].id)); + throw e; + } + return { workflow_run_id: workflow_run[0].id, message: "Successful workflow run",