This commit is contained in:
nick 2024-10-04 18:49:30 -07:00
commit 8b05ed26c9

View File

@ -443,7 +443,7 @@ async def comfy_deploy_run(request):
# We proxy the request to Comfy Deploy, this is a native run # We proxy the request to Comfy Deploy, this is a native run
if "is_native_run" in data: if "is_native_run" in data:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
pprint(data) # pprint(data)
# headers = request.headers.copy() # headers = request.headers.copy()
# headers['Content-Type'] = 'application/json' # headers['Content-Type'] = 'application/json'
async with session.post( async with session.post(
@ -1137,14 +1137,7 @@ async def send_json_override(self, event, data, sid=None):
] ]
) )
if prompt_id in comfy_message_queues:
comfy_message_queues[prompt_id].put_nowait({"event": event, "data": data})
asyncio.create_task(update_run_ws_event(prompt_id, event, data)) asyncio.create_task(update_run_ws_event(prompt_id, event, data))
# event_emitter.emit("send_json", {
# "event": event,
# "data": data
# })
if event == "execution_start": if event == "execution_start":
await update_run(prompt_id, Status.RUNNING) await update_run(prompt_id, Status.RUNNING)
@ -1189,7 +1182,7 @@ async def send_json_override(self, event, data, sid=None):
return return
prompt_metadata[prompt_id].last_updated_node = node prompt_metadata[prompt_id].last_updated_node = node
class_type = prompt_metadata[prompt_id].workflow_api[node]["class_type"] class_type = prompt_metadata[prompt_id].workflow_api[node]["class_type"]
logger.info(f"At: {calculated_progress * 100}% - {class_type}") logger.info(f"At: {round(calculated_progress * 100)}% - {class_type}")
await send( await send(
"live_status", "live_status",
{ {
@ -1224,7 +1217,6 @@ async def send_json_override(self, event, data, sid=None):
if prompt_id in prompt_metadata: if prompt_id in prompt_metadata:
node = data.get("node") node = data.get("node")
class_type = prompt_metadata[prompt_id].workflow_api[node]["class_type"] class_type = prompt_metadata[prompt_id].workflow_api[node]["class_type"]
logger.info(f"Executed {class_type} {data}")
node_meta = { node_meta = {
"node_id": node, "node_id": node,
"node_class": class_type, "node_class": class_type,
@ -1232,14 +1224,18 @@ async def send_json_override(self, event, data, sid=None):
if class_type == "PreviewImage": if class_type == "PreviewImage":
logger.info("Skipping preview image") logger.info("Skipping preview image")
return return
await update_run_with_output(
prompt_id,
data.get("output"),
node_id=data.get("node"),
node_meta=node_meta,
)
logger.info(f"Executed {class_type} {data}")
else: else:
logger.info(f"Executed {data}") logger.info(f"Executed {data}")
await update_run_with_output( if prompt_id in comfy_message_queues:
prompt_id, data.get("output"), node_id=data.get("node"), node_meta=node_meta comfy_message_queues[prompt_id].put_nowait({"event": event, "data": data})
)
# await update_run_with_output(prompt_id, data.get('output'), node_id=data.get('node'))
# update_run_with_output(prompt_id, data.get('output'))
# Global variable to keep track of the last read line number # Global variable to keep track of the last read line number
@ -1425,7 +1421,7 @@ async def upload_with_retry(
response.raise_for_status() # This will raise an exception for 4xx and 5xx status codes response.raise_for_status() # This will raise an exception for 4xx and 5xx status codes
response_text = await response.text() response_text = await response.text()
logger.info(f"Response body: {response_text[:1000]}...") # logger.info(f"Response body: {response_text[:1000]}...")
logger.info("Upload successful") logger.info("Upload successful")
return response # Successful upload, exit the retry loop return response # Successful upload, exit the retry loop
@ -1493,7 +1489,7 @@ async def upload_file(
target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2" target_url = f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}&version=v2"
start_time = time.time() # Start timing here start_time = time.time() # Start timing here
logger.info(f"Target URL: {target_url}") # logger.info(f"Target URL: {target_url}")
result = await async_request_with_retry( result = await async_request_with_retry(
"GET", target_url, disable_timeout=True, token=token "GET", target_url, disable_timeout=True, token=token
) )
@ -1505,7 +1501,7 @@ async def upload_file(
) )
ok = await result.json() ok = await result.json()
logger.info(f"Result: {ok}") # logger.info(f"Result: {ok}")
async with aiofiles.open(file, "rb") as f: async with aiofiles.open(file, "rb") as f:
data = await f.read() data = await f.read()
@ -1518,7 +1514,7 @@ async def upload_file(
"Content-Length": size, "Content-Length": size,
} }
logger.info(headers) # logger.info(headers)
if ok.get("include_acl") is True: if ok.get("include_acl") is True:
headers["x-amz-acl"] = "public-read" headers["x-amz-acl"] = "public-read"
@ -1548,6 +1544,8 @@ async def upload_file(
if ok.get("is_public") is not None: if ok.get("is_public") is not None:
item["is_public"] = ok.get("is_public") item["is_public"] = ok.get("is_public")
return item
def have_pending_upload(prompt_id): def have_pending_upload(prompt_id):
if ( if (
@ -1758,7 +1756,7 @@ async def update_run_with_output(
"node_meta": node_meta, "node_meta": node_meta,
"gpu_event_id": gpu_event_id, "gpu_event_id": gpu_event_id,
} }
pprint(body) # pprint(body)
have_upload_media = False have_upload_media = False
if data is not None: if data is not None:
have_upload_media = ( have_upload_media = (