diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1db657b --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +boto3 \ No newline at end of file diff --git a/routes.py b/routes.py index 6db1603..6ce05db 100644 --- a/routes.py +++ b/routes.py @@ -22,6 +22,7 @@ from enum import Enum import aiohttp from aiohttp import web +import boto3 api = None api_task = None @@ -96,6 +97,7 @@ async def comfy_deploy_run(request): prompt_metadata[res['prompt_id']] = { 'status_endpoint': data.get('status_endpoint'), + 'file_upload_endpoint': data.get('file_upload_endpoint'), } status = 200 @@ -151,17 +153,18 @@ async def send_json_override(self, event, data, sid=None): # now we send everything await send(event, data) + await self.send_json_original(event, data, sid) if event == 'execution_start': update_run(prompt_id, Status.RUNNING) - # if event == 'executing': - # update_run(prompt_id, Status.RUNNING) - - if event == 'executed': + # the last executing event is none, then the workflow is finished + if event == 'executing' and data.get('node') is None: update_run(prompt_id, Status.SUCCESS) - await self.send_json_original(event, data, sid) + if event == 'executed' and 'node' in data and 'output' in data: + asyncio.create_task(update_run_with_output(prompt_id, data.get('output'))) + # update_run_with_output(prompt_id, data.get('output')) class Status(Enum): @@ -171,7 +174,10 @@ class Status(Enum): FAILED = "failed" def update_run(prompt_id, status: Status): - if prompt_id in prompt_metadata and ('status' not in prompt_metadata[prompt_id] or prompt_metadata[prompt_id]['status'] != status): + if prompt_id not in prompt_metadata: + return + + if ('status' not in prompt_metadata[prompt_id] or prompt_metadata[prompt_id]['status'] != status): status_endpoint = prompt_metadata[prompt_id]['status_endpoint'] body = { "run_id": prompt_id, @@ -180,5 +186,65 @@ def update_run(prompt_id, status: Status): prompt_metadata[prompt_id]['status'] = status requests.post(status_endpoint, json=body) + +async def upload_file(prompt_id, filename, subfolder=None): + """ + Uploads file to S3 bucket using S3 client object + :return: None + """ + filename,output_dir = folder_paths.annotated_filepath(filename) + + # validation for security: prevent accessing arbitrary path + if filename[0] == '/' or '..' in filename: + return + + if output_dir is None: + output_dir = folder_paths.get_directory_by_type("output") + + if output_dir is None: + return + + if subfolder != None: + full_output_dir = os.path.join(output_dir, subfolder) + if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir: + return + output_dir = full_output_dir + + filename = os.path.basename(filename) + file = os.path.join(output_dir, filename) + + print("uploading file", file) + + file_upload_endpoint = prompt_metadata[prompt_id]['file_upload_endpoint'] + + content_type = "image/png" + + result = requests.get(f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}") + ok = result.json() + + with open(file, 'rb') as f: + data = f.read() + headers = { + "x-amz-acl": "public-read", + "Content-Type": content_type, + "Content-Length": str(len(data)), + } + response = requests.put(ok.get("url"), headers=headers, data=data) + print("upload file response", response.status_code) + +async def update_run_with_output(prompt_id, data): + if prompt_id in prompt_metadata: + status_endpoint = prompt_metadata[prompt_id]['status_endpoint'] + + images = data.get('images', []) + for image in images: + await upload_file(prompt_id, image.get("filename"), subfolder=image.get("subfolder")) + + body = { + "run_id": prompt_id, + "output_data": data + } + requests.post(status_endpoint, json=body) + prompt_server.send_json_original = prompt_server.send_json prompt_server.send_json = send_json_override.__get__(prompt_server, server.PromptServer) \ No newline at end of file diff --git a/web/aws/buckets.sh b/web/aws/buckets.sh new file mode 100755 index 0000000..384bb6c --- /dev/null +++ b/web/aws/buckets.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +awslocal s3 mb s3://comfyui-deploy \ No newline at end of file diff --git a/web/bun.lockb b/web/bun.lockb index 5897671..6229656 100755 Binary files a/web/bun.lockb and b/web/bun.lockb differ diff --git a/web/docker-compose.yml b/web/docker-compose.yml index 8cce5eb..7bf20d1 100644 --- a/web/docker-compose.yml +++ b/web/docker-compose.yml @@ -19,4 +19,12 @@ services: ports: - "5481:80" depends_on: - - postgres \ No newline at end of file + - postgres + localstack: + image: localstack/localstack:latest + environment: + SERVICES: s3 + ports: + - 4566:4566 + volumes: + - ./aws:/etc/localstack/init/ready.d \ No newline at end of file diff --git a/web/drizzle/0004_zippy_freak.sql b/web/drizzle/0004_zippy_freak.sql new file mode 100644 index 0000000..43b6cd4 --- /dev/null +++ b/web/drizzle/0004_zippy_freak.sql @@ -0,0 +1,31 @@ +CREATE TABLE IF NOT EXISTS "comfy_deploy"."workflow_run_outputs" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "run_id" uuid NOT NULL, + "data" jsonb, + "created_at" timestamp DEFAULT now() NOT NULL, + "updated_at" timestamp DEFAULT now() NOT NULL +); +--> statement-breakpoint +ALTER TABLE "comfy_deploy"."machines" DROP CONSTRAINT "machines_user_id_users_id_fk"; +--> statement-breakpoint +ALTER TABLE "comfy_deploy"."workflow_runs" DROP CONSTRAINT "workflow_runs_workflow_version_id_workflow_versions_id_fk"; +--> statement-breakpoint +ALTER TABLE "comfy_deploy"."workflow_runs" ALTER COLUMN "workflow_version_id" DROP NOT NULL;--> statement-breakpoint +ALTER TABLE "comfy_deploy"."workflow_runs" ALTER COLUMN "machine_id" DROP NOT NULL;--> statement-breakpoint +DO $$ BEGIN + ALTER TABLE "comfy_deploy"."machines" ADD CONSTRAINT "machines_user_id_users_id_fk" FOREIGN KEY ("user_id") REFERENCES "comfy_deploy"."users"("id") ON DELETE cascade ON UPDATE no action; +EXCEPTION + WHEN duplicate_object THEN null; +END $$; +--> statement-breakpoint +DO $$ BEGIN + ALTER TABLE "comfy_deploy"."workflow_runs" ADD CONSTRAINT "workflow_runs_workflow_version_id_workflow_versions_id_fk" FOREIGN KEY ("workflow_version_id") REFERENCES "comfy_deploy"."workflow_versions"("id") ON DELETE set null ON UPDATE no action; +EXCEPTION + WHEN duplicate_object THEN null; +END $$; +--> statement-breakpoint +DO $$ BEGIN + ALTER TABLE "comfy_deploy"."workflow_run_outputs" ADD CONSTRAINT "workflow_run_outputs_run_id_workflow_runs_id_fk" FOREIGN KEY ("run_id") REFERENCES "comfy_deploy"."workflow_runs"("id") ON DELETE cascade ON UPDATE no action; +EXCEPTION + WHEN duplicate_object THEN null; +END $$; diff --git a/web/drizzle/meta/0004_snapshot.json b/web/drizzle/meta/0004_snapshot.json new file mode 100644 index 0000000..91ae114 --- /dev/null +++ b/web/drizzle/meta/0004_snapshot.json @@ -0,0 +1,410 @@ +{ + "id": "07a389e2-3713-4047-93e7-bf1da2333b16", + "prevId": "4e03f61d-b976-41b4-bbad-7655f73bf0fc", + "version": "5", + "dialect": "pg", + "tables": { + "machines": { + "name": "machines", + "schema": "comfy_deploy", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "endpoint": { + "name": "endpoint", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": { + "machines_user_id_users_id_fk": { + "name": "machines_user_id_users_id_fk", + "tableFrom": "machines", + "tableTo": "users", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "users": { + "name": "users", + "schema": "comfy_deploy", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "username": { + "name": "username", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "workflow_run_outputs": { + "name": "workflow_run_outputs", + "schema": "comfy_deploy", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "run_id": { + "name": "run_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "data": { + "name": "data", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": { + "workflow_run_outputs_run_id_workflow_runs_id_fk": { + "name": "workflow_run_outputs_run_id_workflow_runs_id_fk", + "tableFrom": "workflow_run_outputs", + "tableTo": "workflow_runs", + "columnsFrom": [ + "run_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "workflow_runs": { + "name": "workflow_runs", + "schema": "comfy_deploy", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "workflow_version_id": { + "name": "workflow_version_id", + "type": "uuid", + "primaryKey": false, + "notNull": false + }, + "workflow_id": { + "name": "workflow_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "machine_id": { + "name": "machine_id", + "type": "uuid", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "workflow_run_status", + "primaryKey": false, + "notNull": true, + "default": "'not-started'" + }, + "ended_at": { + "name": "ended_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": { + "workflow_runs_workflow_version_id_workflow_versions_id_fk": { + "name": "workflow_runs_workflow_version_id_workflow_versions_id_fk", + "tableFrom": "workflow_runs", + "tableTo": "workflow_versions", + "columnsFrom": [ + "workflow_version_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "set null", + "onUpdate": "no action" + }, + "workflow_runs_workflow_id_workflows_id_fk": { + "name": "workflow_runs_workflow_id_workflows_id_fk", + "tableFrom": "workflow_runs", + "tableTo": "workflows", + "columnsFrom": [ + "workflow_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "workflow_runs_machine_id_machines_id_fk": { + "name": "workflow_runs_machine_id_machines_id_fk", + "tableFrom": "workflow_runs", + "tableTo": "machines", + "columnsFrom": [ + "machine_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "set null", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "workflows": { + "name": "workflows", + "schema": "comfy_deploy", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": { + "workflows_user_id_users_id_fk": { + "name": "workflows_user_id_users_id_fk", + "tableFrom": "workflows", + "tableTo": "users", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "workflow_versions": { + "name": "workflow_versions", + "schema": "comfy_deploy", + "columns": { + "workflow_id": { + "name": "workflow_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "workflow": { + "name": "workflow", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "workflow_api": { + "name": "workflow_api", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "version": { + "name": "version", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": { + "workflow_versions_workflow_id_workflows_id_fk": { + "name": "workflow_versions_workflow_id_workflows_id_fk", + "tableFrom": "workflow_versions", + "tableTo": "workflows", + "columnsFrom": [ + "workflow_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + } + }, + "enums": { + "workflow_run_status": { + "name": "workflow_run_status", + "values": { + "not-started": "not-started", + "running": "running", + "success": "success", + "failed": "failed" + } + } + }, + "schemas": { + "comfy_deploy": "comfy_deploy" + }, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + } +} \ No newline at end of file diff --git a/web/drizzle/meta/_journal.json b/web/drizzle/meta/_journal.json index 970601c..c5c9eea 100644 --- a/web/drizzle/meta/_journal.json +++ b/web/drizzle/meta/_journal.json @@ -29,6 +29,13 @@ "when": 1702212969930, "tag": "0003_oval_mockingbird", "breakpoints": true + }, + { + "idx": 4, + "version": "5", + "when": 1702357291227, + "tag": "0004_zippy_freak", + "breakpoints": true } ] } \ No newline at end of file diff --git a/web/package.json b/web/package.json index 9b8fb55..0d25ba4 100644 --- a/web/package.json +++ b/web/package.json @@ -15,6 +15,8 @@ "db-dev": "bun run db-up && bun run migrate-local" }, "dependencies": { + "@aws-sdk/client-s3": "^3.472.0", + "@aws-sdk/s3-request-presigner": "^3.472.0", "@clerk/nextjs": "^4.27.4", "@hookform/resolvers": "^3.3.2", "@neondatabase/serverless": "^0.6.0", @@ -26,6 +28,7 @@ "@radix-ui/react-slot": "^1.0.2", "@radix-ui/react-tabs": "^1.0.4", "@tanstack/react-table": "^8.10.7", + "@types/uuid": "^9.0.7", "class-variance-authority": "^0.7.0", "clsx": "^2.0.0", "dayjs": "^1.11.10", @@ -36,8 +39,10 @@ "react-dom": "^18", "react-hook-form": "^7.48.2", "react-use-websocket": "^4.5.0", + "sonner": "^1.2.4", "tailwind-merge": "^2.1.0", "tailwindcss-animate": "^1.0.7", + "uuid": "^9.0.1", "zod": "^3.22.4", "zustand": "^4.4.7" }, diff --git a/web/src/app/[workflow_id]/page.tsx b/web/src/app/[workflow_id]/page.tsx index b4aea3e..2e651ed 100644 --- a/web/src/app/[workflow_id]/page.tsx +++ b/web/src/app/[workflow_id]/page.tsx @@ -1,12 +1,11 @@ -import { RunDisplay } from "../../components/RunDisplay"; -import { LoadingIcon } from "@/components/LoadingIcon"; +import { RunsTable } from "../../components/RunsTable"; +import { findFirstTableWithVersion } from "../../server/findFirstTableWithVersion"; import { MachinesWSMain } from "@/components/MachinesWS"; import { MachineSelect, RunWorkflowButton, VersionSelect, } from "@/components/VersionSelect"; -import { Badge } from "@/components/ui/badge"; import { Card, CardContent, @@ -14,49 +13,8 @@ import { CardHeader, CardTitle, } from "@/components/ui/card"; -import { - Table, - TableBody, - TableCaption, - TableHead, - TableHeader, - TableRow, -} from "@/components/ui/table"; -import { db } from "@/db/db"; -import { - workflowRunsTable, - workflowTable, - workflowVersionTable, -} from "@/db/schema"; import { getRelativeTime } from "@/lib/getRelativeTime"; import { getMachines } from "@/server/curdMachine"; -import { desc, eq } from "drizzle-orm"; - -export async function findFirstTableWithVersion(workflow_id: string) { - return await db.query.workflowTable.findFirst({ - with: { versions: { orderBy: desc(workflowVersionTable.version) } }, - where: eq(workflowTable.id, workflow_id), - }); -} - -export async function findAllRuns(workflow_id: string) { - return await db.query.workflowRunsTable.findMany({ - where: eq(workflowRunsTable.workflow_id, workflow_id), - orderBy: desc(workflowRunsTable.created_at), - with: { - machine: { - columns: { - name: true, - }, - }, - version: { - columns: { - version: true, - }, - }, - }, - }); -} export default async function Page({ params, @@ -69,7 +27,7 @@ export default async function Page({ const machines = await getMachines(); return ( -