feat: machines logs viewer
This commit is contained in:
		
							parent
							
								
									9cc3c6ffe3
								
							
						
					
					
						commit
						2690db12b5
					
				@ -17,8 +17,11 @@ import uuid
 | 
			
		||||
import asyncio
 | 
			
		||||
import atexit
 | 
			
		||||
import logging
 | 
			
		||||
import sys
 | 
			
		||||
from logging.handlers import RotatingFileHandler
 | 
			
		||||
from enum import Enum
 | 
			
		||||
from urllib.parse import quote
 | 
			
		||||
import threading
 | 
			
		||||
 | 
			
		||||
api = None
 | 
			
		||||
api_task = None
 | 
			
		||||
@ -124,6 +127,7 @@ async def websocket_handler(request):
 | 
			
		||||
    try:
 | 
			
		||||
        # Send initial state to the new client
 | 
			
		||||
        await send("status", { 'sid': sid }, sid)
 | 
			
		||||
        await send_first_time_log(sid)
 | 
			
		||||
            
 | 
			
		||||
        async for msg in ws:
 | 
			
		||||
            if msg.type == aiohttp.WSMsgType.ERROR:
 | 
			
		||||
@ -136,7 +140,7 @@ async def send(event, data, sid=None):
 | 
			
		||||
    try:
 | 
			
		||||
        if sid:
 | 
			
		||||
            ws = sockets.get(sid)
 | 
			
		||||
            if ws and not ws.closed:  # Check if the WebSocket connection is open and not closing
 | 
			
		||||
            if ws != None and not ws.closed:  # Check if the WebSocket connection is open and not closing
 | 
			
		||||
                await ws.send_json({ 'event': event, 'data': data })
 | 
			
		||||
        else:
 | 
			
		||||
            for ws in sockets.values():
 | 
			
		||||
@ -292,3 +296,47 @@ async def update_run_with_output(prompt_id, data):
 | 
			
		||||
 | 
			
		||||
prompt_server.send_json_original = prompt_server.send_json
 | 
			
		||||
prompt_server.send_json = send_json_override.__get__(prompt_server, server.PromptServer)
 | 
			
		||||
 | 
			
		||||
root_path = os.path.dirname(os.path.abspath(__file__))
 | 
			
		||||
two_dirs_up = os.path.dirname(os.path.dirname(root_path))
 | 
			
		||||
log_file_path = os.path.join(two_dirs_up, 'comfy-deploy.log')
 | 
			
		||||
 | 
			
		||||
last_read_line = 0
 | 
			
		||||
 | 
			
		||||
async def watch_file_changes(file_path, callback):
 | 
			
		||||
    global last_read_line
 | 
			
		||||
    last_modified_time = os.stat(file_path).st_mtime
 | 
			
		||||
    while True:
 | 
			
		||||
        time.sleep(1)  # sleep for a while to reduce CPU usage
 | 
			
		||||
        modified_time = os.stat(file_path).st_mtime
 | 
			
		||||
        if modified_time != last_modified_time:
 | 
			
		||||
            last_modified_time = modified_time
 | 
			
		||||
            with open(file_path, 'r') as file:
 | 
			
		||||
                lines = file.readlines()
 | 
			
		||||
            if last_read_line > len(lines):
 | 
			
		||||
                last_read_line = 0  # Reset if log file has been rotated
 | 
			
		||||
            new_lines = lines[last_read_line:]
 | 
			
		||||
            last_read_line = len(lines)
 | 
			
		||||
            if new_lines:
 | 
			
		||||
                await callback(''.join(new_lines))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def send_first_time_log(sid):
 | 
			
		||||
    with open(log_file_path, 'r') as file:
 | 
			
		||||
        lines = file.readlines()
 | 
			
		||||
    await send("LOGS", ''.join(lines), sid)
 | 
			
		||||
 | 
			
		||||
async def send_logs_to_websocket(logs):
 | 
			
		||||
    await send("LOGS", logs)
 | 
			
		||||
 | 
			
		||||
def start_loop(loop):
 | 
			
		||||
    asyncio.set_event_loop(loop)
 | 
			
		||||
    loop.run_forever()
 | 
			
		||||
 | 
			
		||||
