fix: first run error will be logged, log rotation handling revamped, prompt id will be generated first.

This commit is contained in:
BennyKok 2023-12-28 22:48:23 +08:00
parent 8fe38fde3d
commit 61894c0b46
3 changed files with 117 additions and 75 deletions

View File

@ -94,19 +94,24 @@ async def comfy_deploy_run(request):
"prompt_id": prompt_id "prompt_id": prompt_id
} }
prompt_metadata[prompt_id] = {
'status_endpoint': data.get('status_endpoint'),
'file_upload_endpoint': data.get('file_upload_endpoint'),
}
try: try:
res = post_prompt(prompt) res = post_prompt(prompt)
except Exception as e: except Exception as e:
error_type = type(e).__name__ error_type = type(e).__name__
stack_trace = traceback.format_exc().strip().split('\n')[-2] stack_trace_short = traceback.format_exc().strip().split('\n')[-2]
stack_trace = traceback.format_exc().strip()
print(f"error: {error_type}, {e}") print(f"error: {error_type}, {e}")
print(f"stack trace: {stack_trace}") print(f"stack trace: {stack_trace_short}")
return web.Response(status=500, reason=f"{error_type}: {e}, {stack_trace}") asyncio.create_task(update_run_with_output(prompt_id, {
"error_type": error_type,
prompt_metadata[res['prompt_id']] = { "stack_trace": stack_trace
'status_endpoint': data.get('status_endpoint'), }))
'file_upload_endpoint': data.get('file_upload_endpoint'), return web.Response(status=500, reason=f"{error_type}: {e}, {stack_trace_short}")
}
status = 200 status = 200
if "error" in res: if "error" in res:

View File

@ -7,33 +7,42 @@ import threading
import logging import logging
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
class StreamToLogger(object):
def __init__(self, original, logger, log_level):
self.original_stdout = original
self.logger = logger
self.log_level = log_level
def write(self, buf):
self.original_stdout.write(buf)
self.original_stdout.flush()
for line in buf.rstrip().splitlines():
self.logger.log(self.log_level, line.rstrip())
def flush(self):
self.original_stdout.flush()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Create a handler that rotates log files every 500KB
handler = RotatingFileHandler('comfy-deploy.log', maxBytes=500000, backupCount=5) handler = RotatingFileHandler('comfy-deploy.log', maxBytes=500000, backupCount=5)
logger.addHandler(handler)
# Store original streams
original_stdout = sys.stdout original_stdout = sys.stdout
original_stderr = sys.stderr original_stderr = sys.stderr
class StreamToLogger():
def __init__(self, log_level):
self.log_level = log_level
def write(self, buf):
if (self.log_level == logging.INFO):
original_stdout.write(buf)
original_stdout.flush()
elif (self.log_level == logging.ERROR):
original_stderr.write(buf)
original_stderr.flush()
for line in buf.rstrip().splitlines():
handler.handle(
logging.LogRecord(
name="comfy-deploy",
level=self.log_level,
pathname="prestartup_script.py",
lineno=1,
msg=line.rstrip(),
args=None,
exc_info=None
)
)
def flush(self):
if (self.log_level == logging.INFO):
original_stdout.flush()
elif (self.log_level == logging.ERROR):
original_stderr.flush()
# Redirect stdout and stderr to the logger # Redirect stdout and stderr to the logger
sys.stdout = StreamToLogger(original_stdout, logger, logging.INFO) sys.stdout = StreamToLogger(logging.INFO)
sys.stderr = StreamToLogger(original_stderr, logger, logging.ERROR) sys.stderr = StreamToLogger(logging.ERROR)

View File

@ -58,49 +58,7 @@ export const createRun = withServerPromise(
file_upload_endpoint: `${origin}/api/file-upload`, file_upload_endpoint: `${origin}/api/file-upload`,
}; };
switch (machine.type) { prompt_id = v4();
case "runpod-serverless":
prompt_id = v4();
const data = {
input: {
...shareData,
prompt_id: prompt_id,
},
};
console.log(data);
if (!machine.auth_token) {
throw new Error("Machine auth token not found");
}
const __result = await fetch(`${machine.endpoint}/run`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${machine.auth_token}`,
},
body: JSON.stringify(data),
cache: "no-store",
});
console.log(__result);
if (!__result.ok)
throw new Error(`Error creating run, ${__result.statusText}`);
console.log(data, __result);
break;
case "classic":
const body = shareData;
const comfyui_endpoint = `${machine.endpoint}/comfyui-deploy/run`;
const _result = await fetch(comfyui_endpoint, {
method: "POST",
body: JSON.stringify(body),
cache: "no-store",
});
if (!_result.ok)
throw new Error(`Error creating run, ${_result.statusText}`);
const result = await ComfyAPI_Run.parseAsync(await _result.json());
prompt_id = result.prompt_id;
break;
}
// Add to our db // Add to our db
const workflow_run = await db const workflow_run = await db
@ -117,6 +75,76 @@ export const createRun = withServerPromise(
revalidatePath(`/${workflow_version_data.workflow_id}`); revalidatePath(`/${workflow_version_data.workflow_id}`);
try {
switch (machine.type) {
case "runpod-serverless":
const data = {
input: {
...shareData,
prompt_id: prompt_id,
},
};
if (
!machine.auth_token &&
!machine.endpoint.includes("localhost") &&
!machine.endpoint.includes("127.0.0.1")
) {
throw new Error("Machine auth token not found");
}
const __result = await fetch(`${machine.endpoint}/run`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${machine.auth_token}`,
},
body: JSON.stringify(data),
cache: "no-store",
});
console.log(__result);
if (!__result.ok)
throw new Error(`Error creating run, ${__result.statusText}`);
console.log(data, __result);
break;
case "classic":
const body = {
...shareData,
prompt_id: prompt_id,
};
// console.log(body);
const comfyui_endpoint = `${machine.endpoint}/comfyui-deploy/run`;
const _result = await fetch(comfyui_endpoint, {
method: "POST",
body: JSON.stringify(body),
cache: "no-store",
});
// console.log(_result);
if (!_result.ok) {
let message = `Error creating run, ${_result.statusText}`;
try {
const result = await ComfyAPI_Run.parseAsync(
await _result.json()
);
message += ` ${result.node_errors}`;
} catch (error) {}
throw new Error(message);
}
// prompt_id = result.prompt_id;
break;
}
} catch (e) {
console.error(e);
await db
.update(workflowRunsTable)
.set({
status: "failed",
})
.where(eq(workflowRunsTable.id, workflow_run[0].id));
throw e;
}
return { return {
workflow_run_id: workflow_run[0].id, workflow_run_id: workflow_run[0].id,
message: "Successful workflow run", message: "Successful workflow run",