From 2690db12b5883bcf9525ce74f49f3e6236813694 Mon Sep 17 00:00:00 2001 From: BennyKok Date: Fri, 22 Dec 2023 00:24:12 +0800 Subject: [PATCH] feat: machines logs viewer --- custom_routes.py | 52 ++++++++++++++++- prestartup_script.py | 39 +++++++++++++ web/src/components/MachinesWS.tsx | 93 +++++++++++++++++++++++++++++-- web/src/components/RunDisplay.tsx | 10 +--- web/src/components/RunsTable.tsx | 2 +- 5 files changed, 181 insertions(+), 15 deletions(-) create mode 100644 prestartup_script.py diff --git a/custom_routes.py b/custom_routes.py index 0d61855..49ce446 100644 --- a/custom_routes.py +++ b/custom_routes.py @@ -17,8 +17,11 @@ import uuid import asyncio import atexit import logging +import sys +from logging.handlers import RotatingFileHandler from enum import Enum from urllib.parse import quote +import threading api = None api_task = None @@ -124,6 +127,7 @@ async def websocket_handler(request): try: # Send initial state to the new client await send("status", { 'sid': sid }, sid) + await send_first_time_log(sid) async for msg in ws: if msg.type == aiohttp.WSMsgType.ERROR: @@ -136,7 +140,7 @@ async def send(event, data, sid=None): try: if sid: ws = sockets.get(sid) - if ws and not ws.closed: # Check if the WebSocket connection is open and not closing + if ws != None and not ws.closed: # Check if the WebSocket connection is open and not closing await ws.send_json({ 'event': event, 'data': data }) else: for ws in sockets.values(): @@ -291,4 +295,48 @@ async def update_run_with_output(prompt_id, data): }) prompt_server.send_json_original = prompt_server.send_json -prompt_server.send_json = send_json_override.__get__(prompt_server, server.PromptServer) \ No newline at end of file +prompt_server.send_json = send_json_override.__get__(prompt_server, server.PromptServer) + +root_path = os.path.dirname(os.path.abspath(__file__)) +two_dirs_up = os.path.dirname(os.path.dirname(root_path)) +log_file_path = os.path.join(two_dirs_up, 'comfy-deploy.log') + +last_read_line = 0 + +async def watch_file_changes(file_path, callback): + global last_read_line + last_modified_time = os.stat(file_path).st_mtime + while True: + time.sleep(1) # sleep for a while to reduce CPU usage + modified_time = os.stat(file_path).st_mtime + if modified_time != last_modified_time: + last_modified_time = modified_time + with open(file_path, 'r') as file: + lines = file.readlines() + if last_read_line > len(lines): + last_read_line = 0 # Reset if log file has been rotated + new_lines = lines[last_read_line:] + last_read_line = len(lines) + if new_lines: + await callback(''.join(new_lines)) + + +async def send_first_time_log(sid): + with open(log_file_path, 'r') as file: + lines = file.readlines() + await send("LOGS", ''.join(lines), sid) + +async def send_logs_to_websocket(logs): + await send("LOGS", logs) + +def start_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + +def run_in_new_thread(coroutine): + new_loop = asyncio.new_event_loop() + t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True) + t.start() + asyncio.run_coroutine_threadsafe(coroutine, new_loop) + +run_in_new_thread(watch_file_changes(log_file_path, send_logs_to_websocket)) diff --git a/prestartup_script.py b/prestartup_script.py new file mode 100644 index 0000000..6f090cc --- /dev/null +++ b/prestartup_script.py @@ -0,0 +1,39 @@ +import datetime +import os +import subprocess +import sys +import atexit +import threading +import logging +from logging.handlers import RotatingFileHandler + +class StreamToLogger(object): + def __init__(self, original, logger, log_level): + self.original_stdout = original + self.logger = logger + self.log_level = log_level + + def write(self, buf): + self.original_stdout.write(buf) + self.original_stdout.flush() + for line in buf.rstrip().splitlines(): + self.logger.log(self.log_level, line.rstrip()) + + def flush(self): + self.original_stdout.flush() + + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +# Create a handler that rotates log files every 1MB +handler = RotatingFileHandler('comfy-deploy.log', maxBytes=500000, backupCount=5) +logger.addHandler(handler) + +# Store original streams +original_stdout = sys.stdout +original_stderr = sys.stderr + +# Redirect stdout and stderr to the logger +sys.stdout = StreamToLogger(original_stdout, logger, logging.INFO) +sys.stderr = StreamToLogger(original_stderr, logger, logging.ERROR) \ No newline at end of file diff --git a/web/src/components/MachinesWS.tsx b/web/src/components/MachinesWS.tsx index 7b065be..3aaff34 100644 --- a/web/src/components/MachinesWS.tsx +++ b/web/src/components/MachinesWS.tsx @@ -1,9 +1,17 @@ "use client"; import { Badge } from "@/components/ui/badge"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, + DialogTrigger, +} from "@/components/ui/dialog"; import type { getMachines } from "@/server/curdMachine"; import { Check, CircleOff, SatelliteDish } from "lucide-react"; -import React, { useEffect } from "react"; +import React, { useEffect, useRef, useState } from "react"; import useWebSocket, { ReadyState } from "react-use-websocket"; import { create } from "zustand"; @@ -16,6 +24,12 @@ type State = { data: any; }; }[]; + logs: { + machine_id: string; + logs: string; + timestamp: number; + }[]; + addLogs: (id: string, logs: string) => void; addData: ( id: string, json: { @@ -27,6 +41,13 @@ type State = { export const useStore = create((set) => ({ data: [], + logs: [], + addLogs(id, logs) { + set((state) => ({ + ...state, + logs: [...state.logs, { machine_id: id, logs, timestamp: Date.now() }], + })); + }, addData: (id, json) => set((state) => ({ ...state, @@ -54,7 +75,14 @@ function MachineWS({ }: { machine: Awaited>[0]; }) { - const { addData } = useStore(); + const { addData, addLogs } = useStore(); + const logs = useStore((x) => + x.logs + .filter((p) => p.machine_id === machine.id) + .sort((a, b) => a.timestamp - b.timestamp) + ); + const [sid, setSid] = useState(""); + const wsEndpoint = machine.endpoint.replace(/^http/, "ws"); const { lastMessage, readyState } = useWebSocket( `${wsEndpoint}/comfyui-deploy/ws`, @@ -62,6 +90,9 @@ function MachineWS({ shouldReconnect: () => true, reconnectAttempts: 20, reconnectInterval: 1000, + // queryParams: { + // clientId: sid, + // }, } ); @@ -75,20 +106,72 @@ function MachineWS({ [ReadyState.UNINSTANTIATED]: "Uninstantiated", }[readyState]; + const container = useRef(null); + useEffect(() => { if (!lastMessage?.data) return; const message = JSON.parse(lastMessage.data); console.log(message.event, message); + if (message.data.sid) { + setSid(message.data.sid); + } + if (message.data?.prompt_id) { addData(message.data.prompt_id, message); } + + if (message.event === "LOGS") { + addLogs(machine.id, message.data); + } }, [lastMessage]); + useEffect(() => { + // console.log(logs.length, container.current); + if (container.current) { + const scrollHeight = container.current.scrollHeight; + + container.current.scrollTo({ + top: scrollHeight, + behavior: "smooth", + }); + } + }, [logs.length]); + return ( - - {machine.name} {connectionStatus} - + + + + {machine.name} {connectionStatus} + + + + + Machine Logs + + You can view your run's outputs here + + +
{ + if (!container.current && ref) { + const scrollHeight = ref.scrollHeight; + + ref.scrollTo({ + top: scrollHeight, + behavior: "instant", + }); + } + container.current = ref; + }} + className="flex flex-col text-xs p-2 overflow-y-scroll max-h-[400px] whitespace-break-spaces" + > + {logs.map((x, i) => ( +
{x.logs}
+ ))} +
+
+
); } diff --git a/web/src/components/RunDisplay.tsx b/web/src/components/RunDisplay.tsx index 18ce7a6..8ee07d0 100644 --- a/web/src/components/RunDisplay.tsx +++ b/web/src/components/RunDisplay.tsx @@ -1,6 +1,5 @@ - -import { RunInputs } from "@/components/RunInputs"; import { LiveStatus } from "./LiveStatus"; +import { RunInputs } from "@/components/RunInputs"; import { RunOutputs } from "@/components/RunOutputs"; import { Dialog, @@ -22,10 +21,7 @@ export async function RunDisplay({ }) { return ( - + {run.number} {run.machine?.name} @@ -42,7 +38,7 @@ export async function RunDisplay({
- + diff --git a/web/src/components/RunsTable.tsx b/web/src/components/RunsTable.tsx index cad6091..75d61a3 100644 --- a/web/src/components/RunsTable.tsx +++ b/web/src/components/RunsTable.tsx @@ -39,7 +39,7 @@ export async function RunsTable(props: { workflow_id: string }) { export async function DeploymentsTable(props: { workflow_id: string }) { const allRuns = await findAllDeployments(props.workflow_id); return ( -
+
A list of your deployments