def run_in_new_thread(coroutine):
 | 
			
		||||
    new_loop = asyncio.new_event_loop()
 | 
			
		||||
    t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
 | 
			
		||||
    t.start()
 | 
			
		||||
    asyncio.run_coroutine_threadsafe(coroutine, new_loop)
 | 
			
		||||
 | 
			
		||||
run_in_new_thread(watch_file_changes(log_file_path, send_logs_to_websocket))
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										39
									
								
								prestartup_script.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								prestartup_script.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,39 @@
 | 
			
		||||
import datetime
 | 
			
		||||
import os
 | 
			
		||||
import subprocess
 | 
			
		||||
import sys
 | 
			
		||||
import atexit
 | 
			
		||||
import threading
 | 
			
		||||
import logging
 | 
			
		||||
from logging.handlers import RotatingFileHandler
 | 
			
		||||
 | 
			
		||||
class StreamToLogger(object):
 | 
			
		||||
    def __init__(self, original, logger, log_level):
 | 
			
		||||
        self.original_stdout = original
 | 
			
		||||
        self.logger = logger
 | 
			
		||||
        self.log_level = log_level
 | 
			
		||||
 | 
			
		||||
    def write(self, buf):
 | 
			
		||||
        self.original_stdout.write(buf)
 | 
			
		||||
        self.original_stdout.flush()
 | 
			
		||||
        for line in buf.rstrip().splitlines():
 | 
			
		||||
            self.logger.log(self.log_level, line.rstrip())
 | 
			
		||||
 | 
			
		||||
    def flush(self):
 | 
			
		||||
        self.original_stdout.flush()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger()
 | 
			
		||||
logger.setLevel(logging.INFO)
 | 
			
		||||
 | 
			
		||||
# Create a handler that rotates log files every 1MB
 | 
			
		||||
handler = RotatingFileHandler('comfy-deploy.log', maxBytes=500000, backupCount=5)
 | 
			
		||||
logger.addHandler(handler)
 | 
			
		||||
 | 
			
		||||
# Store original streams
 | 
			
		||||
original_stdout = sys.stdout
 | 
			
		||||
original_stderr = sys.stderr
 | 
			
		||||
 | 
			
		||||
# Redirect stdout and stderr to the logger
 | 
			
		||||
sys.stdout = StreamToLogger(original_stdout, logger, logging.INFO)
 | 
			
		||||
sys.stderr = StreamToLogger(original_stderr, logger, logging.ERROR)
 | 
			
		||||
@ -1,9 +1,17 @@
 | 
			
		||||
"use client";
 | 
			
		||||
 | 
			
		||||
import { Badge } from "@/components/ui/badge";
 | 
			
		||||
import {
 | 
			
		||||
  Dialog,
 | 
			
		||||
  DialogContent,
 | 
			
		||||
  DialogDescription,
 | 
			
		||||
  DialogHeader,
 | 
			
		||||
  DialogTitle,
 | 
			
		||||
  DialogTrigger,
 | 
			
		||||
} from "@/components/ui/dialog";
 | 
			
		||||
import type { getMachines } from "@/server/curdMachine";
 | 
			
		||||
import { Check, CircleOff, SatelliteDish } from "lucide-react";
 | 
			
		||||
import React, { useEffect } from "react";
 | 
			
		||||
import React, { useEffect, useRef, useState } from "react";
 | 
			
		||||
import useWebSocket, { ReadyState } from "react-use-websocket";
 | 
			
		||||
import { create } from "zustand";
 | 
			
		||||
 | 
			
		||||
