Compare commits

No commits in common. "main" and "benny/log-sync" have entirely different histories.

main...benny/log-sync

.gitignore (3 changes, vendored)
@@ -1,3 +1,2 @@
__pycache__
.DS_Store
file-hash-cache.json
.DS_Store

builder/modal-builder/src/main.py (new file, 504 lines)
@@ -0,0 +1,504 @@
from typing import Union, Optional, Dict, List
from pydantic import BaseModel, Field, field_validator
from fastapi import FastAPI, HTTPException, WebSocket, BackgroundTasks, WebSocketDisconnect
from fastapi.responses import JSONResponse
from fastapi.logger import logger as fastapi_logger
import os
from enum import Enum
import json
import subprocess
import time
from contextlib import asynccontextmanager
import asyncio
import threading
import signal
import logging
from fastapi.logger import logger as fastapi_logger
import requests
from urllib.parse import parse_qs
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp, Scope, Receive, Send

from concurrent.futures import ThreadPoolExecutor

# executor = ThreadPoolExecutor(max_workers=5)

gunicorn_error_logger = logging.getLogger("gunicorn.error")
gunicorn_logger = logging.getLogger("gunicorn")
uvicorn_access_logger = logging.getLogger("uvicorn.access")
uvicorn_access_logger.handlers = gunicorn_error_logger.handlers

fastapi_logger.handlers = gunicorn_error_logger.handlers

if __name__ != "__main__":
    fastapi_logger.setLevel(gunicorn_logger.level)
else:
    fastapi_logger.setLevel(logging.DEBUG)

logger = logging.getLogger("uvicorn")
logger.setLevel(logging.INFO)

last_activity_time = time.time()
global_timeout = 60 * 4

machine_id_websocket_dict = {}
machine_id_status = {}

fly_instance_id = os.environ.get('FLY_ALLOC_ID', 'local').split('-')[0]


class FlyReplayMiddleware(BaseHTTPMiddleware):
    """
    If the wrong instance was picked by the fly.io load balancer we use the fly-replay header
    to repeat the request again on the right instance.

    This only works if the right instance is provided as a query_string parameter.
    """

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        query_string = scope.get('query_string', b'').decode()
        query_params = parse_qs(query_string)
        target_instance = query_params.get(
            'fly_instance_id', [fly_instance_id])[0]

        async def send_wrapper(message):
            if target_instance != fly_instance_id:
                if message['type'] == 'websocket.close' and 'Invalid session' in message['reason']:
                    # fly.io only seems to look at the fly-replay header if websocket is accepted
                    message = {'type': 'websocket.accept'}
                if 'headers' not in message:
                    message['headers'] = []
                message['headers'].append(
                    [b'fly-replay', f'instance={target_instance}'.encode()])
            await send(message)
        await self.app(scope, receive, send_wrapper)


async def check_inactivity():
    global last_activity_time
    while True:
        # logger.info("Checking inactivity...")
        if time.time() - last_activity_time > global_timeout:
            if len(machine_id_status) == 0:
                # The application has been inactive for more than 60 seconds.
                # Scale it down to zero here.
                logger.info(
                    f"No activity for {global_timeout} seconds, exiting...")
                # os._exit(0)
                os.kill(os.getpid(), signal.SIGINT)
                break
            else:
                pass
                # logger.info(f"Timeout but still in progress")

        await asyncio.sleep(1)  # Check every second


@asynccontextmanager
async def lifespan(app: FastAPI):
    thread = run_in_new_thread(check_inactivity())
    yield
    logger.info("Cancelling")

#
app = FastAPI(lifespan=lifespan)
app.add_middleware(FlyReplayMiddleware)
# MODAL_ORG = os.environ.get("MODAL_ORG")


@app.get("/")
def read_root():
    global last_activity_time
    last_activity_time = time.time()
    logger.info(f"Extended inactivity time to {global_timeout}")
    return {"Hello": "World"}

# create a post route called /create takes in a json of example
# {
#     name: "my first image",
#     deps: {
#         "comfyui": "d0165d819afe76bd4e6bdd710eb5f3e571b6a804",
#         "git_custom_nodes": {
#             "https://github.com/cubiq/ComfyUI_IPAdapter_plus": {
#                 "hash": "2ca0c6dd0b2ad64b1c480828638914a564331dcd",
#                 "disabled": true
#             },
#             "https://github.com/ltdrdata/ComfyUI-Manager.git": {
#                 "hash": "9c86f62b912f4625fe2b929c7fc61deb9d16f6d3",
#                 "disabled": false
#             },
#         },
#         "file_custom_nodes": []
#     }
# }


class GitCustomNodes(BaseModel):
    hash: str
    disabled: bool

class FileCustomNodes(BaseModel):
    filename: str
    disabled: bool


class Snapshot(BaseModel):
    comfyui: str
    git_custom_nodes: Dict[str, GitCustomNodes]
    file_custom_nodes: List[FileCustomNodes]

class Model(BaseModel):
    name: str
    type: str
    base: str
    save_path: str
    description: str
    reference: str
    filename: str
    url: str


class GPUType(str, Enum):
    T4 = "T4"
    A10G = "A10G"
    A100 = "A100"
    L4 = "L4"


class Item(BaseModel):
    machine_id: str
    name: str
    snapshot: Snapshot
    models: List[Model]
    callback_url: str
    gpu: GPUType = Field(default=GPUType.T4)

    @field_validator('gpu')
    @classmethod
    def check_gpu(cls, value):
        if value not in GPUType.__members__:
            raise ValueError(
                f"Invalid GPU option. Choose from: {', '.join(GPUType.__members__.keys())}")
        return GPUType(value)


@app.websocket("/ws/{machine_id}")
async def websocket_endpoint(websocket: WebSocket, machine_id: str):
    await websocket.accept()
    machine_id_websocket_dict[machine_id] = websocket
    # Send existing logs
    if machine_id in machine_logs_cache:
        combined_logs = "\n".join(
            log_entry['logs'] for log_entry in machine_logs_cache[machine_id])
        await websocket.send_text(json.dumps({"event": "LOGS", "data": {
            "machine_id": machine_id,
            "logs": combined_logs,
            "timestamp": time.time()
        }}))
    try:
        while True:
            data = await websocket.receive_text()
            global last_activity_time
            last_activity_time = time.time()
            logger.info(f"Extended inactivity time to {global_timeout}")
            # You can handle received messages here if needed
    except WebSocketDisconnect:
        if machine_id in machine_id_websocket_dict:
            machine_id_websocket_dict.pop(machine_id)

# @app.get("/test")
# async def test():
#     machine_id_status["123"] = True
#     global last_activity_time
#     last_activity_time = time.time()
#     logger.info(f"Extended inactivity time to {global_timeout}")

#     await asyncio.sleep(10)

#     machine_id_status["123"] = False
#     machine_id_status.pop("123")

#     return {"Hello": "World"}


@app.post("/create")
async def create_machine(item: Item):
    global last_activity_time
    last_activity_time = time.time()
    logger.info(f"Extended inactivity time to {global_timeout}")

    if item.machine_id in machine_id_status and machine_id_status[item.machine_id]:
        return JSONResponse(status_code=400, content={"error": "Build already in progress."})

    # Run the building logic in a separate thread
    # future = executor.submit(build_logic, item)
    task = asyncio.create_task(build_logic(item))

    return JSONResponse(status_code=200, content={"message": "Build Queued", "build_machine_instance_id": fly_instance_id})


class StopAppItem(BaseModel):
    machine_id: str


def find_app_id(app_list, app_name):
    for app in app_list:
        if app['Name'] == app_name:
            return app['App ID']
    return None

@app.post("/stop-app")
async def stop_app(item: StopAppItem):
    # cmd = f"modal app list | grep {item.machine_id} | awk -F '│' '{{print $2}}'"
    cmd = f"modal app list --json"

    env = os.environ.copy()
    env["COLUMNS"] = "10000"  # Set the width to a large value
    find_id_process = await asyncio.subprocess.create_subprocess_shell(cmd,
                                                                       stdout=asyncio.subprocess.PIPE,
                                                                       stderr=asyncio.subprocess.PIPE,
                                                                       env=env)
    await find_id_process.wait()

    stdout, stderr = await find_id_process.communicate()
    if stdout:
        app_id = stdout.decode().strip()
        app_list = json.loads(app_id)
        app_id = find_app_id(app_list, item.machine_id)
        logger.info(f"cp_process stdout: {app_id}")
    if stderr:
        logger.info(f"cp_process stderr: {stderr.decode()}")

    cp_process = await asyncio.subprocess.create_subprocess_exec("modal", "app", "stop", app_id,
                                                                 stdout=asyncio.subprocess.PIPE,
                                                                 stderr=asyncio.subprocess.PIPE,)
    await cp_process.wait()
    logger.info(f"Stopping app {item.machine_id}")
    stdout, stderr = await cp_process.communicate()
    if stdout:
        logger.info(f"cp_process stdout: {stdout.decode()}")
    if stderr:
        logger.info(f"cp_process stderr: {stderr.decode()}")

    if cp_process.returncode == 0:
        return JSONResponse(status_code=200, content={"status": "success"})
    else:
        return JSONResponse(status_code=500, content={"status": "error", "error": stderr.decode()})

# Initialize the logs cache
machine_logs_cache = {}


async def build_logic(item: Item):
    # Deploy to modal
    folder_path = f"/app/builds/{item.machine_id}"
    machine_id_status[item.machine_id] = True

    # Ensure the os path is same as the current directory
    # os.chdir(os.path.dirname(os.path.realpath(__file__)))
    # print(
    #     f"builder - Current working directory: {os.getcwd()}"
    # )

    # Copy the app template
    # os.system(f"cp -r template {folder_path}")
    cp_process = await asyncio.subprocess.create_subprocess_exec("cp", "-r", "/app/src/template", folder_path)
    await cp_process.wait()

    # Write the config file
    config = {
        "name": item.name,
        "deploy_test": os.environ.get("DEPLOY_TEST_FLAG", "False"),
        "gpu": item.gpu,
        "civitai_token": os.environ.get("CIVITAI_TOKEN", "")
    }
    with open(f"{folder_path}/config.py", "w") as f:
        f.write("config = " + json.dumps(config))

    with open(f"{folder_path}/data/snapshot.json", "w") as f:
        f.write(item.snapshot.json())

    with open(f"{folder_path}/data/models.json", "w") as f:
        models_json_list = [model.dict() for model in item.models]
        models_json_string = json.dumps(models_json_list)
        f.write(models_json_string)

    # os.chdir(folder_path)
    # process = subprocess.Popen(f"modal deploy {folder_path}/app.py", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
    process = await asyncio.subprocess.create_subprocess_shell(
        f"modal deploy app.py",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=folder_path,
        env={**os.environ, "COLUMNS": "10000"}
    )

    url = None

    if item.machine_id not in machine_logs_cache:
        machine_logs_cache[item.machine_id] = []

    machine_logs = machine_logs_cache[item.machine_id]

    url_queue = asyncio.Queue()

    async def read_stream(stream, isStderr, url_queue: asyncio.Queue):
        while True:
            line = await stream.readline()
            if line:
                l = line.decode('utf-8').strip()

                if l == "":
                    continue

                if not isStderr:
                    logger.info(l)
                    machine_logs.append({
                        "logs": l,
                        "timestamp": time.time()
                    })

                    if item.machine_id in machine_id_websocket_dict:
                        await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "LOGS", "data": {
                            "machine_id": item.machine_id,
                            "logs": l,
                            "timestamp": time.time()
                        }}))

                    if "Created comfyui_api =>" in l or ((l.startswith("https://") or l.startswith("│")) and l.endswith(".modal.run")):
                        if "Created comfyui_api =>" in l:
                            url = l.split("=>")[1].strip()
                        # making sure it is a url
                        elif "comfyui-api" in l:
                            # Some case it only prints the url on a blank line
                            if l.startswith("│"):
                                url = l.split("│")[1].strip()
                            else:
                                url = l

                        if url:
                            machine_logs.append({
                                "logs": f"App image built, url: {url}",
                                "timestamp": time.time()
                            })

                            await url_queue.put(url)

                            if item.machine_id in machine_id_websocket_dict:
                                await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "LOGS", "data": {
                                    "machine_id": item.machine_id,
                                    "logs": f"App image built, url: {url}",
                                    "timestamp": time.time()
                                }}))
                                await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "FINISHED", "data": {
                                    "status": "success",
                                }}))

                else:
                    # is error
                    logger.error(l)
                    machine_logs.append({
                        "logs": l,
                        "timestamp": time.time()
                    })

                    if item.machine_id in machine_id_websocket_dict:
                        await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "LOGS", "data": {
                            "machine_id": item.machine_id,
                            "logs": l,
                            "timestamp": time.time()
                        }}))
                        await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "FINISHED", "data": {
                            "status": "failed",
                        }}))
            else:
                break

    stdout_task = asyncio.create_task(
        read_stream(process.stdout, False, url_queue))
    stderr_task = asyncio.create_task(
        read_stream(process.stderr, True, url_queue))

    await asyncio.wait([stdout_task, stderr_task])

    # Wait for the subprocess to finish
    await process.wait()

    if not url_queue.empty():
        # The queue is not empty, you can get an item
        url = await url_queue.get()

    # Close the ws connection and also pop the item
    if item.machine_id in machine_id_websocket_dict and machine_id_websocket_dict[item.machine_id] is not None:
        await machine_id_websocket_dict[item.machine_id].close()

    if item.machine_id in machine_id_websocket_dict:
        machine_id_websocket_dict.pop(item.machine_id)

    if item.machine_id in machine_id_status:
        machine_id_status[item.machine_id] = False

    # Check for errors
    if process.returncode != 0:
        logger.info("An error occurred.")
        # Send a post request with the json body machine_id to the callback url
        machine_logs.append({
            "logs": "Unable to build the app image.",
            "timestamp": time.time()
        })
        requests.post(item.callback_url, json={
                      "machine_id": item.machine_id, "build_log": json.dumps(machine_logs)})

        if item.machine_id in machine_logs_cache:
            del machine_logs_cache[item.machine_id]

        return
        # return JSONResponse(status_code=400, content={"error": "Unable to build the app image."})

    # app_suffix = "comfyui-app"

    if url is None:
        machine_logs.append({
            "logs": "App image built, but url is None, unable to parse the url.",
            "timestamp": time.time()
        })
        requests.post(item.callback_url, json={
                      "machine_id": item.machine_id, "build_log": json.dumps(machine_logs)})

        if item.machine_id in machine_logs_cache:
            del machine_logs_cache[item.machine_id]

        return
        # return JSONResponse(status_code=400, content={"error": "App image built, but url is None, unable to parse the url."})
    # example https://bennykok--my-app-comfyui-app.modal.run/
    # my_url = f"https://{MODAL_ORG}--{item.container_id}-{app_suffix}.modal.run"

    requests.post(item.callback_url, json={
                  "machine_id": item.machine_id, "endpoint": url, "build_log": json.dumps(machine_logs)})
    if item.machine_id in machine_logs_cache:
        del machine_logs_cache[item.machine_id]

    logger.info("done")
    logger.info(url)


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def run_in_new_thread(coroutine):
    new_loop = asyncio.new_event_loop()
    t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
    t.start()
    asyncio.run_coroutine_threadsafe(coroutine, new_loop)
    return t


if __name__ == "__main__":
    import uvicorn
    # , log_level="debug"
    uvicorn.run("main:app", host="0.0.0.0", port=8080, lifespan="on")
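
A minimal client sketch for the builder API added above. The host and port (localhost:8080, matching the uvicorn.run call), the machine id, commit hashes, and callback URL are placeholder assumptions, and the websockets package is an assumed extra dependency; the payload shape follows the Item, Snapshot, and Model models, and the log stream follows the LOGS/FINISHED events sent on /ws/{machine_id}.

import asyncio
import json

import requests
import websockets  # assumed extra dependency, used only for the log stream

BUILDER = "http://localhost:8080"  # placeholder host/port, matching uvicorn.run above


def queue_build():
    # Payload mirrors the Item / Snapshot / Model pydantic models; ids and hashes are placeholders.
    payload = {
        "machine_id": "machine-123",
        "name": "my first image",
        "snapshot": {
            "comfyui": "d0165d819afe76bd4e6bdd710eb5f3e571b6a804",
            "git_custom_nodes": {
                "https://github.com/ltdrdata/ComfyUI-Manager.git": {
                    "hash": "9c86f62b912f4625fe2b929c7fc61deb9d16f6d3",
                    "disabled": False,
                },
            },
            "file_custom_nodes": [],
        },
        "models": [],  # optional extra model downloads
        "callback_url": "https://example.com/build-callback",  # placeholder
        "gpu": "T4",  # one of T4, A10G, A100, L4
    }
    resp = requests.post(f"{BUILDER}/create", json=payload)
    print(resp.json())  # e.g. {"message": "Build Queued", "build_machine_instance_id": "..."}


async def stream_logs(machine_id):
    # LOGS events carry incremental build output; FINISHED carries the final status.
    async with websockets.connect(f"ws://localhost:8080/ws/{machine_id}") as ws:
        while True:
            event = json.loads(await ws.recv())
            print(event["event"], event["data"])
            if event["event"] == "FINISHED":
                break


if __name__ == "__main__":
    queue_build()
    asyncio.run(stream_logs("machine-123"))
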
@@ -1,448 +0,0 @@
import modal
from typing import Union, Optional, Dict, List
from pydantic import BaseModel, Field, field_validator
from fastapi import FastAPI, HTTPException, WebSocket, BackgroundTasks, WebSocketDisconnect
from fastapi.responses import JSONResponse
from fastapi.logger import logger as fastapi_logger
import os
from enum import Enum
import json
import subprocess
import time
from contextlib import asynccontextmanager
import asyncio
import threading
import signal
import logging
from fastapi.logger import logger as fastapi_logger
import requests
from urllib.parse import parse_qs
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp, Scope, Receive, Send

# Modal application instance
modal_app = modal.App(name="comfyui-deploy")

gunicorn_error_logger = logging.getLogger("gunicorn.error")
gunicorn_logger = logging.getLogger("gunicorn")
uvicorn_access_logger = logging.getLogger("uvicorn.access")
uvicorn_access_logger.handlers = gunicorn_error_logger.handlers

fastapi_logger.handlers = gunicorn_error_logger.handlers

if __name__ != "__main__":
    fastapi_logger.setLevel(gunicorn_logger.level)
else:
    fastapi_logger.setLevel(logging.DEBUG)

logger = logging.getLogger("uvicorn")
logger.setLevel(logging.INFO)

last_activity_time = time.time()
global_timeout = 60 * 4

machine_id_websocket_dict = {}
machine_id_status = {}
machine_logs_cache = {}

fly_instance_id = os.environ.get('FLY_ALLOC_ID', 'local').split('-')[0]

class FlyReplayMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: ASGIApp) -> None:
        super().__init__(app)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        query_string = scope.get('query_string', b'').decode()
        query_params = parse_qs(query_string)
        target_instance = query_params.get('fly_instance_id', [fly_instance_id])[0]

        async def send_wrapper(message):
            if target_instance != fly_instance_id:
                if message['type'] == 'websocket.close' and 'Invalid session' in message.get('reason', ''):
                    message = {'type': 'websocket.accept'}
                if 'headers' not in message:
                    message['headers'] = []
                message['headers'].append([b'fly-replay', f'instance={target_instance}'.encode()])
            await send(message)
        await self.app(scope, receive, send_wrapper)

async def check_inactivity():
    global last_activity_time
    while True:
        if time.time() - last_activity_time > global_timeout:
            if len(machine_id_status) == 0:
                logger.info(f"No activity for {global_timeout} seconds, exiting...")
                os.kill(os.getpid(), signal.SIGINT)
                break
        await asyncio.sleep(1)

@asynccontextmanager
async def lifespan(app: FastAPI):
    thread = run_in_new_thread(check_inactivity())
    yield
    logger.info("Cancelling")

# FastAPI instance
fastapi_app = FastAPI(lifespan=lifespan)
fastapi_app.add_middleware(FlyReplayMiddleware)

class GitCustomNodes(BaseModel):
    hash: str
    disabled: bool

class FileCustomNodes(BaseModel):
    filename: str
    disabled: bool

class Snapshot(BaseModel):
    comfyui: str
    git_custom_nodes: Dict[str, GitCustomNodes]
    file_custom_nodes: List[FileCustomNodes]

class Model(BaseModel):
    name: str
    type: str
    base: str
    save_path: str
    description: str
    reference: str
    filename: str
    url: str

class GPUType(str, Enum):
    T4 = "T4"
    A10G = "A10G"
    A100 = "A100"
    L4 = "L4"

class Item(BaseModel):
    machine_id: str
    name: str
    snapshot: Snapshot
    models: List[Model]
    callback_url: str
    gpu: GPUType = Field(default=GPUType.T4)

    @field_validator('gpu')
    @classmethod
    def check_gpu(cls, value):
        if value not in GPUType.__members__:
            raise ValueError(f"Invalid GPU option. Choose from: {', '.join(GPUType.__members__.keys())}")
        return GPUType(value)

class StopAppItem(BaseModel):
    machine_id: str

@fastapi_app.get("/")
def read_root():
    global last_activity_time
    last_activity_time = time.time()
    logger.info(f"Extended inactivity time to {global_timeout}")
    return {"Hello": "World"}

@fastapi_app.websocket("/ws/{machine_id}")
async def websocket_endpoint(websocket: WebSocket, machine_id: str):
    await websocket.accept()
    machine_id_websocket_dict[machine_id] = websocket
    if machine_id in machine_logs_cache:
        combined_logs = "\n".join(log_entry['logs'] for log_entry in machine_logs_cache[machine_id])
        await websocket.send_text(json.dumps({
            "event": "LOGS",
            "data": {
                "machine_id": machine_id,
                "logs": combined_logs,
                "timestamp": time.time()
            }
        }))
    try:
        while True:
            data = await websocket.receive_text()
            global last_activity_time
            last_activity_time = time.time()
            logger.info(f"Extended inactivity time to {global_timeout}")
    except WebSocketDisconnect:
        if machine_id in machine_id_websocket_dict:
            del machine_id_websocket_dict[machine_id]

@fastapi_app.post("/create")
async def create_machine(item: Item):
    global last_activity_time
    last_activity_time = time.time()
    logger.info(f"Extended inactivity time to {global_timeout}")

    if item.machine_id in machine_id_status and machine_id_status[item.machine_id]:
        return JSONResponse(status_code=400, content={"error": "Build already in progress."})

    task = asyncio.create_task(build_logic(item))
    return JSONResponse(
        status_code=200,
        content={
            "message": "Build Queued",
            "build_machine_instance_id": fly_instance_id
        }
    )

def find_app_id(app_list, app_name):
    for app in app_list:
        if app['Name'] == app_name:
            return app['App ID']
    return None

@fastapi_app.post("/stop-app")
async def stop_app(item: StopAppItem):
    cmd = f"modal app list --json"
    env = os.environ.copy()
    env["COLUMNS"] = "10000"

    find_id_process = await asyncio.subprocess.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=env
    )

    stdout, stderr = await find_id_process.communicate()
    if stdout:
        app_list = json.loads(stdout.decode().strip())
        app_id = find_app_id(app_list, item.machine_id)
        logger.info(f"cp_process stdout: {app_id}")
    if stderr:
        logger.info(f"cp_process stderr: {stderr.decode()}")

    cp_process = await asyncio.subprocess.create_subprocess_exec(
        "modal", "app", "stop", app_id,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    await cp_process.wait()
    stdout, stderr = await cp_process.communicate()

    if stdout:
        logger.info(f"cp_process stdout: {stdout.decode()}")
    if stderr:
        logger.info(f"cp_process stderr: {stderr.decode()}")

    if cp_process.returncode == 0:
        return JSONResponse(status_code=200, content={"status": "success"})
    else:
        return JSONResponse(
            status_code=500,
            content={"status": "error", "error": stderr.decode()}
        )

async def build_logic(item: Item):
    folder_path = f"/app/builds/{item.machine_id}"
    machine_id_status[item.machine_id] = True

    cp_process = await asyncio.subprocess.create_subprocess_exec(
        "cp", "-r", "/app/src/template", folder_path
    )
    await cp_process.wait()

    config = {
        "name": item.name,
        "deploy_test": os.environ.get("DEPLOY_TEST_FLAG", "False"),
        "gpu": item.gpu,
        "civitai_token": os.environ.get("CIVITAI_TOKEN", "833b4ded5c7757a06a803763500bab58")
    }

    with open(f"{folder_path}/config.py", "w") as f:
        f.write("config = " + json.dumps(config))

    with open(f"{folder_path}/data/snapshot.json", "w") as f:
        f.write(item.snapshot.json())

    with open(f"{folder_path}/data/models.json", "w") as f:
        models_json_list = [model.dict() for model in item.models]
        f.write(json.dumps(models_json_list))

    process = await asyncio.subprocess.create_subprocess_shell(
        f"modal deploy app.py",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=folder_path,
        env={**os.environ, "COLUMNS": "10000"}
    )

    if item.machine_id not in machine_logs_cache:
        machine_logs_cache[item.machine_id] = []

    machine_logs = machine_logs_cache[item.machine_id]
    url_queue = asyncio.Queue()

    async def read_stream(stream, isStderr, url_queue: asyncio.Queue):
        while True:
            line = await stream.readline()
            if not line:
                break

            l = line.decode('utf-8').strip()
            if not l:
                continue

            if not isStderr:
                logger.info(l)
                machine_logs.append({
                    "logs": l,
                    "timestamp": time.time()
                })

                if item.machine_id in machine_id_websocket_dict:
                    await machine_id_websocket_dict[item.machine_id].send_text(
                        json.dumps({
                            "event": "LOGS",
                            "data": {
                                "machine_id": item.machine_id,
                                "logs": l,
                                "timestamp": time.time()
                            }
                        })
                    )

                if "Created comfyui_api =>" in l or ((l.startswith("https://") or l.startswith("│")) and l.endswith(".modal.run")):
                    if "Created comfyui_api =>" in l:
                        url = l.split("=>")[1].strip()
                    elif "comfyui-api" in l:
                        url = l.split("│")[1].strip() if l.startswith("│") else l

                    if url:
                        machine_logs.append({
                            "logs": f"App image built, url: {url}",
                            "timestamp": time.time()
                        })

                        await url_queue.put(url)

                        if item.machine_id in machine_id_websocket_dict:
                            await machine_id_websocket_dict[item.machine_id].send_text(
                                json.dumps({
                                    "event": "LOGS",
                                    "data": {
                                        "machine_id": item.machine_id,
                                        "logs": f"App image built, url: {url}",
                                        "timestamp": time.time()
                                    }
                                })
                            )
                            await machine_id_websocket_dict[item.machine_id].send_text(
                                json.dumps({
                                    "event": "FINISHED",
                                    "data": {
                                        "status": "success",
                                    }
                                })
                            )

            else:
                logger.error(l)
                machine_logs.append({
                    "logs": l,
                    "timestamp": time.time()
                })

                if item.machine_id in machine_id_websocket_dict:
                    await machine_id_websocket_dict[item.machine_id].send_text(
                        json.dumps({
                            "event": "LOGS",
                            "data": {
                                "machine_id": item.machine_id,
                                "logs": l,
                                "timestamp": time.time()
                            }
                        })
                    )
                    await machine_id_websocket_dict[item.machine_id].send_text(
                        json.dumps({
                            "event": "FINISHED",
                            "data": {
                                "status": "failed",
                            }
                        })
                    )

    stdout_task = asyncio.create_task(read_stream(process.stdout, False, url_queue))
    stderr_task = asyncio.create_task(read_stream(process.stderr, True, url_queue))

    await asyncio.wait([stdout_task, stderr_task])
    await process.wait()

    url = await url_queue.get() if not url_queue.empty() else None

    if item.machine_id in machine_id_websocket_dict and machine_id_websocket_dict[item.machine_id] is not None:
        await machine_id_websocket_dict[item.machine_id].close()

    if item.machine_id in machine_id_websocket_dict:
        del machine_id_websocket_dict[item.machine_id]

    if item.machine_id in machine_id_status:
        machine_id_status[item.machine_id] = False

    if process.returncode != 0:
        logger.info("An error occurred.")
        machine_logs.append({
            "logs": "Unable to build the app image.",
            "timestamp": time.time()
        })
        requests.post(
            item.callback_url,
            json={
                "machine_id": item.machine_id,
                "build_log": json.dumps(machine_logs)
            }
        )
        if item.machine_id in machine_logs_cache:
            del machine_logs_cache[item.machine_id]
        return

    if url is None:
        machine_logs.append({
            "logs": "App image built, but url is None, unable to parse the url.",
            "timestamp": time.time()
        })
        requests.post(
            item.callback_url,
            json={
                "machine_id": item.machine_id,
                "build_log": json.dumps(machine_logs)
            }
        )
        if item.machine_id in machine_logs_cache:
            del machine_logs_cache[item.machine_id]
        return

    requests.post(
        item.callback_url,
        json={
            "machine_id": item.machine_id,
            "endpoint": url,
            "build_log": json.dumps(machine_logs)
        }
    )

    if item.machine_id in machine_logs_cache:
        del machine_logs_cache[item.machine_id]

    logger.info("done")
    logger.info(url)

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def run_in_new_thread(coroutine):
    new_loop = asyncio.new_event_loop()
    t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
    t.start()
    asyncio.run_coroutine_threadsafe(coroutine, new_loop)
    return t

# Modal endpoint
@modal_app.function()
@modal.asgi_app()
def app():
    return fastapi_app

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(fastapi_app, host="0.0.0.0", port=8080, lifespan="on")
@@ -307,5 +307,4 @@ def comfyui_app():
        },
    )()

    proxy_app = make_simple_proxy_app(ProxyContext(config)) # Assign to variable
    return proxy_app # Return the variable
    return make_simple_proxy_app(ProxyContext(config))
@@ -1,57 +0,0 @@
import os
import io
import torchaudio
from folder_paths import get_annotated_filepath

class ComfyUIDeployExternalAudio:
    RETURN_TYPES = ("AUDIO",)
    RETURN_NAMES = ("audio",)
    FUNCTION = "load_audio"

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "input_id": (
                    "STRING",
                    {"multiline": False, "default": "input_audio"},
                ),
                "audio_file": ("STRING", {"default": ""}),
            },
            "optional": {
                "default_value": ("AUDIO",),
                "display_name": (
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
                "description": (
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
            }
        }

    @classmethod
    def VALIDATE_INPUTS(s, audio_file, **kwargs):
        return True

    def load_audio(self, input_id, audio_file, default_value=None, display_name=None, description=None):
        if audio_file and audio_file != "":
            if audio_file.startswith(('http://', 'https://')):
                # Handle URL input
                import requests
                response = requests.get(audio_file)
                audio_data = io.BytesIO(response.content)
                waveform, sample_rate = torchaudio.load(audio_data)
            else:
                # Handle local file
                audio_path = get_annotated_filepath(audio_file)
                waveform, sample_rate = torchaudio.load(audio_path)

            audio = {"waveform": waveform.unsqueeze(0), "sample_rate": sample_rate}
            return (audio,)
        else:
            return (default_value,)

NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalAudio": ComfyUIDeployExternalAudio}
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalAudio": "External Audio (ComfyUI Deploy)"}
@@ -1,108 +0,0 @@
from PIL import Image, ImageOps
import numpy as np
import torch
import folder_paths


class AnyType(str):
    def __ne__(self, __value: object) -> bool:
        return False


WILDCARD = AnyType("*")


class ComfyUIDeployExternalFaceModel:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "input_id": (
                    "STRING",
                    {"multiline": False, "default": "input_reactor_face_model"},
                ),
            },
            "optional": {
                "default_face_model_name": (
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
                "face_model_save_name": (  # if `default_face_model_name` is a link to download a file, we will attempt to save it with this name
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
                "display_name": (
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
                "description": (
                    "STRING",
                    {"multiline": True, "default": ""},
                ),
                "face_model_url": (
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
            },
        }

    RETURN_TYPES = (WILDCARD,)
    RETURN_NAMES = ("path",)

    FUNCTION = "run"

    CATEGORY = "deploy"

    def run(
        self,
        input_id,
        default_face_model_name=None,
        face_model_save_name=None,
        display_name=None,
        description=None,
        face_model_url=None,
    ):
        import requests
        import os
        import uuid

        if face_model_url and face_model_url.startswith("http"):
            if face_model_save_name:
                existing_face_models = folder_paths.get_filename_list("reactor/faces")
                # Check if face_model_save_name exists in the list
                if face_model_save_name in existing_face_models:
                    print(f"using face model: {face_model_save_name}")
                    return (face_model_save_name,)
            else:
                face_model_save_name = str(uuid.uuid4()) + ".safetensors"
            print(face_model_save_name)
            print(folder_paths.folder_names_and_paths["reactor/faces"][0][0])
            destination_path = os.path.join(
                folder_paths.folder_names_and_paths["reactor/faces"][0][0],
                face_model_save_name,
            )

            print(destination_path)
            print(
                "Downloading external face model - "
                + face_model_url
                + " to "
                + destination_path
            )
            response = requests.get(
                face_model_url,
                headers={"User-Agent": "Mozilla/5.0"},
                allow_redirects=True,
            )
            with open(destination_path, "wb") as out_file:
                out_file.write(response.content)
            return (face_model_save_name,)
        else:
            print(f"using face model: {default_face_model_name}")
            return (default_face_model_name,)


NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalFaceModel": ComfyUIDeployExternalFaceModel}
NODE_DISPLAY_NAME_MAPPINGS = {
    "ComfyUIDeployExternalFaceModel": "External Face Model (ComfyUI Deploy)"
}
@@ -21,9 +21,8 @@ class ComfyUIDeployExternalImage:
                ),
                "description": (
                    "STRING",
                    {"multiline": False, "default": ""},
                    {"multiline": True, "default": ""},
                ),
                "default_value_url": ("STRING", {"image_preview": True, "default": ""}),
            }
        }

@ -34,44 +33,32 @@ class ComfyUIDeployExternalImage:

    CATEGORY = "image"

    def run(self, input_id, default_value=None, display_name=None, description=None, default_value_url=None):
    def run(self, input_id, default_value=None, display_name=None, description=None):
        image = default_value

        # Try both input_id and default_value_url
        urls_to_try = [url for url in [input_id, default_value_url] if url]

        print(default_value_url)

        for url in urls_to_try:
            try:
                if url.startswith('http'):
                    import requests
                    from io import BytesIO
                    print(f"Fetching image from url: {url}")
                    response = requests.get(url)
                    image = Image.open(BytesIO(response.content))
                    break
                elif url.startswith(('data:image/png;base64,', 'data:image/jpeg;base64,', 'data:image/jpg;base64,')):
                    import base64
                    from io import BytesIO
                    print("Decoding base64 image")
                    base64_image = url[url.find(",")+1:]
                    decoded_image = base64.b64decode(base64_image)
                    image = Image.open(BytesIO(decoded_image))
                    break
            except:
                continue

        if image is not None:
            try:
                image = ImageOps.exif_transpose(image)
                image = image.convert("RGB")
                image = np.array(image).astype(np.float32) / 255.0
                image = torch.from_numpy(image)[None,]
            except:
                pass

        return [image]
        try:
            if input_id.startswith('http'):
                import requests
                from io import BytesIO
                print("Fetching image from url: ", input_id)
                response = requests.get(input_id)
                image = Image.open(BytesIO(response.content))
            elif input_id.startswith('data:image/png;base64,') or input_id.startswith('data:image/jpeg;base64,') or input_id.startswith('data:image/jpg;base64,'):
                import base64
                from io import BytesIO
                print("Decoding base64 image")
                base64_image = input_id[input_id.find(",")+1:]
                decoded_image = base64.b64decode(base64_image)
                image = Image.open(BytesIO(decoded_image))
            else:
                raise ValueError("Invalid image url provided.")

            image = ImageOps.exif_transpose(image)
            image = image.convert("RGB")
            image = np.array(image).astype(np.float32) / 255.0
            image = torch.from_numpy(image)[None,]
            return [image]
        except:
            return [image]


NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalImage": ComfyUIDeployExternalImage}

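A minimal usage sketch, not part of the diff, assuming the newer run() signature above and that Image, ImageOps, np and torch are imported at module level as in the node file; the URL is a placeholder:

node = ComfyUIDeployExternalImage()
(tensor,) = node.run(
    input_id="https://example.com/sample.png",  # placeholder URL
    default_value=None,
    default_value_url=None,
)
# tensor is a (1, H, W, 3) float32 torch tensor scaled to [0, 1], or the
# default_value (None here) if every fetch attempt failed.
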
@ -64,42 +64,32 @@ class ComfyUIDeployExternalLora:
        import os
        import uuid

        if lora_url:
            if lora_url.startswith("http"):
                if lora_save_name:
                    existing_loras = folder_paths.get_filename_list("loras")
                    # Check if lora_save_name exists in the list
                    if lora_save_name in existing_loras:
                        print(f"using lora: {lora_save_name}")
                        return (lora_save_name,)
                else:
                    lora_save_name = str(uuid.uuid4()) + ".safetensors"
                print(lora_save_name)
                print(folder_paths.folder_names_and_paths["loras"][0][0])
                destination_path = os.path.join(
                    folder_paths.folder_names_and_paths["loras"][0][0], lora_save_name
                )
                print(destination_path)
                print(
                    "Downloading external lora - "
                    + lora_url
                    + " to "
                    + destination_path
                )
                response = requests.get(
                    lora_url,
                    headers={"User-Agent": "Mozilla/5.0"},
                    allow_redirects=True,
                )
                with open(destination_path, "wb") as out_file:
                    out_file.write(response.content)
                print(f"Ext Lora loading: {lora_url} to {lora_save_name}")
                return (lora_save_name,)
        if lora_url and lora_url.startswith("http"):
            if lora_save_name:
                existing_loras = folder_paths.get_filename_list("loras")
                # Check if lora_save_name exists in the list
                if lora_save_name in existing_loras:
                    print(f"using lora: {lora_save_name}")
                    return (lora_save_name,)
            else:
                print(f"Ext Lora loading: {lora_url}")
                return (lora_url,)
                lora_save_name = str(uuid.uuid4()) + ".safetensors"
            print(lora_save_name)
            print(folder_paths.folder_names_and_paths["loras"][0][0])
            destination_path = os.path.join(
                folder_paths.folder_names_and_paths["loras"][0][0], lora_save_name
            )
            print(destination_path)
            print("Downloading external lora - " + lora_url + " to " + destination_path)
            response = requests.get(
                lora_url,
                headers={"User-Agent": "Mozilla/5.0"},
                allow_redirects=True,
            )
            with open(destination_path, "wb") as out_file:
                out_file.write(response.content)
            return (lora_save_name,)
        else:
            print(f"Ext Lora loading: {default_lora_name}")
            print(f"using lora: {default_lora_name}")
            return (default_lora_name,)


@ -1,53 +0,0 @@
import re


class StringFunction:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "action": (["append", "replace"], {}),
                "tidy_tags": (["yes", "no"], {}),
            },
            "optional": {
                "text_a": ("STRING", {"multiline": True, "dynamicPrompts": False}),
                "text_b": ("STRING", {"multiline": True, "dynamicPrompts": False}),
                "text_c": ("STRING", {"multiline": True, "dynamicPrompts": False}),
            },
        }

    RETURN_TYPES = ("STRING",)
    FUNCTION = "exec"
    CATEGORY = "utils"
    OUTPUT_NODE = True

    def exec(self, action, tidy_tags, text_a="", text_b="", text_c=""):
        tidy_tags = tidy_tags == "yes"
        out = ""
        if action == "append":
            out = (", " if tidy_tags else "").join(
                filter(None, [text_a, text_b, text_c])
            )
        else:
            if text_c is None:
                text_c = ""
            if text_b.startswith("/") and text_b.endswith("/"):
                regex = text_b[1:-1]
                out = re.sub(regex, text_c, text_a)
            else:
                out = text_a.replace(text_b, text_c)
        if tidy_tags:
            out = re.sub(r"\s{2,}", " ", out)
            out = out.replace(" ,", ",")
            out = re.sub(r",{2,}", ",", out)
            out = out.strip()
        return {"ui": {"text": (out,)}, "result": (out,)}


NODE_CLASS_MAPPINGS = {
    "ComfyUIDeployStringCombine": StringFunction,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "ComfyUIDeployStringCombine": "String Combine (ComfyUI Deploy)",
}
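For reference, a small usage sketch of the removed node (not part of the diff; the input strings are illustrative):

result = StringFunction().exec(
    action="replace",
    tidy_tags="yes",
    text_a="a  red,,  car",
    text_b="/red/",  # wrapping text_b in slashes switches to regex replacement
    text_c="blue",
)
# result["result"][0] == "a blue, car"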
@ -1,46 +0,0 @@
class AnyType(str):
    def __ne__(self, __value: object) -> bool:
        return False


WILDCARD = AnyType("*")

class ComfyUIDeployExternalTextAny:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "input_id": (
                    "STRING",
                    {"multiline": False, "default": "input_text"},
                ),
            },
            "optional": {
                "default_value": (
                    "STRING",
                    {"multiline": True, "default": ""},
                ),
                "display_name": (
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
                "description": (
                    "STRING",
                    {"multiline": True, "default": ""},
                ),
            }
        }

    RETURN_TYPES = (WILDCARD,)
    RETURN_NAMES = ("text",)

    FUNCTION = "run"

    CATEGORY = "text"

    def run(self, input_id, default_value=None, display_name=None, description=None):
        return [default_value]


NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalTextAny": ComfyUIDeployExternalTextAny}
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalTextAny": "External Text Any (ComfyUI Deploy)"}

52  comfy-nodes/external_text_list.py  Normal file

@ -0,0 +1,52 @@
import folder_paths
from PIL import Image, ImageOps
import numpy as np
import torch
import json

class ComfyUIDeployExternalTextList:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "input_id": (
                    "STRING",
                    {"multiline": False, "default": 'input_text_list'},
                ),
                 "text": (
                    "STRING",
                    {"multiline": True, "default": "[]"},
                ),
            },
            "optional": {
                "display_name": (
                    "STRING",
                    {"multiline": False, "default": ""},
                ),
                "description": (
                    "STRING",
                    {"multiline": True, "default": ""},
                ),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)

    OUTPUT_IS_LIST = (True,)

    FUNCTION = "run"

    CATEGORY = "text"

    def run(self, input_id, text=None, display_name=None, description=None):
        text_list = []
        try:
            text_list = json.loads(text)  # Assuming text is a JSON array string
        except Exception as e:
            print(f"Error processing images: {e}")
            pass
        return ([text_list],)

NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalTextList": ComfyUIDeployExternalTextList}
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalTextList": "External Text List (ComfyUI Deploy)"}
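A brief usage sketch, not part of the diff, showing the JSON parsing above; the strings are illustrative:

node = ComfyUIDeployExternalTextList()
(output,) = node.run(input_id="input_text_list", text='["a portrait", "a landscape"]')
# output == [["a portrait", "a landscape"]]; with OUTPUT_IS_LIST = (True,),
# ComfyUI treats this return value as a list-typed output.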
@ -1,60 +0,0 @@
import folder_paths
class AnyType(str):
    def __ne__(self, __value: object) -> bool:
        return False

from os import walk

WILDCARD = AnyType("*")

MODEL_EXTENSIONS = {
    "safetensors": "SafeTensors file format",
    "ckpt": "Checkpoint file",
    "pth": "PyTorch serialized file",
    "pkl": "Pickle file",
    "onnx": "ONNX file",
}

def fetch_files(path):
    for (dirpath, dirnames, filenames) in walk(path):
        fs = []
        if len(dirnames) > 0:
            for dirname in dirnames:
                fs.extend(fetch_files(f"{dirpath}/{dirname}"))
        for filename in filenames:
            # Remove "./models/" from the beginning of dirpath
            relative_dirpath = dirpath.replace("./models/", "", 1)
            file_path = f"{relative_dirpath}/{filename}"

            # Only add files that are known model extensions
            file_extension = filename.split('.')[-1].lower()
            if file_extension in MODEL_EXTENSIONS:
                fs.append(file_path)

        return fs
allModels = fetch_files("./models")

class ComfyUIDeployModalList:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "model": (allModels, ),
            }
        }

    RETURN_TYPES = (WILDCARD,)
    RETURN_NAMES = ("model",)

    FUNCTION = "run"

    CATEGORY = "model"

    def run(self, model=""):
        # Split the model path by '/' and select the last item
        model_name = model.split('/')[-1]
        return [model_name]


NODE_CLASS_MAPPINGS = {"ComfyUIDeployModelList": ComfyUIDeployModalList}
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployModelList": "Model List (ComfyUI Deploy)"}
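A one-line sketch of what the run() above returns (not part of the diff; the path is illustrative):

model_name = "checkpoints/sd_xl/sd_xl_base_1.0.safetensors".split('/')[-1]
# model_name == "sd_xl_base_1.0.safetensors"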
@ -1,92 +0,0 @@
import os
import json
import numpy as np
from PIL import Image
from PIL.PngImagePlugin import PngInfo
import folder_paths


class ComfyDeployOutputImage:
    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()
        self.type = "output"
        self.prefix_append = ""
        self.compress_level = 4

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "images": ("IMAGE", {"tooltip": "The images to save."}),
                "filename_prefix": (
                    "STRING",
                    {
                        "default": "ComfyUI",
                        "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes.",
                    },
                ),
                "file_type": (["png", "jpg", "webp"], {"default": "webp"}),
                "quality": ("INT", {"default": 80, "min": 1, "max": 100, "step": 1}),
            },
            "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
        }

    RETURN_TYPES = ()
    FUNCTION = "run"

    OUTPUT_NODE = True

    CATEGORY = "output"
    DESCRIPTION = "Saves the input images to your ComfyUI output directory."

    def run(
        self,
        images,
        filename_prefix="ComfyUI",
        file_type="png",
        quality=80,
        prompt=None,
        extra_pnginfo=None,
    ):
        filename_prefix += self.prefix_append
        full_output_folder, filename, counter, subfolder, filename_prefix = (
            folder_paths.get_save_image_path(
                filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]
            )
        )
        results = list()
        for batch_number, image in enumerate(images):
            i = 255.0 * image.cpu().numpy()
            img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
            metadata = PngInfo()
            if prompt is not None:
                metadata.add_text("prompt", json.dumps(prompt))
            if extra_pnginfo is not None:
                for x in extra_pnginfo:
                    metadata.add_text(x, json.dumps(extra_pnginfo[x]))

            filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
            file = f"{filename_with_batch_num}_{counter:05}_.{file_type}"
            file_path = os.path.join(full_output_folder, file)

            if file_type == "png":
                img.save(
                    file_path, pnginfo=metadata, compress_level=self.compress_level
                )
            elif file_type == "jpg":
                img.save(file_path, quality=quality, optimize=True)
            elif file_type == "webp":
                img.save(file_path, quality=quality)

            results.append(
                {"filename": file, "subfolder": subfolder, "type": self.type}
            )
            counter += 1

        return {"ui": {"images": results}}


NODE_CLASS_MAPPINGS = {"ComfyDeployOutputImage": ComfyDeployOutputImage}
NODE_DISPLAY_NAME_MAPPINGS = {
    "ComfyDeployOutputImage": "Image Output (ComfyDeploy)"
}
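A small sketch of the filename construction above (not part of the diff; the values are illustrative):

filename, batch_number, counter, file_type = "ComfyUI_%batch_num%", 1, 7, "webp"
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
file = f"{filename_with_batch_num}_{counter:05}_.{file_type}"
# file == "ComfyUI_1_00007_.webp"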
							
								
								
									
1362  custom_routes.py
File diff suppressed because it is too large

54  globals.py

@ -6,12 +6,10 @@ from PIL import Image, ImageOps
from io import BytesIO
from pydantic import BaseModel as PydanticBaseModel


class BaseModel(PydanticBaseModel):
    class Config:
        arbitrary_types_allowed = True


class Status(Enum):
    NOT_STARTED = "not-started"
    RUNNING = "running"
@ -19,7 +17,6 @@ class Status(Enum):
    FAILED = "failed"
    UPLOADING = "uploading"


class StreamingPrompt(BaseModel):
    workflow_api: Any
    auth_token: str
@ -27,52 +24,44 @@ class StreamingPrompt(BaseModel):
    running_prompt_ids: set[str] = set()
    status_endpoint: Optional[str]
    file_upload_endpoint: Optional[str]
    workflow: Any
    gpu_event_id: Optional[str] = None


class SimplePrompt(BaseModel):
    status_endpoint: Optional[str]
    file_upload_endpoint: Optional[str]

    token: Optional[str]

    workflow_api: dict
    status: Status = Status.NOT_STARTED
    progress: set = set()
    last_updated_node: Optional[str] = None
    last_updated_node: Optional[str] = None,
    uploading_nodes: set = set()
    done: bool = False
    is_realtime: bool = False
    start_time: Optional[float] = None
    gpu_event_id: Optional[str] = None

    is_realtime: bool = False,
    start_time: Optional[float] = None,

sockets = dict()
prompt_metadata: dict[str, SimplePrompt] = {}
streaming_prompt_metadata: dict[str, StreamingPrompt] = {}


class BinaryEventTypes:
    PREVIEW_IMAGE = 1
    UNENCODED_PREVIEW_IMAGE = 2


max_output_id_length = 24


async def send_image(image_data, sid=None, output_id: str = None):
async def send_image(image_data, sid=None, output_id:str = None):
    max_length = max_output_id_length
    output_id = output_id[:max_length]
    padded_output_id = output_id.ljust(max_length, "\x00")
    encoded_output_id = padded_output_id.encode("ascii", "replace")

    padded_output_id = output_id.ljust(max_length, '\x00')
    encoded_output_id = padded_output_id.encode('ascii', 'replace')

    image_type = image_data[0]
    image = image_data[1]
    max_size = image_data[2]
    quality = image_data[3]
    if max_size is not None:
        if hasattr(Image, "Resampling"):
        if hasattr(Image, 'Resampling'):
            resampling = Image.Resampling.BILINEAR
        else:
            resampling = Image.ANTIALIAS
@ -96,23 +85,17 @@ async def send_image(image_data, sid=None, output_id: str = None):
    position_after = bytesIO.tell()
    bytes_written = position_after - position_before
    print(f"Bytes written: {bytes_written}")

    image.save(bytesIO, format=image_type, quality=quality, compress_level=1)
    preview_bytes = bytesIO.getvalue()
    await send_bytes(BinaryEventTypes.PREVIEW_IMAGE, preview_bytes, sid=sid)


async def send_socket_catch_exception(function, message):
    try:
        await function(message)
    except (
        aiohttp.ClientError,
        aiohttp.ClientPayloadError,
        ConnectionResetError,
    ) as err:
    except (aiohttp.ClientError, aiohttp.ClientPayloadError, ConnectionResetError) as err:
        print("send error:", err)


def encode_bytes(event, data):
    if not isinstance(event, int):
        raise RuntimeError(f"Binary event types must be integers, got {event}")
@ -122,10 +105,9 @@ def encode_bytes(event, data):
    message.extend(data)
    return message


async def send_bytes(event, data, sid=None):
    message = encode_bytes(event, data)

    print("sending image to ", event, sid)

    if sid is None:
@ -133,4 +115,4 @@ async def send_bytes(event, data, sid=None):
        for ws in _sockets:
            await send_socket_catch_exception(ws.send_bytes, message)
    elif sid in sockets:
        await send_socket_catch_exception(sockets[sid].send_bytes, message)
        await send_socket_catch_exception(sockets[sid].send_bytes, message)
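A small sketch of the output_id framing used in send_image above (not part of the diff; the id value is illustrative):

max_output_id_length = 24
output_id = "node-42-preview"[:max_output_id_length]
padded_output_id = output_id.ljust(max_output_id_length, "\x00")
encoded_output_id = padded_output_id.encode("ascii", "replace")
# len(encoded_output_id) == 24 regardless of the original id length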
@ -1,8 +1,8 @@
[project]
name = "comfyui-deploy"
description = "Open source comfyui deployment platform, a vercel for generative workflow infra."
version = "1.1.0"
license = { file = "LICENSE" }
version = "1.0.0"
license = "LICENSE"
dependencies = ["aiofiles", "pydantic", "opencv-python", "imageio-ffmpeg"]

[project.urls]

@ -3,5 +3,4 @@ pydantic
opencv-python
imageio-ffmpeg
brotli
tabulate
# logfire

4  web-plugin/api.js  Normal file

@ -0,0 +1,4 @@
/** @typedef {import('../../../web/scripts/api.js').api} API*/
import { api as _api } from '../../scripts/api.js';
/** @type {API} */
export const api = _api;

4  web-plugin/app.js  Normal file

@ -0,0 +1,4 @@
/** @typedef {import('../../../web/scripts/app.js').ComfyApp} ComfyApp*/
import { app as _app } from '../../scripts/app.js';
/** @type {ComfyApp} */
export const app = _app;

File diff suppressed because it is too large

18  web-plugin/widgets.js  Normal file

@ -0,0 +1,18 @@
// /** @typedef {import('../../../web/scripts/api.js').api} API*/
// import { api as _api } from "../../scripts/api.js";
// /** @type {API} */
// export const api = _api;

/** @typedef {typeof import('../../../web/scripts/widgets.js').ComfyWidgets} Widgets*/
import { ComfyWidgets as _ComfyWidgets } from "../../scripts/widgets.js";

/**
 * @type {Widgets}
 */
export const ComfyWidgets = _ComfyWidgets;

// import { LGraphNode as _LGraphNode } from "../../types/litegraph.js";

/** @typedef {typeof import('../../../web/types/litegraph.js').LGraphNode} LGraphNode*/
/** @type {LGraphNode}*/
export const LGraphNode = LiteGraph.LGraphNode;
@ -6,5 +6,4 @@ export const customInputNodes: Record<string, string> = {
  ComfyUIDeployExternalNumberInt: "integer",
  ComfyUIDeployExternalLora: "string - (public lora download url)",
  ComfyUIDeployExternalCheckpoint: "string - (public checkpoints download url)",
  ComfyUIDeployExternalFaceModel: "string - (public face model download url)",
};