Compare commits
No commits in common. "main" and "benny/log-sync" have entirely different histories.
main
...
benny/log-
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,3 +1,2 @@
|
||||
__pycache__
|
||||
.DS_Store
|
||||
file-hash-cache.json
|
||||
|
504
builder/modal-builder/src/main.py
Normal file
504
builder/modal-builder/src/main.py
Normal file
@ -0,0 +1,504 @@
|
||||
from typing import Union, Optional, Dict, List
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from fastapi import FastAPI, HTTPException, WebSocket, BackgroundTasks, WebSocketDisconnect
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.logger import logger as fastapi_logger
|
||||
import os
|
||||
from enum import Enum
|
||||
import json
|
||||
import subprocess
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
import asyncio
|
||||
import threading
|
||||
import signal
|
||||
import logging
|
||||
from fastapi.logger import logger as fastapi_logger
|
||||
import requests
|
||||
from urllib.parse import parse_qs
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.types import ASGIApp, Scope, Receive, Send
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
# executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
gunicorn_error_logger = logging.getLogger("gunicorn.error")
|
||||
gunicorn_logger = logging.getLogger("gunicorn")
|
||||
uvicorn_access_logger = logging.getLogger("uvicorn.access")
|
||||
uvicorn_access_logger.handlers = gunicorn_error_logger.handlers
|
||||
|
||||
fastapi_logger.handlers = gunicorn_error_logger.handlers
|
||||
|
||||
if __name__ != "__main__":
|
||||
fastapi_logger.setLevel(gunicorn_logger.level)
|
||||
else:
|
||||
fastapi_logger.setLevel(logging.DEBUG)
|
||||
|
||||
logger = logging.getLogger("uvicorn")
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
last_activity_time = time.time()
|
||||
global_timeout = 60 * 4
|
||||
|
||||
machine_id_websocket_dict = {}
|
||||
machine_id_status = {}
|
||||
|
||||
fly_instance_id = os.environ.get('FLY_ALLOC_ID', 'local').split('-')[0]
|
||||
|
||||
|
||||
class FlyReplayMiddleware(BaseHTTPMiddleware):
|
||||
"""
|
||||
If the wrong instance was picked by the fly.io load balancer we use the fly-replay header
|
||||
to repeat the request again on the right instance.
|
||||
|
||||
This only works if the right instance is provided as a query_string parameter.
|
||||
"""
|
||||
|
||||
def __init__(self, app: ASGIApp) -> None:
|
||||
self.app = app
|
||||
|
||||
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
|
||||
query_string = scope.get('query_string', b'').decode()
|
||||
query_params = parse_qs(query_string)
|
||||
target_instance = query_params.get(
|
||||
'fly_instance_id', [fly_instance_id])[0]
|
||||
|
||||
async def send_wrapper(message):
|
||||
if target_instance != fly_instance_id:
|
||||
if message['type'] == 'websocket.close' and 'Invalid session' in message['reason']:
|
||||
# fly.io only seems to look at the fly-replay header if websocket is accepted
|
||||
message = {'type': 'websocket.accept'}
|
||||
if 'headers' not in message:
|
||||
message['headers'] = []
|
||||
message['headers'].append(
|
||||
[b'fly-replay', f'instance={target_instance}'.encode()])
|
||||
await send(message)
|
||||
await self.app(scope, receive, send_wrapper)
|
||||
|
||||
|
||||
async def check_inactivity():
|
||||
global last_activity_time
|
||||
while True:
|
||||
# logger.info("Checking inactivity...")
|
||||
if time.time() - last_activity_time > global_timeout:
|
||||
if len(machine_id_status) == 0:
|
||||
# The application has been inactive for more than 60 seconds.
|
||||
# Scale it down to zero here.
|
||||
logger.info(
|
||||
f"No activity for {global_timeout} seconds, exiting...")
|
||||
# os._exit(0)
|
||||
os.kill(os.getpid(), signal.SIGINT)
|
||||
break
|
||||
else:
|
||||
pass
|
||||
# logger.info(f"Timeout but still in progress")
|
||||
|
||||
await asyncio.sleep(1) # Check every second
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
thread = run_in_new_thread(check_inactivity())
|
||||
yield
|
||||
logger.info("Cancelling")
|
||||
|
||||
#
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
app.add_middleware(FlyReplayMiddleware)
|
||||
# MODAL_ORG = os.environ.get("MODAL_ORG")
|
||||
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
global last_activity_time
|
||||
last_activity_time = time.time()
|
||||
logger.info(f"Extended inactivity time to {global_timeout}")
|
||||
return {"Hello": "World"}
|
||||
|
||||
# create a post route called /create takes in a json of example
|
||||
# {
|
||||
# name: "my first image",
|
||||
# deps: {
|
||||
# "comfyui": "d0165d819afe76bd4e6bdd710eb5f3e571b6a804",
|
||||
# "git_custom_nodes": {
|
||||
# "https://github.com/cubiq/ComfyUI_IPAdapter_plus": {
|
||||
# "hash": "2ca0c6dd0b2ad64b1c480828638914a564331dcd",
|
||||
# "disabled": true
|
||||
# },
|
||||
# "https://github.com/ltdrdata/ComfyUI-Manager.git": {
|
||||
# "hash": "9c86f62b912f4625fe2b929c7fc61deb9d16f6d3",
|
||||
# "disabled": false
|
||||
# },
|
||||
# },
|
||||
# "file_custom_nodes": []
|
||||
# }
|
||||
# }
|
||||
|
||||
|
||||
class GitCustomNodes(BaseModel):
|
||||
hash: str
|
||||
disabled: bool
|
||||
|
||||
class FileCustomNodes(BaseModel):
|
||||
filename: str
|
||||
disabled: bool
|
||||
|
||||
|
||||
class Snapshot(BaseModel):
|
||||
comfyui: str
|
||||
git_custom_nodes: Dict[str, GitCustomNodes]
|
||||
file_custom_nodes: List[FileCustomNodes]
|
||||
|
||||
class Model(BaseModel):
|
||||
name: str
|
||||
type: str
|
||||
base: str
|
||||
save_path: str
|
||||
description: str
|
||||
reference: str
|
||||
filename: str
|
||||
url: str
|
||||
|
||||
|
||||
class GPUType(str, Enum):
|
||||
T4 = "T4"
|
||||
A10G = "A10G"
|
||||
A100 = "A100"
|
||||
L4 = "L4"
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
machine_id: str
|
||||
name: str
|
||||
snapshot: Snapshot
|
||||
models: List[Model]
|
||||
callback_url: str
|
||||
gpu: GPUType = Field(default=GPUType.T4)
|
||||
|
||||
@field_validator('gpu')
|
||||
@classmethod
|
||||
def check_gpu(cls, value):
|
||||
if value not in GPUType.__members__:
|
||||
raise ValueError(
|
||||
f"Invalid GPU option. Choose from: {', '.join(GPUType.__members__.keys())}")
|
||||
return GPUType(value)
|
||||
|
||||
|
||||
@app.websocket("/ws/{machine_id}")
|
||||
async def websocket_endpoint(websocket: WebSocket, machine_id: str):
|
||||
await websocket.accept()
|
||||
machine_id_websocket_dict[machine_id] = websocket
|
||||
# Send existing logs
|
||||
if machine_id in machine_logs_cache:
|
||||
combined_logs = "\n".join(
|
||||
log_entry['logs'] for log_entry in machine_logs_cache[machine_id])
|
||||
await websocket.send_text(json.dumps({"event": "LOGS", "data": {
|
||||
"machine_id": machine_id,
|
||||
"logs": combined_logs,
|
||||
"timestamp": time.time()
|
||||
}}))
|
||||
try:
|
||||
while True:
|
||||
data = await websocket.receive_text()
|
||||
global last_activity_time
|
||||
last_activity_time = time.time()
|
||||
logger.info(f"Extended inactivity time to {global_timeout}")
|
||||
# You can handle received messages here if needed
|
||||
except WebSocketDisconnect:
|
||||
if machine_id in machine_id_websocket_dict:
|
||||
machine_id_websocket_dict.pop(machine_id)
|
||||
|
||||
# @app.get("/test")
|
||||
# async def test():
|
||||
# machine_id_status["123"] = True
|
||||
# global last_activity_time
|
||||
# last_activity_time = time.time()
|
||||
# logger.info(f"Extended inactivity time to {global_timeout}")
|
||||
|
||||
# await asyncio.sleep(10)
|
||||
|
||||
# machine_id_status["123"] = False
|
||||
# machine_id_status.pop("123")
|
||||
|
||||
# return {"Hello": "World"}
|
||||
|
||||
|
||||
@app.post("/create")
|
||||
async def create_machine(item: Item):
|
||||
global last_activity_time
|
||||
last_activity_time = time.time()
|
||||
logger.info(f"Extended inactivity time to {global_timeout}")
|
||||
|
||||
if item.machine_id in machine_id_status and machine_id_status[item.machine_id]:
|
||||
return JSONResponse(status_code=400, content={"error": "Build already in progress."})
|
||||
|
||||
# Run the building logic in a separate thread
|
||||
# future = executor.submit(build_logic, item)
|
||||
task = asyncio.create_task(build_logic(item))
|
||||
|
||||
return JSONResponse(status_code=200, content={"message": "Build Queued", "build_machine_instance_id": fly_instance_id})
|
||||
|
||||
|
||||
class StopAppItem(BaseModel):
|
||||
machine_id: str
|
||||
|
||||
|
||||
def find_app_id(app_list, app_name):
|
||||
for app in app_list:
|
||||
if app['Name'] == app_name:
|
||||
return app['App ID']
|
||||
return None
|
||||
|
||||
@app.post("/stop-app")
|
||||
async def stop_app(item: StopAppItem):
|
||||
# cmd = f"modal app list | grep {item.machine_id} | awk -F '│' '{{print $2}}'"
|
||||
cmd = f"modal app list --json"
|
||||
|
||||
env = os.environ.copy()
|
||||
env["COLUMNS"] = "10000" # Set the width to a large value
|
||||
find_id_process = await asyncio.subprocess.create_subprocess_shell(cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=env)
|
||||
await find_id_process.wait()
|
||||
|
||||
stdout, stderr = await find_id_process.communicate()
|
||||
if stdout:
|
||||
app_id = stdout.decode().strip()
|
||||
app_list = json.loads(app_id)
|
||||
app_id = find_app_id(app_list, item.machine_id)
|
||||
logger.info(f"cp_process stdout: {app_id}")
|
||||
if stderr:
|
||||
logger.info(f"cp_process stderr: {stderr.decode()}")
|
||||
|
||||
cp_process = await asyncio.subprocess.create_subprocess_exec("modal", "app", "stop", app_id,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,)
|
||||
await cp_process.wait()
|
||||
logger.info(f"Stopping app {item.machine_id}")
|
||||
stdout, stderr = await cp_process.communicate()
|
||||
if stdout:
|
||||
logger.info(f"cp_process stdout: {stdout.decode()}")
|
||||
if stderr:
|
||||
logger.info(f"cp_process stderr: {stderr.decode()}")
|
||||
|
||||
if cp_process.returncode == 0:
|
||||
return JSONResponse(status_code=200, content={"status": "success"})
|
||||
else:
|
||||
return JSONResponse(status_code=500, content={"status": "error", "error": stderr.decode()})
|
||||
|
||||
# Initialize the logs cache
|
||||
machine_logs_cache = {}
|
||||
|
||||
|
||||
async def build_logic(item: Item):
|
||||
# Deploy to modal
|
||||
folder_path = f"/app/builds/{item.machine_id}"
|
||||
machine_id_status[item.machine_id] = True
|
||||
|
||||
# Ensure the os path is same as the current directory
|
||||
# os.chdir(os.path.dirname(os.path.realpath(__file__)))
|
||||
# print(
|
||||
# f"builder - Current working directory: {os.getcwd()}"
|
||||
# )
|
||||
|
||||
# Copy the app template
|
||||
# os.system(f"cp -r template {folder_path}")
|
||||
cp_process = await asyncio.subprocess.create_subprocess_exec("cp", "-r", "/app/src/template", folder_path)
|
||||
await cp_process.wait()
|
||||
|
||||
# Write the config file
|
||||
config = {
|
||||
"name": item.name,
|
||||
"deploy_test": os.environ.get("DEPLOY_TEST_FLAG", "False"),
|
||||
"gpu": item.gpu,
|
||||
"civitai_token": os.environ.get("CIVITAI_TOKEN", "")
|
||||
}
|
||||
with open(f"{folder_path}/config.py", "w") as f:
|
||||
f.write("config = " + json.dumps(config))
|
||||
|
||||
with open(f"{folder_path}/data/snapshot.json", "w") as f:
|
||||
f.write(item.snapshot.json())
|
||||
|
||||
with open(f"{folder_path}/data/models.json", "w") as f:
|
||||
models_json_list = [model.dict() for model in item.models]
|
||||
models_json_string = json.dumps(models_json_list)
|
||||
f.write(models_json_string)
|
||||
|
||||
# os.chdir(folder_path)
|
||||
# process = subprocess.Popen(f"modal deploy {folder_path}/app.py", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
||||
process = await asyncio.subprocess.create_subprocess_shell(
|
||||
f"modal deploy app.py",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=folder_path,
|
||||
env={**os.environ, "COLUMNS": "10000"}
|
||||
)
|
||||
|
||||
url = None
|
||||
|
||||
if item.machine_id not in machine_logs_cache:
|
||||
machine_logs_cache[item.machine_id] = []
|
||||
|
||||
machine_logs = machine_logs_cache[item.machine_id]
|
||||
|
||||
url_queue = asyncio.Queue()
|
||||
|
||||
async def read_stream(stream, isStderr, url_queue: asyncio.Queue):
|
||||
while True:
|
||||
line = await stream.readline()
|
||||
if line:
|
||||
l = line.decode('utf-8').strip()
|
||||
|
||||
if l == "":
|
||||
continue
|
||||
|
||||
if not isStderr:
|
||||
logger.info(l)
|
||||
machine_logs.append({
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "LOGS", "data": {
|
||||
"machine_id": item.machine_id,
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
}}))
|
||||
|
||||
if "Created comfyui_api =>" in l or ((l.startswith("https://") or l.startswith("│")) and l.endswith(".modal.run")):
|
||||
if "Created comfyui_api =>" in l:
|
||||
url = l.split("=>")[1].strip()
|
||||
# making sure it is a url
|
||||
elif "comfyui-api" in l:
|
||||
# Some case it only prints the url on a blank line
|
||||
if l.startswith("│"):
|
||||
url = l.split("│")[1].strip()
|
||||
else:
|
||||
url = l
|
||||
|
||||
if url:
|
||||
machine_logs.append({
|
||||
"logs": f"App image built, url: {url}",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
await url_queue.put(url)
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "LOGS", "data": {
|
||||
"machine_id": item.machine_id,
|
||||
"logs": f"App image built, url: {url}",
|
||||
"timestamp": time.time()
|
||||
}}))
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "FINISHED", "data": {
|
||||
"status": "succuss",
|
||||
}}))
|
||||
|
||||
else:
|
||||
# is error
|
||||
logger.error(l)
|
||||
machine_logs.append({
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "LOGS", "data": {
|
||||
"machine_id": item.machine_id,
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
}}))
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(json.dumps({"event": "FINISHED", "data": {
|
||||
"status": "failed",
|
||||
}}))
|
||||
else:
|
||||
break
|
||||
|
||||
stdout_task = asyncio.create_task(
|
||||
read_stream(process.stdout, False, url_queue))
|
||||
stderr_task = asyncio.create_task(
|
||||
read_stream(process.stderr, True, url_queue))
|
||||
|
||||
await asyncio.wait([stdout_task, stderr_task])
|
||||
|
||||
# Wait for the subprocess to finish
|
||||
await process.wait()
|
||||
|
||||
if not url_queue.empty():
|
||||
# The queue is not empty, you can get an item
|
||||
url = await url_queue.get()
|
||||
|
||||
# Close the ws connection and also pop the item
|
||||
if item.machine_id in machine_id_websocket_dict and machine_id_websocket_dict[item.machine_id] is not None:
|
||||
await machine_id_websocket_dict[item.machine_id].close()
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
machine_id_websocket_dict.pop(item.machine_id)
|
||||
|
||||
if item.machine_id in machine_id_status:
|
||||
machine_id_status[item.machine_id] = False
|
||||
|
||||
# Check for errors
|
||||
if process.returncode != 0:
|
||||
logger.info("An error occurred.")
|
||||
# Send a post request with the json body machine_id to the callback url
|
||||
machine_logs.append({
|
||||
"logs": "Unable to build the app image.",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
requests.post(item.callback_url, json={
|
||||
"machine_id": item.machine_id, "build_log": json.dumps(machine_logs)})
|
||||
|
||||
if item.machine_id in machine_logs_cache:
|
||||
del machine_logs_cache[item.machine_id]
|
||||
|
||||
return
|
||||
# return JSONResponse(status_code=400, content={"error": "Unable to build the app image."})
|
||||
|
||||
# app_suffix = "comfyui-app"
|
||||
|
||||
if url is None:
|
||||
machine_logs.append({
|
||||
"logs": "App image built, but url is None, unable to parse the url.",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
requests.post(item.callback_url, json={
|
||||
"machine_id": item.machine_id, "build_log": json.dumps(machine_logs)})
|
||||
|
||||
if item.machine_id in machine_logs_cache:
|
||||
del machine_logs_cache[item.machine_id]
|
||||
|
||||
return
|
||||
# return JSONResponse(status_code=400, content={"error": "App image built, but url is None, unable to parse the url."})
|
||||
# example https://bennykok--my-app-comfyui-app.modal.run/
|
||||
# my_url = f"https://{MODAL_ORG}--{item.container_id}-{app_suffix}.modal.run"
|
||||
|
||||
requests.post(item.callback_url, json={
|
||||
"machine_id": item.machine_id, "endpoint": url, "build_log": json.dumps(machine_logs)})
|
||||
if item.machine_id in machine_logs_cache:
|
||||
del machine_logs_cache[item.machine_id]
|
||||
|
||||
logger.info("done")
|
||||
logger.info(url)
|
||||
|
||||
|
||||
def start_loop(loop):
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_forever()
|
||||
|
||||
|
||||
def run_in_new_thread(coroutine):
|
||||
new_loop = asyncio.new_event_loop()
|
||||
t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
|
||||
t.start()
|
||||
asyncio.run_coroutine_threadsafe(coroutine, new_loop)
|
||||
return t
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
# , log_level="debug"
|
||||
uvicorn.run("main:app", host="0.0.0.0", port=8080, lifespan="on")
|
@ -1,448 +0,0 @@
|
||||
import modal
|
||||
from typing import Union, Optional, Dict, List
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from fastapi import FastAPI, HTTPException, WebSocket, BackgroundTasks, WebSocketDisconnect
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.logger import logger as fastapi_logger
|
||||
import os
|
||||
from enum import Enum
|
||||
import json
|
||||
import subprocess
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
import asyncio
|
||||
import threading
|
||||
import signal
|
||||
import logging
|
||||
from fastapi.logger import logger as fastapi_logger
|
||||
import requests
|
||||
from urllib.parse import parse_qs
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.types import ASGIApp, Scope, Receive, Send
|
||||
|
||||
# Modal应用实例
|
||||
modal_app = modal.App(name="comfyui-deploy")
|
||||
|
||||
gunicorn_error_logger = logging.getLogger("gunicorn.error")
|
||||
gunicorn_logger = logging.getLogger("gunicorn")
|
||||
uvicorn_access_logger = logging.getLogger("uvicorn.access")
|
||||
uvicorn_access_logger.handlers = gunicorn_error_logger.handlers
|
||||
|
||||
fastapi_logger.handlers = gunicorn_error_logger.handlers
|
||||
|
||||
if __name__ != "__main__":
|
||||
fastapi_logger.setLevel(gunicorn_logger.level)
|
||||
else:
|
||||
fastapi_logger.setLevel(logging.DEBUG)
|
||||
|
||||
logger = logging.getLogger("uvicorn")
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
last_activity_time = time.time()
|
||||
global_timeout = 60 * 4
|
||||
|
||||
machine_id_websocket_dict = {}
|
||||
machine_id_status = {}
|
||||
machine_logs_cache = {}
|
||||
|
||||
fly_instance_id = os.environ.get('FLY_ALLOC_ID', 'local').split('-')[0]
|
||||
|
||||
class FlyReplayMiddleware(BaseHTTPMiddleware):
|
||||
def __init__(self, app: ASGIApp) -> None:
|
||||
super().__init__(app)
|
||||
|
||||
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
|
||||
query_string = scope.get('query_string', b'').decode()
|
||||
query_params = parse_qs(query_string)
|
||||
target_instance = query_params.get('fly_instance_id', [fly_instance_id])[0]
|
||||
|
||||
async def send_wrapper(message):
|
||||
if target_instance != fly_instance_id:
|
||||
if message['type'] == 'websocket.close' and 'Invalid session' in message.get('reason', ''):
|
||||
message = {'type': 'websocket.accept'}
|
||||
if 'headers' not in message:
|
||||
message['headers'] = []
|
||||
message['headers'].append([b'fly-replay', f'instance={target_instance}'.encode()])
|
||||
await send(message)
|
||||
await self.app(scope, receive, send_wrapper)
|
||||
|
||||
async def check_inactivity():
|
||||
global last_activity_time
|
||||
while True:
|
||||
if time.time() - last_activity_time > global_timeout:
|
||||
if len(machine_id_status) == 0:
|
||||
logger.info(f"No activity for {global_timeout} seconds, exiting...")
|
||||
os.kill(os.getpid(), signal.SIGINT)
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
thread = run_in_new_thread(check_inactivity())
|
||||
yield
|
||||
logger.info("Cancelling")
|
||||
|
||||
# FastAPI实例
|
||||
fastapi_app = FastAPI(lifespan=lifespan)
|
||||
fastapi_app.add_middleware(FlyReplayMiddleware)
|
||||
|
||||
class GitCustomNodes(BaseModel):
|
||||
hash: str
|
||||
disabled: bool
|
||||
|
||||
class FileCustomNodes(BaseModel):
|
||||
filename: str
|
||||
disabled: bool
|
||||
|
||||
class Snapshot(BaseModel):
|
||||
comfyui: str
|
||||
git_custom_nodes: Dict[str, GitCustomNodes]
|
||||
file_custom_nodes: List[FileCustomNodes]
|
||||
|
||||
class Model(BaseModel):
|
||||
name: str
|
||||
type: str
|
||||
base: str
|
||||
save_path: str
|
||||
description: str
|
||||
reference: str
|
||||
filename: str
|
||||
url: str
|
||||
|
||||
class GPUType(str, Enum):
|
||||
T4 = "T4"
|
||||
A10G = "A10G"
|
||||
A100 = "A100"
|
||||
L4 = "L4"
|
||||
|
||||
class Item(BaseModel):
|
||||
machine_id: str
|
||||
name: str
|
||||
snapshot: Snapshot
|
||||
models: List[Model]
|
||||
callback_url: str
|
||||
gpu: GPUType = Field(default=GPUType.T4)
|
||||
|
||||
@field_validator('gpu')
|
||||
@classmethod
|
||||
def check_gpu(cls, value):
|
||||
if value not in GPUType.__members__:
|
||||
raise ValueError(f"Invalid GPU option. Choose from: {', '.join(GPUType.__members__.keys())}")
|
||||
return GPUType(value)
|
||||
|
||||
class StopAppItem(BaseModel):
|
||||
machine_id: str
|
||||
|
||||
@fastapi_app.get("/")
|
||||
def read_root():
|
||||
global last_activity_time
|
||||
last_activity_time = time.time()
|
||||
logger.info(f"Extended inactivity time to {global_timeout}")
|
||||
return {"Hello": "World"}
|
||||
|
||||
@fastapi_app.websocket("/ws/{machine_id}")
|
||||
async def websocket_endpoint(websocket: WebSocket, machine_id: str):
|
||||
await websocket.accept()
|
||||
machine_id_websocket_dict[machine_id] = websocket
|
||||
if machine_id in machine_logs_cache:
|
||||
combined_logs = "\n".join(log_entry['logs'] for log_entry in machine_logs_cache[machine_id])
|
||||
await websocket.send_text(json.dumps({
|
||||
"event": "LOGS",
|
||||
"data": {
|
||||
"machine_id": machine_id,
|
||||
"logs": combined_logs,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
}))
|
||||
try:
|
||||
while True:
|
||||
data = await websocket.receive_text()
|
||||
global last_activity_time
|
||||
last_activity_time = time.time()
|
||||
logger.info(f"Extended inactivity time to {global_timeout}")
|
||||
except WebSocketDisconnect:
|
||||
if machine_id in machine_id_websocket_dict:
|
||||
del machine_id_websocket_dict[machine_id]
|
||||
|
||||
@fastapi_app.post("/create")
|
||||
async def create_machine(item: Item):
|
||||
global last_activity_time
|
||||
last_activity_time = time.time()
|
||||
logger.info(f"Extended inactivity time to {global_timeout}")
|
||||
|
||||
if item.machine_id in machine_id_status and machine_id_status[item.machine_id]:
|
||||
return JSONResponse(status_code=400, content={"error": "Build already in progress."})
|
||||
|
||||
task = asyncio.create_task(build_logic(item))
|
||||
return JSONResponse(
|
||||
status_code=200,
|
||||
content={
|
||||
"message": "Build Queued",
|
||||
"build_machine_instance_id": fly_instance_id
|
||||
}
|
||||
)
|
||||
|
||||
def find_app_id(app_list, app_name):
|
||||
for app in app_list:
|
||||
if app['Name'] == app_name:
|
||||
return app['App ID']
|
||||
return None
|
||||
|
||||
@fastapi_app.post("/stop-app")
|
||||
async def stop_app(item: StopAppItem):
|
||||
cmd = f"modal app list --json"
|
||||
env = os.environ.copy()
|
||||
env["COLUMNS"] = "10000"
|
||||
|
||||
find_id_process = await asyncio.subprocess.create_subprocess_shell(
|
||||
cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=env
|
||||
)
|
||||
|
||||
stdout, stderr = await find_id_process.communicate()
|
||||
if stdout:
|
||||
app_list = json.loads(stdout.decode().strip())
|
||||
app_id = find_app_id(app_list, item.machine_id)
|
||||
logger.info(f"cp_process stdout: {app_id}")
|
||||
if stderr:
|
||||
logger.info(f"cp_process stderr: {stderr.decode()}")
|
||||
|
||||
cp_process = await asyncio.subprocess.create_subprocess_exec(
|
||||
"modal", "app", "stop", app_id,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
|
||||
await cp_process.wait()
|
||||
stdout, stderr = await cp_process.communicate()
|
||||
|
||||
if stdout:
|
||||
logger.info(f"cp_process stdout: {stdout.decode()}")
|
||||
if stderr:
|
||||
logger.info(f"cp_process stderr: {stderr.decode()}")
|
||||
|
||||
if cp_process.returncode == 0:
|
||||
return JSONResponse(status_code=200, content={"status": "success"})
|
||||
else:
|
||||
return JSONResponse(
|
||||
status_code=500,
|
||||
content={"status": "error", "error": stderr.decode()}
|
||||
)
|
||||
|
||||
async def build_logic(item: Item):
|
||||
folder_path = f"/app/builds/{item.machine_id}"
|
||||
machine_id_status[item.machine_id] = True
|
||||
|
||||
cp_process = await asyncio.subprocess.create_subprocess_exec(
|
||||
"cp", "-r", "/app/src/template", folder_path
|
||||
)
|
||||
await cp_process.wait()
|
||||
|
||||
config = {
|
||||
"name": item.name,
|
||||
"deploy_test": os.environ.get("DEPLOY_TEST_FLAG", "False"),
|
||||
"gpu": item.gpu,
|
||||
"civitai_token": os.environ.get("CIVITAI_TOKEN", "833b4ded5c7757a06a803763500bab58")
|
||||
}
|
||||
|
||||
with open(f"{folder_path}/config.py", "w") as f:
|
||||
f.write("config = " + json.dumps(config))
|
||||
|
||||
with open(f"{folder_path}/data/snapshot.json", "w") as f:
|
||||
f.write(item.snapshot.json())
|
||||
|
||||
with open(f"{folder_path}/data/models.json", "w") as f:
|
||||
models_json_list = [model.dict() for model in item.models]
|
||||
f.write(json.dumps(models_json_list))
|
||||
|
||||
process = await asyncio.subprocess.create_subprocess_shell(
|
||||
f"modal deploy app.py",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=folder_path,
|
||||
env={**os.environ, "COLUMNS": "10000"}
|
||||
)
|
||||
|
||||
if item.machine_id not in machine_logs_cache:
|
||||
machine_logs_cache[item.machine_id] = []
|
||||
|
||||
machine_logs = machine_logs_cache[item.machine_id]
|
||||
url_queue = asyncio.Queue()
|
||||
|
||||
async def read_stream(stream, isStderr, url_queue: asyncio.Queue):
|
||||
while True:
|
||||
line = await stream.readline()
|
||||
if not line:
|
||||
break
|
||||
|
||||
l = line.decode('utf-8').strip()
|
||||
if not l:
|
||||
continue
|
||||
|
||||
if not isStderr:
|
||||
logger.info(l)
|
||||
machine_logs.append({
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(
|
||||
json.dumps({
|
||||
"event": "LOGS",
|
||||
"data": {
|
||||
"machine_id": item.machine_id,
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
if "Created comfyui_api =>" in l or ((l.startswith("https://") or l.startswith("│")) and l.endswith(".modal.run")):
|
||||
if "Created comfyui_api =>" in l:
|
||||
url = l.split("=>")[1].strip()
|
||||
elif "comfyui-api" in l:
|
||||
url = l.split("│")[1].strip() if l.startswith("│") else l
|
||||
|
||||
if url:
|
||||
machine_logs.append({
|
||||
"logs": f"App image built, url: {url}",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
await url_queue.put(url)
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(
|
||||
json.dumps({
|
||||
"event": "LOGS",
|
||||
"data": {
|
||||
"machine_id": item.machine_id,
|
||||
"logs": f"App image built, url: {url}",
|
||||
"timestamp": time.time()
|
||||
}
|
||||
})
|
||||
)
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(
|
||||
json.dumps({
|
||||
"event": "FINISHED",
|
||||
"data": {
|
||||
"status": "success",
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
else:
|
||||
logger.error(l)
|
||||
machine_logs.append({
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(
|
||||
json.dumps({
|
||||
"event": "LOGS",
|
||||
"data": {
|
||||
"machine_id": item.machine_id,
|
||||
"logs": l,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
})
|
||||
)
|
||||
await machine_id_websocket_dict[item.machine_id].send_text(
|
||||
json.dumps({
|
||||
"event": "FINISHED",
|
||||
"data": {
|
||||
"status": "failed",
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
stdout_task = asyncio.create_task(read_stream(process.stdout, False, url_queue))
|
||||
stderr_task = asyncio.create_task(read_stream(process.stderr, True, url_queue))
|
||||
|
||||
await asyncio.wait([stdout_task, stderr_task])
|
||||
await process.wait()
|
||||
|
||||
url = await url_queue.get() if not url_queue.empty() else None
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict and machine_id_websocket_dict[item.machine_id] is not None:
|
||||
await machine_id_websocket_dict[item.machine_id].close()
|
||||
|
||||
if item.machine_id in machine_id_websocket_dict:
|
||||
del machine_id_websocket_dict[item.machine_id]
|
||||
|
||||
if item.machine_id in machine_id_status:
|
||||
machine_id_status[item.machine_id] = False
|
||||
|
||||
if process.returncode != 0:
|
||||
logger.info("An error occurred.")
|
||||
machine_logs.append({
|
||||
"logs": "Unable to build the app image.",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
requests.post(
|
||||
item.callback_url,
|
||||
json={
|
||||
"machine_id": item.machine_id,
|
||||
"build_log": json.dumps(machine_logs)
|
||||
}
|
||||
)
|
||||
if item.machine_id in machine_logs_cache:
|
||||
del machine_logs_cache[item.machine_id]
|
||||
return
|
||||
|
||||
if url is None:
|
||||
machine_logs.append({
|
||||
"logs": "App image built, but url is None, unable to parse the url.",
|
||||
"timestamp": time.time()
|
||||
})
|
||||
requests.post(
|
||||
item.callback_url,
|
||||
json={
|
||||
"machine_id": item.machine_id,
|
||||
"build_log": json.dumps(machine_logs)
|
||||
}
|
||||
)
|
||||
if item.machine_id in machine_logs_cache:
|
||||
del machine_logs_cache[item.machine_id]
|
||||
return
|
||||
|
||||
requests.post(
|
||||
item.callback_url,
|
||||
json={
|
||||
"machine_id": item.machine_id,
|
||||
"endpoint": url,
|
||||
"build_log": json.dumps(machine_logs)
|
||||
}
|
||||
)
|
||||
|
||||
if item.machine_id in machine_logs_cache:
|
||||
del machine_logs_cache[item.machine_id]
|
||||
|
||||
logger.info("done")
|
||||
logger.info(url)
|
||||
|
||||
def start_loop(loop):
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_forever()
|
||||
|
||||
def run_in_new_thread(coroutine):
|
||||
new_loop = asyncio.new_event_loop()
|
||||
t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
|
||||
t.start()
|
||||
asyncio.run_coroutine_threadsafe(coroutine, new_loop)
|
||||
return t
|
||||
|
||||
# Modal endpoint
|
||||
@modal_app.function()
|
||||
@modal.asgi_app()
|
||||
def app():
|
||||
return fastapi_app
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(fastapi_app, host="0.0.0.0", port=8080, lifespan="on")
|
@ -307,5 +307,4 @@ def comfyui_app():
|
||||
},
|
||||
)()
|
||||
|
||||
proxy_app = make_simple_proxy_app(ProxyContext(config)) # Assign to variable
|
||||
return proxy_app # Return the variable
|
||||
return make_simple_proxy_app(ProxyContext(config))
|
@ -1,57 +0,0 @@
|
||||
import os
|
||||
import io
|
||||
import torchaudio
|
||||
from folder_paths import get_annotated_filepath
|
||||
|
||||
class ComfyUIDeployExternalAudio:
|
||||
RETURN_TYPES = ("AUDIO",)
|
||||
RETURN_NAMES = ("audio",)
|
||||
FUNCTION = "load_audio"
|
||||
|
||||
@classmethod
|
||||
def INPUT_TYPES(cls):
|
||||
return {
|
||||
"required": {
|
||||
"input_id": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": "input_audio"},
|
||||
),
|
||||
"audio_file": ("STRING", {"default": ""}),
|
||||
},
|
||||
"optional": {
|
||||
"default_value": ("AUDIO",),
|
||||
"display_name": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
"description": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def VALIDATE_INPUTS(s, audio_file, **kwargs):
|
||||
return True
|
||||
|
||||
def load_audio(self, input_id, audio_file, default_value=None, display_name=None, description=None):
|
||||
if audio_file and audio_file != "":
|
||||
if audio_file.startswith(('http://', 'https://')):
|
||||
# Handle URL input
|
||||
import requests
|
||||
response = requests.get(audio_file)
|
||||
audio_data = io.BytesIO(response.content)
|
||||
waveform, sample_rate = torchaudio.load(audio_data)
|
||||
else:
|
||||
# Handle local file
|
||||
audio_path = get_annotated_filepath(audio_file)
|
||||
waveform, sample_rate = torchaudio.load(audio_path)
|
||||
|
||||
audio = {"waveform": waveform.unsqueeze(0), "sample_rate": sample_rate}
|
||||
return (audio,)
|
||||
else:
|
||||
return (default_value,)
|
||||
|
||||
NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalAudio": ComfyUIDeployExternalAudio}
|
||||
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalAudio": "External Audio (ComfyUI Deploy)"}
|
@ -1,108 +0,0 @@
|
||||
from PIL import Image, ImageOps
|
||||
import numpy as np
|
||||
import torch
|
||||
import folder_paths
|
||||
|
||||
|
||||
class AnyType(str):
|
||||
def __ne__(self, __value: object) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
WILDCARD = AnyType("*")
|
||||
|
||||
|
||||
class ComfyUIDeployExternalFaceModel:
|
||||
@classmethod
|
||||
def INPUT_TYPES(s):
|
||||
return {
|
||||
"required": {
|
||||
"input_id": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": "input_reactor_face_model"},
|
||||
),
|
||||
},
|
||||
"optional": {
|
||||
"default_face_model_name": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
"face_model_save_name": ( # if `default_face_model_name` is a link to download a file, we will attempt to save it with this name
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
"display_name": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
"description": (
|
||||
"STRING",
|
||||
{"multiline": True, "default": ""},
|
||||
),
|
||||
"face_model_url": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
RETURN_TYPES = (WILDCARD,)
|
||||
RETURN_NAMES = ("path",)
|
||||
|
||||
FUNCTION = "run"
|
||||
|
||||
CATEGORY = "deploy"
|
||||
|
||||
def run(
|
||||
self,
|
||||
input_id,
|
||||
default_face_model_name=None,
|
||||
face_model_save_name=None,
|
||||
display_name=None,
|
||||
description=None,
|
||||
face_model_url=None,
|
||||
):
|
||||
import requests
|
||||
import os
|
||||
import uuid
|
||||
|
||||
if face_model_url and face_model_url.startswith("http"):
|
||||
if face_model_save_name:
|
||||
existing_face_models = folder_paths.get_filename_list("reactor/faces")
|
||||
# Check if face_model_save_name exists in the list
|
||||
if face_model_save_name in existing_face_models:
|
||||
print(f"using face model: {face_model_save_name}")
|
||||
return (face_model_save_name,)
|
||||
else:
|
||||
face_model_save_name = str(uuid.uuid4()) + ".safetensors"
|
||||
print(face_model_save_name)
|
||||
print(folder_paths.folder_names_and_paths["reactor/faces"][0][0])
|
||||
destination_path = os.path.join(
|
||||
folder_paths.folder_names_and_paths["reactor/faces"][0][0],
|
||||
face_model_save_name,
|
||||
)
|
||||
|
||||
print(destination_path)
|
||||
print(
|
||||
"Downloading external face model - "
|
||||
+ face_model_url
|
||||
+ " to "
|
||||
+ destination_path
|
||||
)
|
||||
response = requests.get(
|
||||
face_model_url,
|
||||
headers={"User-Agent": "Mozilla/5.0"},
|
||||
allow_redirects=True,
|
||||
)
|
||||
with open(destination_path, "wb") as out_file:
|
||||
out_file.write(response.content)
|
||||
return (face_model_save_name,)
|
||||
else:
|
||||
print(f"using face model: {default_face_model_name}")
|
||||
return (default_face_model_name,)
|
||||
|
||||
|
||||
NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalFaceModel": ComfyUIDeployExternalFaceModel}
|
||||
NODE_DISPLAY_NAME_MAPPINGS = {
|
||||
"ComfyUIDeployExternalFaceModel": "External Face Model (ComfyUI Deploy)"
|
||||
}
|
@ -21,9 +21,8 @@ class ComfyUIDeployExternalImage:
|
||||
),
|
||||
"description": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
{"multiline": True, "default": ""},
|
||||
),
|
||||
"default_value_url": ("STRING", {"image_preview": True, "default": ""}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,43 +33,31 @@ class ComfyUIDeployExternalImage:
|
||||
|
||||
CATEGORY = "image"
|
||||
|
||||
def run(self, input_id, default_value=None, display_name=None, description=None, default_value_url=None):
|
||||
def run(self, input_id, default_value=None, display_name=None, description=None):
|
||||
image = default_value
|
||||
|
||||
# Try both input_id and default_value_url
|
||||
urls_to_try = [url for url in [input_id, default_value_url] if url]
|
||||
|
||||
print(default_value_url)
|
||||
|
||||
for url in urls_to_try:
|
||||
try:
|
||||
if url.startswith('http'):
|
||||
if input_id.startswith('http'):
|
||||
import requests
|
||||
from io import BytesIO
|
||||
print(f"Fetching image from url: {url}")
|
||||
response = requests.get(url)
|
||||
print("Fetching image from url: ", input_id)
|
||||
response = requests.get(input_id)
|
||||
image = Image.open(BytesIO(response.content))
|
||||
break
|
||||
elif url.startswith(('data:image/png;base64,', 'data:image/jpeg;base64,', 'data:image/jpg;base64,')):
|
||||
elif input_id.startswith('data:image/png;base64,') or input_id.startswith('data:image/jpeg;base64,') or input_id.startswith('data:image/jpg;base64,'):
|
||||
import base64
|
||||
from io import BytesIO
|
||||
print("Decoding base64 image")
|
||||
base64_image = url[url.find(",")+1:]
|
||||
base64_image = input_id[input_id.find(",")+1:]
|
||||
decoded_image = base64.b64decode(base64_image)
|
||||
image = Image.open(BytesIO(decoded_image))
|
||||
break
|
||||
except:
|
||||
continue
|
||||
else:
|
||||
raise ValueError("Invalid image url provided.")
|
||||
|
||||
if image is not None:
|
||||
try:
|
||||
image = ImageOps.exif_transpose(image)
|
||||
image = image.convert("RGB")
|
||||
image = np.array(image).astype(np.float32) / 255.0
|
||||
image = torch.from_numpy(image)[None,]
|
||||
return [image]
|
||||
except:
|
||||
pass
|
||||
|
||||
return [image]
|
||||
|
||||
|
||||
|
@ -64,8 +64,7 @@ class ComfyUIDeployExternalLora:
|
||||
import os
|
||||
import uuid
|
||||
|
||||
if lora_url:
|
||||
if lora_url.startswith("http"):
|
||||
if lora_url and lora_url.startswith("http"):
|
||||
if lora_save_name:
|
||||
existing_loras = folder_paths.get_filename_list("loras")
|
||||
# Check if lora_save_name exists in the list
|
||||
@ -80,12 +79,7 @@ class ComfyUIDeployExternalLora:
|
||||
folder_paths.folder_names_and_paths["loras"][0][0], lora_save_name
|
||||
)
|
||||
print(destination_path)
|
||||
print(
|
||||
"Downloading external lora - "
|
||||
+ lora_url
|
||||
+ " to "
|
||||
+ destination_path
|
||||
)
|
||||
print("Downloading external lora - " + lora_url + " to " + destination_path)
|
||||
response = requests.get(
|
||||
lora_url,
|
||||
headers={"User-Agent": "Mozilla/5.0"},
|
||||
@ -93,13 +87,9 @@ class ComfyUIDeployExternalLora:
|
||||
)
|
||||
with open(destination_path, "wb") as out_file:
|
||||
out_file.write(response.content)
|
||||
print(f"Ext Lora loading: {lora_url} to {lora_save_name}")
|
||||
return (lora_save_name,)
|
||||
else:
|
||||
print(f"Ext Lora loading: {lora_url}")
|
||||
return (lora_url,)
|
||||
else:
|
||||
print(f"Ext Lora loading: {default_lora_name}")
|
||||
print(f"using lora: {default_lora_name}")
|
||||
return (default_lora_name,)
|
||||
|
||||
|
||||
|
@ -1,53 +0,0 @@
|
||||
import re
|
||||
|
||||
|
||||
class StringFunction:
|
||||
@classmethod
|
||||
def INPUT_TYPES(s):
|
||||
return {
|
||||
"required": {
|
||||
"action": (["append", "replace"], {}),
|
||||
"tidy_tags": (["yes", "no"], {}),
|
||||
},
|
||||
"optional": {
|
||||
"text_a": ("STRING", {"multiline": True, "dynamicPrompts": False}),
|
||||
"text_b": ("STRING", {"multiline": True, "dynamicPrompts": False}),
|
||||
"text_c": ("STRING", {"multiline": True, "dynamicPrompts": False}),
|
||||
},
|
||||
}
|
||||
|
||||
RETURN_TYPES = ("STRING",)
|
||||
FUNCTION = "exec"
|
||||
CATEGORY = "utils"
|
||||
OUTPUT_NODE = True
|
||||
|
||||
def exec(self, action, tidy_tags, text_a="", text_b="", text_c=""):
|
||||
tidy_tags = tidy_tags == "yes"
|
||||
out = ""
|
||||
if action == "append":
|
||||
out = (", " if tidy_tags else "").join(
|
||||
filter(None, [text_a, text_b, text_c])
|
||||
)
|
||||
else:
|
||||
if text_c is None:
|
||||
text_c = ""
|
||||
if text_b.startswith("/") and text_b.endswith("/"):
|
||||
regex = text_b[1:-1]
|
||||
out = re.sub(regex, text_c, text_a)
|
||||
else:
|
||||
out = text_a.replace(text_b, text_c)
|
||||
if tidy_tags:
|
||||
out = re.sub(r"\s{2,}", " ", out)
|
||||
out = out.replace(" ,", ",")
|
||||
out = re.sub(r",{2,}", ",", out)
|
||||
out = out.strip()
|
||||
return {"ui": {"text": (out,)}, "result": (out,)}
|
||||
|
||||
|
||||
NODE_CLASS_MAPPINGS = {
|
||||
"ComfyUIDeployStringCombine": StringFunction,
|
||||
}
|
||||
|
||||
NODE_DISPLAY_NAME_MAPPINGS = {
|
||||
"ComfyUIDeployStringCombine": "String Combine (ComfyUI Deploy)",
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
class AnyType(str):
|
||||
def __ne__(self, __value: object) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
WILDCARD = AnyType("*")
|
||||
|
||||
class ComfyUIDeployExternalTextAny:
|
||||
@classmethod
|
||||
def INPUT_TYPES(s):
|
||||
return {
|
||||
"required": {
|
||||
"input_id": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": "input_text"},
|
||||
),
|
||||
},
|
||||
"optional": {
|
||||
"default_value": (
|
||||
"STRING",
|
||||
{"multiline": True, "default": ""},
|
||||
),
|
||||
"display_name": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
"description": (
|
||||
"STRING",
|
||||
{"multiline": True, "default": ""},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_TYPES = (WILDCARD,)
|
||||
RETURN_NAMES = ("text",)
|
||||
|
||||
FUNCTION = "run"
|
||||
|
||||
CATEGORY = "text"
|
||||
|
||||
def run(self, input_id, default_value=None, display_name=None, description=None):
|
||||
return [default_value]
|
||||
|
||||
|
||||
NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalTextAny": ComfyUIDeployExternalTextAny}
|
||||
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalTextAny": "External Text Any (ComfyUI Deploy)"}
|
52
comfy-nodes/external_text_list.py
Normal file
52
comfy-nodes/external_text_list.py
Normal file
@ -0,0 +1,52 @@
|
||||
import folder_paths
|
||||
from PIL import Image, ImageOps
|
||||
import numpy as np
|
||||
import torch
|
||||
import json
|
||||
|
||||
class ComfyUIDeployExternalTextList:
|
||||
@classmethod
|
||||
def INPUT_TYPES(s):
|
||||
return {
|
||||
"required": {
|
||||
"input_id": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": 'input_text_list'},
|
||||
),
|
||||
"text": (
|
||||
"STRING",
|
||||
{"multiline": True, "default": "[]"},
|
||||
),
|
||||
},
|
||||
"optional": {
|
||||
"display_name": (
|
||||
"STRING",
|
||||
{"multiline": False, "default": ""},
|
||||
),
|
||||
"description": (
|
||||
"STRING",
|
||||
{"multiline": True, "default": ""},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_TYPES = ("STRING",)
|
||||
RETURN_NAMES = ("text",)
|
||||
|
||||
OUTPUT_IS_LIST = (True,)
|
||||
|
||||
FUNCTION = "run"
|
||||
|
||||
CATEGORY = "text"
|
||||
|
||||
def run(self, input_id, text=None, display_name=None, description=None):
|
||||
text_list = []
|
||||
try:
|
||||
text_list = json.loads(text) # Assuming text is a JSON array string
|
||||
except Exception as e:
|
||||
print(f"Error processing images: {e}")
|
||||
pass
|
||||
return ([text_list],)
|
||||
|
||||
NODE_CLASS_MAPPINGS = {"ComfyUIDeployExternalTextList": ComfyUIDeployExternalTextList}
|
||||
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployExternalTextList": "External Text List (ComfyUI Deploy)"}
|
@ -1,60 +0,0 @@
|
||||
import folder_paths
|
||||
class AnyType(str):
|
||||
def __ne__(self, __value: object) -> bool:
|
||||
return False
|
||||
|
||||
from os import walk
|
||||
|
||||
WILDCARD = AnyType("*")
|
||||
|
||||
MODEL_EXTENSIONS = {
|
||||
"safetensors": "SafeTensors file format",
|
||||
"ckpt": "Checkpoint file",
|
||||
"pth": "PyTorch serialized file",
|
||||
"pkl": "Pickle file",
|
||||
"onnx": "ONNX file",
|
||||
}
|
||||
|
||||
def fetch_files(path):
|
||||
for (dirpath, dirnames, filenames) in walk(path):
|
||||
fs = []
|
||||
if len(dirnames) > 0:
|
||||
for dirname in dirnames:
|
||||
fs.extend(fetch_files(f"{dirpath}/{dirname}"))
|
||||
for filename in filenames:
|
||||
# Remove "./models/" from the beginning of dirpath
|
||||
relative_dirpath = dirpath.replace("./models/", "", 1)
|
||||
file_path = f"{relative_dirpath}/{filename}"
|
||||
|
||||
# Only add files that are known model extensions
|
||||
file_extension = filename.split('.')[-1].lower()
|
||||
if file_extension in MODEL_EXTENSIONS:
|
||||
fs.append(file_path)
|
||||
|
||||
return fs
|
||||
allModels = fetch_files("./models")
|
||||
|
||||
class ComfyUIDeployModalList:
|
||||
@classmethod
|
||||
def INPUT_TYPES(s):
|
||||
return {
|
||||
"required": {
|
||||
"model": (allModels, ),
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_TYPES = (WILDCARD,)
|
||||
RETURN_NAMES = ("model",)
|
||||
|
||||
FUNCTION = "run"
|
||||
|
||||
CATEGORY = "model"
|
||||
|
||||
def run(self, model=""):
|
||||
# Split the model path by '/' and select the last item
|
||||
model_name = model.split('/')[-1]
|
||||
return [model_name]
|
||||
|
||||
|
||||
NODE_CLASS_MAPPINGS = {"ComfyUIDeployModelList": ComfyUIDeployModalList}
|
||||
NODE_DISPLAY_NAME_MAPPINGS = {"ComfyUIDeployModelList": "Model List (ComfyUI Deploy)"}
|
@ -1,92 +0,0 @@
|
||||
import os
|
||||
import json
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from PIL.PngImagePlugin import PngInfo
|
||||
import folder_paths
|
||||
|
||||
|
||||
class ComfyDeployOutputImage:
|
||||
def __init__(self):
|
||||
self.output_dir = folder_paths.get_output_directory()
|
||||
self.type = "output"
|
||||
self.prefix_append = ""
|
||||
self.compress_level = 4
|
||||
|
||||
@classmethod
|
||||
def INPUT_TYPES(s):
|
||||
return {
|
||||
"required": {
|
||||
"images": ("IMAGE", {"tooltip": "The images to save."}),
|
||||
"filename_prefix": (
|
||||
"STRING",
|
||||
{
|
||||
"default": "ComfyUI",
|
||||
"tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes.",
|
||||
},
|
||||
),
|
||||
"file_type": (["png", "jpg", "webp"], {"default": "webp"}),
|
||||
"quality": ("INT", {"default": 80, "min": 1, "max": 100, "step": 1}),
|
||||
},
|
||||
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
|
||||
}
|
||||
|
||||
RETURN_TYPES = ()
|
||||
FUNCTION = "run"
|
||||
|
||||
OUTPUT_NODE = True
|
||||
|
||||
CATEGORY = "output"
|
||||
DESCRIPTION = "Saves the input images to your ComfyUI output directory."
|
||||
|
||||
def run(
|
||||
self,
|
||||
images,
|
||||
filename_prefix="ComfyUI",
|
||||
file_type="png",
|
||||
quality=80,
|
||||
prompt=None,
|
||||
extra_pnginfo=None,
|
||||
):
|
||||
filename_prefix += self.prefix_append
|
||||
full_output_folder, filename, counter, subfolder, filename_prefix = (
|
||||
folder_paths.get_save_image_path(
|
||||
filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]
|
||||
)
|
||||
)
|
||||
results = list()
|
||||
for batch_number, image in enumerate(images):
|
||||
i = 255.0 * image.cpu().numpy()
|
||||
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
|
||||
metadata = PngInfo()
|
||||
if prompt is not None:
|
||||
metadata.add_text("prompt", json.dumps(prompt))
|
||||
if extra_pnginfo is not None:
|
||||
for x in extra_pnginfo:
|
||||
metadata.add_text(x, json.dumps(extra_pnginfo[x]))
|
||||
|
||||
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
|
||||
file = f"{filename_with_batch_num}_{counter:05}_.{file_type}"
|
||||
file_path = os.path.join(full_output_folder, file)
|
||||
|
||||
if file_type == "png":
|
||||
img.save(
|
||||
file_path, pnginfo=metadata, compress_level=self.compress_level
|
||||
)
|
||||
elif file_type == "jpg":
|
||||
img.save(file_path, quality=quality, optimize=True)
|
||||
elif file_type == "webp":
|
||||
img.save(file_path, quality=quality)
|
||||
|
||||
results.append(
|
||||
{"filename": file, "subfolder": subfolder, "type": self.type}
|
||||
)
|
||||
counter += 1
|
||||
|
||||
return {"ui": {"images": results}}
|
||||
|
||||
|
||||
NODE_CLASS_MAPPINGS = {"ComfyDeployOutputImage": ComfyDeployOutputImage}
|
||||
NODE_DISPLAY_NAME_MAPPINGS = {
|
||||
"ComfyDeployOutputImage": "Image Output (ComfyDeploy)"
|
||||
}
|
1248
custom_routes.py
1248
custom_routes.py
File diff suppressed because it is too large
Load Diff
34
globals.py
34
globals.py
@ -6,12 +6,10 @@ from PIL import Image, ImageOps
|
||||
from io import BytesIO
|
||||
from pydantic import BaseModel as PydanticBaseModel
|
||||
|
||||
|
||||
class BaseModel(PydanticBaseModel):
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
class Status(Enum):
|
||||
NOT_STARTED = "not-started"
|
||||
RUNNING = "running"
|
||||
@ -19,7 +17,6 @@ class Status(Enum):
|
||||
FAILED = "failed"
|
||||
UPLOADING = "uploading"
|
||||
|
||||
|
||||
class StreamingPrompt(BaseModel):
|
||||
workflow_api: Any
|
||||
auth_token: str
|
||||
@ -27,9 +24,6 @@ class StreamingPrompt(BaseModel):
|
||||
running_prompt_ids: set[str] = set()
|
||||
status_endpoint: Optional[str]
|
||||
file_upload_endpoint: Optional[str]
|
||||
workflow: Any
|
||||
gpu_event_id: Optional[str] = None
|
||||
|
||||
|
||||
class SimplePrompt(BaseModel):
|
||||
status_endpoint: Optional[str]
|
||||
@ -40,39 +34,34 @@ class SimplePrompt(BaseModel):
|
||||
workflow_api: dict
|
||||
status: Status = Status.NOT_STARTED
|
||||
progress: set = set()
|
||||
last_updated_node: Optional[str] = None
|
||||
last_updated_node: Optional[str] = None,
|
||||
uploading_nodes: set = set()
|
||||
done: bool = False
|
||||
is_realtime: bool = False
|
||||
start_time: Optional[float] = None
|
||||
gpu_event_id: Optional[str] = None
|
||||
|
||||
is_realtime: bool = False,
|
||||
start_time: Optional[float] = None,
|
||||
|
||||
sockets = dict()
|
||||
prompt_metadata: dict[str, SimplePrompt] = {}
|
||||
streaming_prompt_metadata: dict[str, StreamingPrompt] = {}
|
||||
|
||||
|
||||
class BinaryEventTypes:
|
||||
PREVIEW_IMAGE = 1
|
||||
UNENCODED_PREVIEW_IMAGE = 2
|
||||
|
||||
|
||||
max_output_id_length = 24
|
||||
|
||||
|
||||
async def send_image(image_data, sid=None, output_id: str = None):
|
||||
async def send_image(image_data, sid=None, output_id:str = None):
|
||||
max_length = max_output_id_length
|
||||
output_id = output_id[:max_length]
|
||||
padded_output_id = output_id.ljust(max_length, "\x00")
|
||||
encoded_output_id = padded_output_id.encode("ascii", "replace")
|
||||
padded_output_id = output_id.ljust(max_length, '\x00')
|
||||
encoded_output_id = padded_output_id.encode('ascii', 'replace')
|
||||
|
||||
image_type = image_data[0]
|
||||
image = image_data[1]
|
||||
max_size = image_data[2]
|
||||
quality = image_data[3]
|
||||
if max_size is not None:
|
||||
if hasattr(Image, "Resampling"):
|
||||
if hasattr(Image, 'Resampling'):
|
||||
resampling = Image.Resampling.BILINEAR
|
||||
else:
|
||||
resampling = Image.ANTIALIAS
|
||||
@ -101,18 +90,12 @@ async def send_image(image_data, sid=None, output_id: str = None):
|
||||
preview_bytes = bytesIO.getvalue()
|
||||
await send_bytes(BinaryEventTypes.PREVIEW_IMAGE, preview_bytes, sid=sid)
|
||||
|
||||
|
||||
async def send_socket_catch_exception(function, message):
|
||||
try:
|
||||
await function(message)
|
||||
except (
|
||||
aiohttp.ClientError,
|
||||
aiohttp.ClientPayloadError,
|
||||
ConnectionResetError,
|
||||
) as err:
|
||||
except (aiohttp.ClientError, aiohttp.ClientPayloadError, ConnectionResetError) as err:
|
||||
print("send error:", err)
|
||||
|
||||
|
||||
def encode_bytes(event, data):
|
||||
if not isinstance(event, int):
|
||||
raise RuntimeError(f"Binary event types must be integers, got {event}")
|
||||
@ -122,7 +105,6 @@ def encode_bytes(event, data):
|
||||
message.extend(data)
|
||||
return message
|
||||
|
||||
|
||||
async def send_bytes(event, data, sid=None):
|
||||
message = encode_bytes(event, data)
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
[project]
|
||||
name = "comfyui-deploy"
|
||||
description = "Open source comfyui deployment platform, a vercel for generative workflow infra."
|
||||
version = "1.1.0"
|
||||
license = { file = "LICENSE" }
|
||||
version = "1.0.0"
|
||||
license = "LICENSE"
|
||||
dependencies = ["aiofiles", "pydantic", "opencv-python", "imageio-ffmpeg"]
|
||||
|
||||
[project.urls]
|
||||
|
@ -3,5 +3,4 @@ pydantic
|
||||
opencv-python
|
||||
imageio-ffmpeg
|
||||
brotli
|
||||
tabulate
|
||||
# logfire
|
4
web-plugin/api.js
Normal file
4
web-plugin/api.js
Normal file
@ -0,0 +1,4 @@
|
||||
/** @typedef {import('../../../web/scripts/api.js').api} API*/
|
||||
import { api as _api } from '../../scripts/api.js';
|
||||
/** @type {API} */
|
||||
export const api = _api;
|
4
web-plugin/app.js
Normal file
4
web-plugin/app.js
Normal file
@ -0,0 +1,4 @@
|
||||
/** @typedef {import('../../../web/scripts/app.js').ComfyApp} ComfyApp*/
|
||||
import { app as _app } from '../../scripts/app.js';
|
||||
/** @type {ComfyApp} */
|
||||
export const app = _app;
|
File diff suppressed because it is too large
Load Diff
18
web-plugin/widgets.js
Normal file
18
web-plugin/widgets.js
Normal file
@ -0,0 +1,18 @@
|
||||
// /** @typedef {import('../../../web/scripts/api.js').api} API*/
|
||||
// import { api as _api } from "../../scripts/api.js";
|
||||
// /** @type {API} */
|
||||
// export const api = _api;
|
||||
|
||||
/** @typedef {typeof import('../../../web/scripts/widgets.js').ComfyWidgets} Widgets*/
|
||||
import { ComfyWidgets as _ComfyWidgets } from "../../scripts/widgets.js";
|
||||
|
||||
/**
|
||||
* @type {Widgets}
|
||||
*/
|
||||
export const ComfyWidgets = _ComfyWidgets;
|
||||
|
||||
// import { LGraphNode as _LGraphNode } from "../../types/litegraph.js";
|
||||
|
||||
/** @typedef {typeof import('../../../web/types/litegraph.js').LGraphNode} LGraphNode*/
|
||||
/** @type {LGraphNode}*/
|
||||
export const LGraphNode = LiteGraph.LGraphNode;
|
@ -6,5 +6,4 @@ export const customInputNodes: Record<string, string> = {
|
||||
ComfyUIDeployExternalNumberInt: "integer",
|
||||
ComfyUIDeployExternalLora: "string - (public lora download url)",
|
||||
ComfyUIDeployExternalCheckpoint: "string - (public checkpoints download url)",
|
||||
ComfyUIDeployExternalFaceModel: "string - (public face model download url)",
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user