@ -16,6 +24,12 @@ type State = {
 | 
			
		||||
      data: any;
 | 
			
		||||
    };
 | 
			
		||||
  }[];
 | 
			
		||||
  logs: {
 | 
			
		||||
    machine_id: string;
 | 
			
		||||
    logs: string;
 | 
			
		||||
    timestamp: number;
 | 
			
		||||
  }[];
 | 
			
		||||
  addLogs: (id: string, logs: string) => void;
 | 
			
		||||
  addData: (
 | 
			
		||||
    id: string,
 | 
			
		||||
    json: {
 | 
			
		||||
@ -27,6 +41,13 @@ type State = {
 | 
			
		||||
 | 
			
		||||
export const useStore = create<State>((set) => ({
 | 
			
		||||
  data: [],
 | 
			
		||||
  logs: [],
 | 
			
		||||
  addLogs(id, logs) {
 | 
			
		||||
    set((state) => ({
 | 
			
		||||
      ...state,
 | 
			
		||||
      logs: [...state.logs, { machine_id: id, logs, timestamp: Date.now() }],
 | 
			
		||||
    }));
 | 
			
		||||
  },
 | 
			
		||||
  addData: (id, json) =>
 | 
			
		||||
    set((state) => ({
 | 
			
		||||
      ...state,
 | 
			
		||||
@ -54,7 +75,14 @@ function MachineWS({
 | 
			
		||||
}: {
 | 
			
		||||
  machine: Awaited<ReturnType<typeof getMachines>>[0];
 | 
			
		||||
}) {
 | 
			
		||||
  const { addData } = useStore();
 | 
			
		||||
  const { addData, addLogs } = useStore();
 | 
			
		||||
  const logs = useStore((x) =>
 | 
			
		||||
    x.logs
 | 
			
		||||
      .filter((p) => p.machine_id === machine.id)
 | 
			
		||||
      .sort((a, b) => a.timestamp - b.timestamp)
 | 
			
		||||
  );
 | 
			
		||||
  const [sid, setSid] = useState("");
 | 
			
		||||
 | 
			
		||||
  const wsEndpoint = machine.endpoint.replace(/^http/, "ws");
 | 
			
		||||
  const { lastMessage, readyState } = useWebSocket(
 | 
			
		||||
    `${wsEndpoint}/comfyui-deploy/ws`,
 | 
			
		||||
@ -62,6 +90,9 @@ function MachineWS({
 | 
			
		||||
      shouldReconnect: () => true,
 | 
			
		||||
      reconnectAttempts: 20,
 | 
			
		||||
      reconnectInterval: 1000,
 | 
			
		||||
      // queryParams: {
 | 
			
		||||
      //   clientId: sid,
 | 
			
		||||
      // },
 | 
			
		||||
    }
 | 
			
		||||
  );
 | 
			
		||||
 | 
			
		||||
@ -75,20 +106,72 @@ function MachineWS({
 | 
			
		||||
    [ReadyState.UNINSTANTIATED]: "Uninstantiated",
 | 
			
		||||
  }[readyState];
 | 
			
		||||
 | 
			
		||||
  const container = useRef<HTMLDivElement | null>(null);
 | 
			
		||||
 | 
			
		||||
  useEffect(() => {
 | 
			
		||||
    if (!lastMessage?.data) return;
 | 
			
		||||
 | 
			
		||||
    const message = JSON.parse(lastMessage.data);
 | 
			
		||||
    console.log(message.event, message);
 | 
			
		||||
 | 
			
		||||
    if (message.data.sid) {
 | 
			
		||||
      setSid(message.data.sid);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (message.data?.prompt_id) {
 | 
			
		||||
      addData(message.data.prompt_id, message);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (message.event === "LOGS") {
 | 
			
		||||
      addLogs(machine.id, message.data);
 | 
			
		||||
    }
 | 
			
		||||
  }, [lastMessage]);
 | 
			
		||||
 | 
			
		||||
  useEffect(() => {
 | 
			
		||||
    // console.log(logs.length, container.current);
 | 
			
		||||
    if (container.current) {
 | 
			
		||||
      const scrollHeight = container.current.scrollHeight;
 | 
			
		||||
 | 
			
		||||
      container.current.scrollTo({
 | 
			
		||||
        top: scrollHeight,
 | 
			
		||||
        behavior: "smooth",
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
  }, [logs.length]);
 | 
			
		||||
 | 
			
		||||
  return (
 | 
			
		||||
    <Dialog>
 | 
			
		||||
      <DialogTrigger asChild className="">
 | 
			
		||||
        <Badge className="text-sm flex gap-2 font-normal" variant="outline">
 | 
			
		||||
          {machine.name} {connectionStatus}
 | 
			
		||||
        </Badge>
 | 
			
		||||
      </DialogTrigger>
 | 
			
		||||
      <DialogContent className="max-w-3xl max-h-full">
 | 
			
		||||
        <DialogHeader>
 | 
			
		||||
          <DialogTitle>Machine Logs</DialogTitle>
 | 
			
		||||
          <DialogDescription>
 | 
			
		||||
            You can view your run's outputs here
 | 
			
		||||
          </DialogDescription>
 | 
			
		||||
        </DialogHeader>
 | 
			
		||||
        <div
 | 
			
		||||
          ref={(ref) => {
 | 
			
		||||
            if (!container.current && ref) {
 | 
			
		||||
              const scrollHeight = ref.scrollHeight;
 | 
			
		||||
 | 
			
		||||
              ref.scrollTo({
 | 
			
		||||
                top: scrollHeight,
 | 
			
		||||
                behavior: "instant",
 | 
			
		||||
              });
 | 
			
		||||
            }
 | 
			
		||||
            container.current = ref;
 | 
			
		||||
          }}
 | 
			
		||||
          className="flex flex-col text-xs p-2 overflow-y-scroll max-h-[400px] whitespace-break-spaces"
 | 
			
		||||
        >
 | 
			
		||||
          {logs.map((x, i) => (
 | 
			
		||||
            <div key={i}>{x.logs}</div>
 | 
			
		||||
          ))}
 | 
			
		||||
        </div>
 | 
			
		||||
      </DialogContent>
 | 
			
		||||
    </Dialog>
 | 
			
		||||
  );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,5 @@
 | 
			
		||||
 | 
			
		||||
import { RunInputs } from "@/components/RunInputs";
 | 
			
		||||
import { LiveStatus } from "./LiveStatus";
 | 
			
		||||
import { RunInputs } from "@/components/RunInputs";
 | 
			
		||||
import { RunOutputs } from "@/components/RunOutputs";
 | 
			
		||||
import {
 | 
			
		||||
  Dialog,
 | 
			
		||||
@ -22,10 +21,7 @@ export async function RunDisplay({
 | 
			
		||||
}) {
 | 
			
		||||
  return (
 | 
			
		||||
    <Dialog>
 | 
			
		||||
      <DialogTrigger
 | 
			
		||||
        asChild
 | 
			
		||||
        className="appearance-none hover:cursor-pointer"
 | 
			
		||||
      >
 | 
			
		||||
      <DialogTrigger asChild className="appearance-none hover:cursor-pointer">
 | 
			
		||||
        <TableRow>
 | 
			
		||||
          <TableCell>{run.number}</TableCell>
 | 
			
		||||
          <TableCell className="font-medium">{run.machine?.name}</TableCell>
 | 
			
		||||
@ -42,7 +38,7 @@ export async function RunDisplay({
 | 
			
		||||
          </DialogDescription>
 | 
			
		||||
        </DialogHeader>
 | 
			
		||||
        <div className="max-h-96 overflow-y-scroll">
 | 
			
		||||
          <RunInputs run={run}/>
 | 
			
		||||
          <RunInputs run={run} />
 | 
			
		||||
          <Suspense>
 | 
			
		||||
            <RunOutputs run_id={run.id} />
 | 
			
		||||
          </Suspense>
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,7 @@ export async function RunsTable(props: { workflow_id: string }) {
 | 
			
		||||
export async function DeploymentsTable(props: { workflow_id: string }) {
 | 
			
		||||
  const allRuns = await findAllDeployments(props.workflow_id);
 | 
			
		||||
  return (
 | 
			
		||||
    <div className="overflow-auto h-fit lg:h-[400px] w-full">
 | 
			
		||||
    <div className="overflow-auto h-fit  w-full">
 | 
			
		||||
      <Table className="">
 | 
			
		||||
        <TableCaption>A list of your deployments</TableCaption>
 | 
			
		||||
        <TableHeader className="bg-background top-0 sticky">
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user