feat: add s3 localstack, upload api

This commit is contained in:
BennyKok 2023-12-13 17:14:55 +08:00
parent f9ed8145d2
commit 0835d966f1
34 changed files with 1112 additions and 227 deletions

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
boto3

View File

@ -22,6 +22,7 @@ from enum import Enum
import aiohttp
from aiohttp import web
import boto3
api = None
api_task = None
@ -96,6 +97,7 @@ async def comfy_deploy_run(request):
prompt_metadata[res['prompt_id']] = {
'status_endpoint': data.get('status_endpoint'),
'file_upload_endpoint': data.get('file_upload_endpoint'),
}
status = 200
@ -151,17 +153,18 @@ async def send_json_override(self, event, data, sid=None):
# now we send everything
await send(event, data)
await self.send_json_original(event, data, sid)
if event == 'execution_start':
update_run(prompt_id, Status.RUNNING)
# if event == 'executing':
# update_run(prompt_id, Status.RUNNING)
if event == 'executed':
# the last executing event is none, then the workflow is finished
if event == 'executing' and data.get('node') is None:
update_run(prompt_id, Status.SUCCESS)
await self.send_json_original(event, data, sid)
if event == 'executed' and 'node' in data and 'output' in data:
asyncio.create_task(update_run_with_output(prompt_id, data.get('output')))
# update_run_with_output(prompt_id, data.get('output'))
class Status(Enum):
@ -171,7 +174,10 @@ class Status(Enum):
FAILED = "failed"
def update_run(prompt_id, status: Status):
if prompt_id in prompt_metadata and ('status' not in prompt_metadata[prompt_id] or prompt_metadata[prompt_id]['status'] != status):
if prompt_id not in prompt_metadata:
return
if ('status' not in prompt_metadata[prompt_id] or prompt_metadata[prompt_id]['status'] != status):
status_endpoint = prompt_metadata[prompt_id]['status_endpoint']
body = {
"run_id": prompt_id,
@ -180,5 +186,65 @@ def update_run(prompt_id, status: Status):
prompt_metadata[prompt_id]['status'] = status
requests.post(status_endpoint, json=body)
async def upload_file(prompt_id, filename, subfolder=None):
"""
Uploads file to S3 bucket using S3 client object
:return: None
"""
filename,output_dir = folder_paths.annotated_filepath(filename)
# validation for security: prevent accessing arbitrary path
if filename[0] == '/' or '..' in filename:
return
if output_dir is None:
output_dir = folder_paths.get_directory_by_type("output")
if output_dir is None:
return
if subfolder != None:
full_output_dir = os.path.join(output_dir, subfolder)
if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir:
return
output_dir = full_output_dir
filename = os.path.basename(filename)
file = os.path.join(output_dir, filename)
print("uploading file", file)
file_upload_endpoint = prompt_metadata[prompt_id]['file_upload_endpoint']
content_type = "image/png"
result = requests.get(f"{file_upload_endpoint}?file_name={filename}&run_id={prompt_id}&type={content_type}")
ok = result.json()
with open(file, 'rb') as f:
data = f.read()
headers = {
"x-amz-acl": "public-read",
"Content-Type": content_type,
"Content-Length": str(len(data)),
}
response = requests.put(ok.get("url"), headers=headers, data=data)
print("upload file response", response.status_code)
async def update_run_with_output(prompt_id, data):
if prompt_id in prompt_metadata:
status_endpoint = prompt_metadata[prompt_id]['status_endpoint']
images = data.get('images', [])
for image in images:
await upload_file(prompt_id, image.get("filename"), subfolder=image.get("subfolder"))
body = {
"run_id": prompt_id,
"output_data": data
}
requests.post(status_endpoint, json=body)
prompt_server.send_json_original = prompt_server.send_json
prompt_server.send_json = send_json_override.__get__(prompt_server, server.PromptServer)

2
web/aws/buckets.sh Executable file
View File

@ -0,0 +1,2 @@
#!/usr/bin/env bash
awslocal s3 mb s3://comfyui-deploy

Binary file not shown.

View File

@ -19,4 +19,12 @@ services:
ports:
- "5481:80"
depends_on:
- postgres
- postgres
localstack:
image: localstack/localstack:latest
environment:
SERVICES: s3
ports:
- 4566:4566
volumes:
- ./aws:/etc/localstack/init/ready.d

View File

@ -0,0 +1,31 @@
CREATE TABLE IF NOT EXISTS "comfy_deploy"."workflow_run_outputs" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"run_id" uuid NOT NULL,
"data" jsonb,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "comfy_deploy"."machines" DROP CONSTRAINT "machines_user_id_users_id_fk";
--> statement-breakpoint
ALTER TABLE "comfy_deploy"."workflow_runs" DROP CONSTRAINT "workflow_runs_workflow_version_id_workflow_versions_id_fk";
--> statement-breakpoint
ALTER TABLE "comfy_deploy"."workflow_runs" ALTER COLUMN "workflow_version_id" DROP NOT NULL;--> statement-breakpoint
ALTER TABLE "comfy_deploy"."workflow_runs" ALTER COLUMN "machine_id" DROP NOT NULL;--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "comfy_deploy"."machines" ADD CONSTRAINT "machines_user_id_users_id_fk" FOREIGN KEY ("user_id") REFERENCES "comfy_deploy"."users"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "comfy_deploy"."workflow_runs" ADD CONSTRAINT "workflow_runs_workflow_version_id_workflow_versions_id_fk" FOREIGN KEY ("workflow_version_id") REFERENCES "comfy_deploy"."workflow_versions"("id") ON DELETE set null ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "comfy_deploy"."workflow_run_outputs" ADD CONSTRAINT "workflow_run_outputs_run_id_workflow_runs_id_fk" FOREIGN KEY ("run_id") REFERENCES "comfy_deploy"."workflow_runs"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;

View File

@ -0,0 +1,410 @@
{
"id": "07a389e2-3713-4047-93e7-bf1da2333b16",
"prevId": "4e03f61d-b976-41b4-bbad-7655f73bf0fc",
"version": "5",
"dialect": "pg",
"tables": {
"machines": {
"name": "machines",
"schema": "comfy_deploy",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"endpoint": {
"name": "endpoint",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"machines_user_id_users_id_fk": {
"name": "machines_user_id_users_id_fk",
"tableFrom": "machines",
"tableTo": "users",
"columnsFrom": [
"user_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {}
},
"users": {
"name": "users",
"schema": "comfy_deploy",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"username": {
"name": "username",
"type": "text",
"primaryKey": false,
"notNull": true
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {}
},
"workflow_run_outputs": {
"name": "workflow_run_outputs",
"schema": "comfy_deploy",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"run_id": {
"name": "run_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"data": {
"name": "data",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"workflow_run_outputs_run_id_workflow_runs_id_fk": {
"name": "workflow_run_outputs_run_id_workflow_runs_id_fk",
"tableFrom": "workflow_run_outputs",
"tableTo": "workflow_runs",
"columnsFrom": [
"run_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {}
},
"workflow_runs": {
"name": "workflow_runs",
"schema": "comfy_deploy",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"workflow_version_id": {
"name": "workflow_version_id",
"type": "uuid",
"primaryKey": false,
"notNull": false
},
"workflow_id": {
"name": "workflow_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"machine_id": {
"name": "machine_id",
"type": "uuid",
"primaryKey": false,
"notNull": false
},
"status": {
"name": "status",
"type": "workflow_run_status",
"primaryKey": false,
"notNull": true,
"default": "'not-started'"
},
"ended_at": {
"name": "ended_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"workflow_runs_workflow_version_id_workflow_versions_id_fk": {
"name": "workflow_runs_workflow_version_id_workflow_versions_id_fk",
"tableFrom": "workflow_runs",
"tableTo": "workflow_versions",
"columnsFrom": [
"workflow_version_id"
],
"columnsTo": [
"id"
],
"onDelete": "set null",
"onUpdate": "no action"
},
"workflow_runs_workflow_id_workflows_id_fk": {
"name": "workflow_runs_workflow_id_workflows_id_fk",
"tableFrom": "workflow_runs",
"tableTo": "workflows",
"columnsFrom": [
"workflow_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"workflow_runs_machine_id_machines_id_fk": {
"name": "workflow_runs_machine_id_machines_id_fk",
"tableFrom": "workflow_runs",
"tableTo": "machines",
"columnsFrom": [
"machine_id"
],
"columnsTo": [
"id"
],
"onDelete": "set null",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {}
},
"workflows": {
"name": "workflows",
"schema": "comfy_deploy",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"workflows_user_id_users_id_fk": {
"name": "workflows_user_id_users_id_fk",
"tableFrom": "workflows",
"tableTo": "users",
"columnsFrom": [
"user_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {}
},
"workflow_versions": {
"name": "workflow_versions",
"schema": "comfy_deploy",
"columns": {
"workflow_id": {
"name": "workflow_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"workflow": {
"name": "workflow",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"workflow_api": {
"name": "workflow_api",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"version": {
"name": "version",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"workflow_versions_workflow_id_workflows_id_fk": {
"name": "workflow_versions_workflow_id_workflows_id_fk",
"tableFrom": "workflow_versions",
"tableTo": "workflows",
"columnsFrom": [
"workflow_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {}
}
},
"enums": {
"workflow_run_status": {
"name": "workflow_run_status",
"values": {
"not-started": "not-started",
"running": "running",
"success": "success",
"failed": "failed"
}
}
},
"schemas": {
"comfy_deploy": "comfy_deploy"
},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
}
}

View File

@ -29,6 +29,13 @@
"when": 1702212969930,
"tag": "0003_oval_mockingbird",
"breakpoints": true
},
{
"idx": 4,
"version": "5",
"when": 1702357291227,
"tag": "0004_zippy_freak",
"breakpoints": true
}
]
}

View File

@ -15,6 +15,8 @@
"db-dev": "bun run db-up && bun run migrate-local"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.472.0",
"@aws-sdk/s3-request-presigner": "^3.472.0",
"@clerk/nextjs": "^4.27.4",
"@hookform/resolvers": "^3.3.2",
"@neondatabase/serverless": "^0.6.0",
@ -26,6 +28,7 @@
"@radix-ui/react-slot": "^1.0.2",
"@radix-ui/react-tabs": "^1.0.4",
"@tanstack/react-table": "^8.10.7",
"@types/uuid": "^9.0.7",
"class-variance-authority": "^0.7.0",
"clsx": "^2.0.0",
"dayjs": "^1.11.10",
@ -36,8 +39,10 @@
"react-dom": "^18",
"react-hook-form": "^7.48.2",
"react-use-websocket": "^4.5.0",
"sonner": "^1.2.4",
"tailwind-merge": "^2.1.0",
"tailwindcss-animate": "^1.0.7",
"uuid": "^9.0.1",
"zod": "^3.22.4",
"zustand": "^4.4.7"
},

View File

@ -1,12 +1,11 @@
import { RunDisplay } from "../../components/RunDisplay";
import { LoadingIcon } from "@/components/LoadingIcon";
import { RunsTable } from "../../components/RunsTable";
import { findFirstTableWithVersion } from "../../server/findFirstTableWithVersion";
import { MachinesWSMain } from "@/components/MachinesWS";
import {
MachineSelect,
RunWorkflowButton,
VersionSelect,
} from "@/components/VersionSelect";
import { Badge } from "@/components/ui/badge";
import {
Card,
CardContent,
@ -14,49 +13,8 @@ import {
CardHeader,
CardTitle,
} from "@/components/ui/card";
import {
Table,
TableBody,
TableCaption,
TableHead,
TableHeader,
TableRow,
} from "@/components/ui/table";
import { db } from "@/db/db";
import {
workflowRunsTable,
workflowTable,
workflowVersionTable,
} from "@/db/schema";
import { getRelativeTime } from "@/lib/getRelativeTime";
import { getMachines } from "@/server/curdMachine";
import { desc, eq } from "drizzle-orm";
export async function findFirstTableWithVersion(workflow_id: string) {
return await db.query.workflowTable.findFirst({
with: { versions: { orderBy: desc(workflowVersionTable.version) } },
where: eq(workflowTable.id, workflow_id),
});
}
export async function findAllRuns(workflow_id: string) {
return await db.query.workflowRunsTable.findMany({
where: eq(workflowRunsTable.workflow_id, workflow_id),
orderBy: desc(workflowRunsTable.created_at),
with: {
machine: {
columns: {
name: true,
},
},
version: {
columns: {
version: true,
},
},
},
});
}
export default async function Page({
params,
@ -69,7 +27,7 @@ export default async function Page({
const machines = await getMachines();
return (
<div className="mt-4 w-full flex flex-col lg:flex-row gap-4">
<div className="mt-4 w-full flex flex-col lg:flex-row gap-4 max-h-[calc(100dvh-100px)]">
<Card className="w-full lg:w-fit lg:min-w-[500px] h-fit">
<CardHeader>
<CardTitle>{workflow?.name}</CardTitle>
@ -101,46 +59,3 @@ export default async function Page({
</div>
);
}
async function RunsTable(props: { workflow_id: string }) {
const allRuns = await findAllRuns(props.workflow_id);
return (
<Table>
<TableCaption>A list of your recent runs.</TableCaption>
<TableHeader>
<TableRow>
<TableHead className="w-[100px]">Version</TableHead>
<TableHead>Machine</TableHead>
<TableHead>Time</TableHead>
<TableHead>Live Status</TableHead>
<TableHead className="text-right">Status</TableHead>
</TableRow>
</TableHeader>
<TableBody>
{allRuns.map((run) => (
<RunDisplay run={run} key={run.id} />
))}
</TableBody>
</Table>
);
}
export function StatusBadge({
run,
}: {
run: Awaited<ReturnType<typeof findAllRuns>>[0];
}) {
switch (run.status) {
case "running":
return (
<Badge variant="secondary">
{run.status} <LoadingIcon />
</Badge>
);
case "success":
return <Badge variant="success">{run.status}</Badge>;
case "failed":
return <Badge variant="destructive">{run.status}</Badge>;
}
return <Badge variant="secondary">{run.status}</Badge>;
}

View File

@ -1,5 +1,6 @@
import { parseDataSafe } from "../../../lib/parseDataSafe";
import { createRun } from "../../../server/createRun";
import { NextResponse } from "next/server";
import { z } from "zod";
const Request = z.object({
@ -8,12 +9,6 @@ const Request = z.object({
machine_id: z.string(),
});
export const ComfyAPI_Run = z.object({
prompt_id: z.string(),
number: z.number(),
node_errors: z.any(),
});
export async function POST(request: Request) {
const [data, error] = await parseDataSafe(Request, request);
if (!data || error) return error;
@ -22,5 +17,29 @@ export async function POST(request: Request) {
const { workflow_version_id, machine_id } = data;
return await createRun(origin, workflow_version_id, machine_id);
try {
const workflow_run_id = await createRun(
origin,
workflow_version_id,
machine_id
);
return NextResponse.json(
{
workflow_run_id: workflow_run_id,
},
{
status: 200,
}
);
} catch (error: any) {
return NextResponse.json(
{
error: error.message,
},
{
status: 500,
}
);
}
}

View File

@ -0,0 +1,44 @@
import { parseDataSafe } from "../../../lib/parseDataSafe";
import { handleResourceUpload } from "@/server/resource";
import { NextResponse } from "next/server";
import { z } from "zod";
const Request = z.object({
file_name: z.string(),
run_id: z.string(),
type: z.enum(["image/png", "image/jpeg"]),
});
export const dynamic = "force-dynamic";
export async function GET(request: Request) {
const [data, error] = await parseDataSafe(Request, request);
if (!data || error) return error;
const { file_name, run_id, type } = data;
try {
const uploadUrl = await handleResourceUpload({
resourceBucket: "comfyui-deploy",
resourceId: `outputs/runs/${run_id}/${file_name}`,
resourceType: type,
isPublic: true,
});
return NextResponse.json(
{
url: uploadUrl,
},
{ status: 200 }
);
} catch (error: unknown) {
const errorMessage =
error instanceof Error ? error.message : "Unknown error";
return NextResponse.json(
{
error: errorMessage,
},
{ status: 500 }
);
}
}

View File

@ -1,35 +1,47 @@
import { parseDataSafe } from "../../../lib/parseDataSafe";
import { db } from "@/db/db";
import { workflowRunsTable } from "@/db/schema";
import { workflowRunOutputs, workflowRunsTable } from "@/db/schema";
import { eq } from "drizzle-orm";
import { revalidatePath } from "next/cache";
import { NextResponse } from "next/server";
import { z } from "zod";
const Request = z.object({
run_id: z.string(),
status: z.enum(["not-started", "running", "success", "failed"]),
status: z.enum(["not-started", "running", "success", "failed"]).optional(),
output_data: z.any().optional(),
});
export async function POST(request: Request) {
const [data, error] = await parseDataSafe(Request, request);
if (!data || error) return error;
const { run_id, status } = data;
const { run_id, status, output_data } = data;
const workflow_run = await db
.update(workflowRunsTable)
.set({
status: status,
})
.where(eq(workflowRunsTable.id, run_id))
.returning();
console.log(run_id, status, output_data);
const workflow_version = await db.query.workflowVersionTable.findFirst({
where: eq(workflowRunsTable.id, workflow_run[0].workflow_version_id),
});
if (output_data) {
const workflow_run_output = await db.insert(workflowRunOutputs).values({
run_id: run_id,
data: output_data,
});
} else if (status) {
console.log("status", status);
const workflow_run = await db
.update(workflowRunsTable)
.set({
status: status,
ended_at:
status === "success" || status === "failed" ? new Date() : null,
})
.where(eq(workflowRunsTable.id, run_id))
.returning();
}
revalidatePath(`./${workflow_version?.workflow_id}`);
// const workflow_version = await db.query.workflowVersionTable.findFirst({
// where: eq(workflowRunsTable.id, workflow_run[0].workflow_version_id),
// });
// revalidatePath(`./${workflow_version?.workflow_id}`);
return NextResponse.json(
{

View File

@ -1,9 +1,9 @@
import { parseDataSafe } from "../../../lib/parseDataSafe";
import { db } from "@/db/db";
import { workflowTable, workflowVersionTable } from "@/db/schema";
import { eq, sql } from "drizzle-orm";
import { sql } from "drizzle-orm";
import { NextResponse } from "next/server";
import { ZodFormattedError, z } from "zod";
import { z } from "zod";
const corsHeaders = {
"Access-Control-Allow-Origin": "*",
@ -36,7 +36,7 @@ export async function POST(request: Request) {
const [data, error] = await parseDataSafe(
UploadRequest,
request,
corsHeaders,
corsHeaders
);
if (!data || error) return error;
@ -96,7 +96,7 @@ export async function POST(request: Request) {
status: 500,
statusText: "Invalid request",
headers: corsHeaders,
},
}
);
}
} catch (error: any) {
@ -108,7 +108,7 @@ export async function POST(request: Request) {
status: 500,
statusText: "Invalid request",
headers: corsHeaders,
},
}
);
}
@ -120,6 +120,6 @@ export async function POST(request: Request) {
{
status: 200,
headers: corsHeaders,
},
}
);
}

View File

@ -0,0 +1,9 @@
import { NextResponse, type NextRequest } from "next/server";
export async function GET(request: NextRequest) {
const file = new URL(request.url).searchParams.get("file");
console.log(file);
return NextResponse.redirect(
`${process.env.SPACES_ENDPOINT}/comfyui-deploy/${file}`
);
}

View File

@ -2,6 +2,7 @@ import "./globals.css";
import { NavbarRight } from "@/components/NavbarRight";
import type { Metadata } from "next";
import { Inter } from "next/font/google";
import { Toaster } from "sonner";
const inter = Inter({ subsets: ["latin"] });
@ -29,6 +30,7 @@ export default function RootLayout({
<div className="md:px-10 px-6 w-full flex items-start">
{children}
</div>
<Toaster richColors />
</main>
</body>
</html>

View File

@ -1,14 +1,8 @@
import { MachineList } from "@/components/MachineList";
import { WorkflowList } from "@/components/WorkflowList";
import { db } from "@/db/db";
import {
machinesTable,
usersTable,
workflowTable,
workflowVersionTable,
} from "@/db/schema";
import { machinesTable, usersTable } from "@/db/schema";
import { auth, clerkClient } from "@clerk/nextjs";
import { desc, eq, sql } from "drizzle-orm";
import { desc, eq } from "drizzle-orm";
export default function Page() {
return <MachineListServer />;

View File

@ -56,6 +56,7 @@ import {
import { ArrowUpDown, MoreHorizontal } from "lucide-react";
import * as React from "react";
import { useForm } from "react-hook-form";
import { toast } from "sonner";
import { z } from "zod";
export type Machine = {
@ -159,9 +160,8 @@ export const columns: ColumnDef<Machine>[] = [
<DropdownMenuLabel>Actions</DropdownMenuLabel>
<DropdownMenuItem
className="text-destructive"
onClick={() => {
deleteMachine(workflow.id);
// navigator.clipboard.writeText(payment.id)
onClick={async () => {
callServerWithToast(await deleteMachine(workflow.id));
}}
>
Delete Machine
@ -176,6 +176,17 @@ export const columns: ColumnDef<Machine>[] = [
},
];
async function callServerWithToast(result: {
message: string;
error?: boolean;
}) {
if (result.error) {
toast.error(result.message);
} else {
toast.success(result.message);
}
}
export function MachineList({ data }: { data: Machine[] }) {
const [sorting, setSorting] = React.useState<SortingState>([]);
const [columnFilters, setColumnFilters] = React.useState<ColumnFiltersState>(
@ -333,8 +344,8 @@ function AddMachinesDialog() {
const form = useForm<z.infer<typeof formSchema>>({
resolver: zodResolver(formSchema),
defaultValues: {
name: "",
endpoint: "",
name: "My Local Machine",
endpoint: "http://127.0.0.1:8188",
},
});

View File

@ -8,6 +8,7 @@ import { create } from "zustand";
type State = {
data: {
id: string;
timestamp: number;
json: {
event: string;
data: any;
@ -27,7 +28,7 @@ export const useStore = create<State>((set) => ({
addData: (id, json) =>
set((state) => ({
...state,
data: [...state.data, { id, json }],
data: [...state.data, { id, json, timestamp: Date.now() }],
})),
}));

View File

@ -1,27 +1,123 @@
"use client";
import type { findAllRuns } from "../app/[workflow_id]/page";
import { StatusBadge } from "../app/[workflow_id]/page";
import { useStore } from "@/components/MachinesWS";
import { TableCell, TableRow } from "@/components/ui/table";
import { StatusBadge } from "@/components/StatusBadge";
import {
Dialog,
DialogContent,
DialogDescription,
DialogHeader,
DialogTitle,
DialogTrigger,
} from "@/components/ui/dialog";
import {
Table,
TableBody,
TableCell,
TableHead,
TableHeader,
TableRow,
} from "@/components/ui/table";
import { getRelativeTime } from "@/lib/getRelativeTime";
import { type findAllRuns } from "@/server/findAllRuns";
import { getRunsOutput } from "@/server/getRunsOutput";
import { useEffect, useState } from "react";
export function RunDisplay({
run,
}: {
run: Awaited<ReturnType<typeof findAllRuns>>[0];
}) {
const data = useStore((state) => state.data.find((x) => x.id === run.id));
const data = useStore(
(state) =>
state.data
.filter((x) => x.id === run.id)
.sort((a, b) => b.timestamp - a.timestamp)?.[0]
);
let status = run.status;
if (data?.json.event == "executing" && data.json.data.node == undefined) {
status = "success";
} else if (data?.json.event == "executing") {
status = "running";
}
return (
<TableRow>
<TableCell>{run.version.version}</TableCell>
<TableCell className="font-medium">{run.machine.name}</TableCell>
<TableCell>{getRelativeTime(run.created_at)}</TableCell>
<TableCell>{data ? data.json.event : "-"}</TableCell>
<TableCell className="text-right">
<StatusBadge run={run} />
</TableCell>
</TableRow>
<Dialog>
<DialogTrigger asChild className="appearance-none hover:cursor-pointer">
<TableRow>
<TableCell>{run.version?.version}</TableCell>
<TableCell className="font-medium">{run.machine?.name}</TableCell>
<TableCell>{getRelativeTime(run.created_at)}</TableCell>
<TableCell>
{data && status != "success"
? `${data.json.event} - ${data.json.data.node}`
: "-"}
</TableCell>
<TableCell className="text-right">
<StatusBadge status={status} />
</TableCell>
</TableRow>
</DialogTrigger>
<DialogContent>
<DialogHeader>
<DialogTitle>Run outputs</DialogTitle>
<DialogDescription>
You can view your run&apos;s outputs here
</DialogDescription>
</DialogHeader>
<RunOutputs run_id={run.id} />
</DialogContent>
</Dialog>
);
}
export function RunOutputs({ run_id }: { run_id: string }) {
const [outputs, setOutputs] = useState<
Awaited<ReturnType<typeof getRunsOutput>>
>([]);
useEffect(() => {
getRunsOutput(run_id).then((x) => setOutputs(x));
}, [run_id]);
return (
<Table>
{/* <TableCaption>A list of your recent runs.</TableCaption> */}
<TableHeader className="bg-background top-0 sticky">
<TableRow>
<TableHead className="w-[100px]">File</TableHead>
<TableHead className="">Output</TableHead>
</TableRow>
</TableHeader>
<TableBody>
{outputs?.map((run) => {
const fileName = run.data.images[0].filename;
// const filePath
return (
<TableRow key={run.id}>
<TableCell>{fileName}</TableCell>
<TableCell>
<OutputRender run_id={run_id} filename={fileName} />
</TableCell>
</TableRow>
);
})}
</TableBody>
</Table>
);
}
export function OutputRender(props: { run_id: string; filename: string }) {
if (props.filename.endsWith(".png")) {
return (
<img
alt={props.filename}
src={`api/view?file=${encodeURIComponent(
`outputs/runs/${props.run_id}/${props.filename}`
)}`}
/>
);
}
}

View File

@ -0,0 +1,35 @@
import { findAllRuns } from "../server/findAllRuns";
import { RunDisplay } from "./RunDisplay";
import {
Table,
TableBody,
TableCaption,
TableHead,
TableHeader,
TableRow,
} from "@/components/ui/table";
export async function RunsTable(props: { workflow_id: string }) {
const allRuns = await findAllRuns(props.workflow_id);
return (
<div className="overflow-auto h-[400px] w-full">
<Table className="">
<TableCaption>A list of your recent runs.</TableCaption>
<TableHeader className="bg-background top-0 sticky">
<TableRow>
<TableHead className=" w-[100px]">Version</TableHead>
<TableHead className="">Machine</TableHead>
<TableHead className="">Time</TableHead>
<TableHead className="">Live Status</TableHead>
<TableHead className=" text-right">Status</TableHead>
</TableRow>
</TableHeader>
<TableBody>
{allRuns.map((run) => (
<RunDisplay run={run} key={run.id} />
))}
</TableBody>
</Table>
</div>
);
}

View File

@ -0,0 +1,23 @@
import type { findAllRuns } from "../server/findAllRuns";
import { LoadingIcon } from "@/components/LoadingIcon";
import { Badge } from "@/components/ui/badge";
export function StatusBadge({
status,
}: {
status: Awaited<ReturnType<typeof findAllRuns>>[0]["status"];
}) {
switch (status) {
case "running":
return (
<Badge variant="secondary">
{status} <LoadingIcon />
</Badge>
);
case "success":
return <Badge variant="success">{status}</Badge>;
case "failed":
return <Badge variant="destructive">{status}</Badge>;
}
return <Badge variant="secondary">{status}</Badge>;
}

View File

@ -1,6 +1,5 @@
"use client";
import type { findFirstTableWithVersion } from "@/app/[workflow_id]/page";
import { LoadingIcon } from "@/components/LoadingIcon";
import { Button } from "@/components/ui/button";
import {
@ -14,6 +13,7 @@ import {
} from "@/components/ui/select";
import { createRun } from "@/server/createRun";
import type { getMachines } from "@/server/curdMachine";
import type { findFirstTableWithVersion } from "@/server/findFirstTableWithVersion";
import { Play } from "lucide-react";
import { parseAsInteger, useQueryState } from "next-usequerystate";
import { useState } from "react";
@ -56,7 +56,7 @@ export function MachineSelect({
machines: Awaited<ReturnType<typeof getMachines>>;
}) {
const [machine, setMachine] = useQueryState("machine", {
defaultValue: machines[0].id ?? "",
defaultValue: machines?.[0].id ?? "",
});
return (
<Select
@ -111,6 +111,7 @@ export function RunWorkflowButton({
try {
const origin = window.location.origin;
await createRun(origin, workflow_version_id, machine);
// console.log(res.json());
setIsLoading(false);
} catch (error) {
setIsLoading(false);

View File

@ -1,28 +1,27 @@
import * as React from "react"
import { cn } from "@/lib/utils"
import { cn } from "@/lib/utils";
import * as React from "react";
const Table = React.forwardRef<
HTMLTableElement,
React.HTMLAttributes<HTMLTableElement>
>(({ className, ...props }, ref) => (
<div className="relative w-full overflow-auto">
<table
ref={ref}
className={cn("w-full caption-bottom text-sm", className)}
{...props}
/>
</div>
))
Table.displayName = "Table"
// <div className="relative w-full overflow-auto">
<table
ref={ref}
className={cn("w-full caption-bottom text-sm", className)}
{...props}
/>
// </div>
));
Table.displayName = "Table";
const TableHeader = React.forwardRef<
HTMLTableSectionElement,
React.HTMLAttributes<HTMLTableSectionElement>
>(({ className, ...props }, ref) => (
<thead ref={ref} className={cn("[&_tr]:border-b", className)} {...props} />
))
TableHeader.displayName = "TableHeader"
));
TableHeader.displayName = "TableHeader";
const TableBody = React.forwardRef<
HTMLTableSectionElement,
@ -33,8 +32,8 @@ const TableBody = React.forwardRef<
className={cn("[&_tr:last-child]:border-0", className)}
{...props}
/>
))
TableBody.displayName = "TableBody"
));
TableBody.displayName = "TableBody";
const TableFooter = React.forwardRef<
HTMLTableSectionElement,
@ -48,8 +47,8 @@ const TableFooter = React.forwardRef<
)}
{...props}
/>
))
TableFooter.displayName = "TableFooter"
));
TableFooter.displayName = "TableFooter";
const TableRow = React.forwardRef<
HTMLTableRowElement,
@ -63,8 +62,8 @@ const TableRow = React.forwardRef<
)}
{...props}
/>
))
TableRow.displayName = "TableRow"
));
TableRow.displayName = "TableRow";
const TableHead = React.forwardRef<
HTMLTableCellElement,
@ -78,8 +77,8 @@ const TableHead = React.forwardRef<
)}
{...props}
/>
))
TableHead.displayName = "TableHead"
));
TableHead.displayName = "TableHead";
const TableCell = React.forwardRef<
HTMLTableCellElement,
@ -90,8 +89,8 @@ const TableCell = React.forwardRef<
className={cn("p-4 align-middle [&:has([role=checkbox])]:pr-0", className)}
{...props}
/>
))
TableCell.displayName = "TableCell"
));
TableCell.displayName = "TableCell";
const TableCaption = React.forwardRef<
HTMLTableCaptionElement,
@ -102,8 +101,8 @@ const TableCaption = React.forwardRef<
className={cn("mt-4 text-sm text-muted-foreground", className)}
{...props}
/>
))
TableCaption.displayName = "TableCaption"
));
TableCaption.displayName = "TableCaption";
export {
Table,
@ -114,4 +113,4 @@ export {
TableRow,
TableCell,
TableCaption,
}
};

View File

@ -70,21 +70,22 @@ export const workflowRunStatus = pgEnum("workflow_run_status", [
// We still want to keep the workflow run record.
export const workflowRunsTable = dbSchema.table("workflow_runs", {
id: uuid("id").primaryKey().defaultRandom().notNull(),
workflow_version_id: uuid("workflow_version_id")
.notNull()
.references(() => workflowVersionTable.id, {
onDelete: "no action",
}),
// when workflow version deleted, still want to keep this record
workflow_version_id: uuid("workflow_version_id").references(
() => workflowVersionTable.id,
{
onDelete: "set null",
}
),
workflow_id: uuid("workflow_id")
.notNull()
.references(() => workflowTable.id, {
onDelete: "no action",
}),
machine_id: uuid("machine_id")
.notNull()
.references(() => machinesTable.id, {
onDelete: "no action",
onDelete: "cascade",
}),
// when machine deleted, still want to keep this record
machine_id: uuid("machine_id").references(() => machinesTable.id, {
onDelete: "set null",
}),
status: workflowRunStatus("status").notNull().default("not-started"),
ended_at: timestamp("ended_at"),
created_at: timestamp("created_at").defaultNow().notNull(),
@ -101,12 +102,26 @@ export const workflowRunRelations = relations(workflowRunsTable, ({ one }) => ({
}),
}));
// We still want to keep the workflow run record.
export const workflowRunOutputs = dbSchema.table("workflow_run_outputs", {
id: uuid("id").primaryKey().defaultRandom().notNull(),
run_id: uuid("run_id")
.notNull()
.references(() => workflowRunsTable.id, {
onDelete: "cascade",
}),
data: jsonb("data").$type<any>(),
created_at: timestamp("created_at").defaultNow().notNull(),
updated_at: timestamp("updated_at").defaultNow().notNull(),
});
// when user delete, also delete all the workflow versions
export const machinesTable = dbSchema.table("machines", {
id: uuid("id").primaryKey().defaultRandom().notNull(),
user_id: text("user_id")
.references(() => usersTable.id, {
onDelete: "no action",
onDelete: "cascade",
})
.notNull(),
name: text("name").notNull(),

View File

@ -9,8 +9,16 @@ export async function parseDataSafe<T extends ZodType<any, any, any>>(
): Promise<[z.infer<T> | undefined, NextResponse | undefined]> {
let data: z.infer<T> | undefined = undefined;
try {
data = await schema.parseAsync(await request.json());
} catch (e: any) {
if (request.method === "GET") {
// Parse data from query parameters for GET requests
const url = new URL(request.url);
const params = Object.fromEntries(url.searchParams);
data = await schema.parseAsync(params);
} else {
// Parse data from request body for other types of requests
data = await schema.parseAsync(await request.json());
}
} catch (e: unknown) {
if (e instanceof ZodError) {
const message = e.flatten().fieldErrors;
return [

View File

@ -1,11 +1,10 @@
"use server";
import { ComfyAPI_Run } from "../app/api/create-run/route";
import { db } from "@/db/db";
import { workflowRunsTable } from "@/db/schema";
import { ComfyAPI_Run } from "@/types/ComfyAPI_Run";
import { eq } from "drizzle-orm";
import { revalidatePath } from "next/cache";
import { NextResponse } from "next/server";
import "server-only";
export async function createRun(
@ -18,9 +17,10 @@ export async function createRun(
});
if (!machine) {
return new Response("Machine not found", {
status: 404,
});
throw new Error("Machine not found");
// return new Response("Machine not found", {
// status: 404,
// });
}
const workflow_version_data =
@ -38,9 +38,10 @@ export async function createRun(
// })
// : null;
if (!workflow_version_data) {
return new Response("Workflow version not found", {
status: 404,
});
throw new Error("Workflow version not found");
// return new Response("Workflow version not found", {
// status: 404,
// });
}
const comfyui_endpoint = `${machine.endpoint}/comfy-deploy/run`;
@ -54,22 +55,22 @@ export async function createRun(
body: JSON.stringify({
workflow_api: workflow_version_data.workflow_api,
status_endpoint: `${origin}/api/update-run`,
file_upload_endpoint: `${origin}/api/file-upload`,
}),
})
.then(async (res) => ComfyAPI_Run.parseAsync(await res.json()))
.catch((error) => {
console.error(error);
return new Response(error.details, {
status: 500,
});
});
}).then(async (res) => ComfyAPI_Run.parseAsync(await res.json()));
// .catch((error) => {
// console.error(error);
// return new Response(error.details, {
// status: 500,
// });
// });
console.log(result);
// console.log(result);
// return the error
if (result instanceof Response) {
return result;
}
// // return the error
// if (result instanceof Response) {
// return result;
// }
// Add to our db
const workflow_run = await db
@ -84,12 +85,14 @@ export async function createRun(
revalidatePath(`/${workflow_version_data.workflow_id}`);
return NextResponse.json(
{
workflow_run_id: workflow_run[0].id,
},
{
status: 200,
}
);
return workflow_run[0].id;
// return NextResponse.json(
// {
// workflow_run_id: workflow_run[0].id,
// },
// {
// status: 200,
// }
// );
}

View File

@ -40,7 +40,17 @@ export async function addMachine(name: string, endpoint: string) {
revalidatePath("/machines");
}
export async function deleteMachine(machine_id: string) {
await db.delete(machinesTable).where(eq(machinesTable.id, machine_id));
revalidatePath("/machines");
export async function deleteMachine(
machine_id: string
): Promise<{ message: string; error?: boolean }> {
try {
await db.delete(machinesTable).where(eq(machinesTable.id, machine_id));
revalidatePath("/machines");
return { message: "Machine Deleted" };
} catch (error: unknown) {
return {
message: `Error: ${error.detail}`,
error: true,
};
}
}

View File

@ -0,0 +1,23 @@
import { db } from "@/db/db";
import { workflowRunsTable } from "@/db/schema";
import { desc, eq } from "drizzle-orm";
export async function findAllRuns(workflow_id: string) {
return await db.query.workflowRunsTable.findMany({
where: eq(workflowRunsTable.workflow_id, workflow_id),
orderBy: desc(workflowRunsTable.created_at),
with: {
machine: {
columns: {
name: true,
endpoint: true,
},
},
version: {
columns: {
version: true,
},
},
},
});
}

View File

@ -0,0 +1,10 @@
import { db } from "@/db/db";
import { workflowTable, workflowVersionTable } from "@/db/schema";
import { desc, eq } from "drizzle-orm";
export async function findFirstTableWithVersion(workflow_id: string) {
return await db.query.workflowTable.findFirst({
with: { versions: { orderBy: desc(workflowVersionTable.version) } },
where: eq(workflowTable.id, workflow_id),
});
}

View File

@ -0,0 +1,12 @@
"use server";
import { db } from "@/db/db";
import { workflowRunOutputs } from "@/db/schema";
import { eq } from "drizzle-orm";
export async function getRunsOutput(run_id: string) {
return await db
.select()
.from(workflowRunOutputs)
.where(eq(workflowRunOutputs.run_id, run_id));
}

106
web/src/server/resource.ts Normal file
View File

@ -0,0 +1,106 @@
import type { PutObjectCommandInput } from "@aws-sdk/client-s3";
import {
DeleteObjectCommand,
GetObjectCommand,
PutObjectCommand,
S3,
} from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
const s3Client = new S3({
endpoint: process.env.SPACES_ENDPOINT, //"https://nyc3.digitaloceanspaces.com",
region: process.env.SPACES_REGION, //"nyc3",
credentials: {
accessKeyId: process.env.SPACES_KEY!,
secretAccessKey: process.env.SPACES_SECRET!,
},
forcePathStyle: true,
});
function replaceCDNUrl(url: string) {
url = url.replace(
process.env.SPACES_ENDPOINT!,
process.env.SPACES_ENDPOINT_CDN!
);
return url;
}
export type ResourceObject = {
resourceBucket: string;
resourceId: string;
resourceType: "image/png" | "application/zip" | string;
isPublic?: boolean;
};
export async function handleResourceUpload(
resource: Partial<ResourceObject>
): Promise<string> {
const p: PutObjectCommandInput = {
Key: resource.resourceId,
Bucket: resource.resourceBucket,
ContentType: resource.resourceType,
};
// Only set ACL if resource is public
if (resource.isPublic) {
p.ACL = "public-read";
}
const url = await getSignedUrl(s3Client, new PutObjectCommand(p), {
expiresIn: 5 * 60,
});
return url;
}
export async function handResourceRemove(
resource: Partial<ResourceObject>
): Promise<boolean> {
console.log("Removing resources", resource);
try {
const result = await s3Client.send(
new DeleteObjectCommand({
Key: `/public-download/sdk/${resource.resourceId}`,
Bucket: resource.resourceBucket,
})
);
console.log(result);
} catch (err) {
console.log("Error", err);
return false;
}
return true;
}
export async function handleResourceDownload(
resource: Partial<ResourceObject>
): Promise<string> {
const url = await getSignedUrl(
s3Client,
new GetObjectCommand({
Key: resource.resourceId,
Bucket: resource.resourceBucket,
ResponseCacheControl: "no-cache, no-store",
}),
{ expiresIn: 5 * 60 }
);
return replaceCDNUrl(url);
}
export async function handleResourceDelete(
resource: ResourceObject
): Promise<string> {
try {
const result = await s3Client.send(
new DeleteObjectCommand({
Key: resource.resourceId,
Bucket: resource.resourceBucket,
})
);
} catch (e) {
//TODO handle error
return "error";
}
return "ok";
}

View File

@ -0,0 +1,7 @@
import { z } from "zod";
export const ComfyAPI_Run = z.object({
prompt_id: z.string(),
number: z.number(),
node_errors: z.any(),
});

View File

@ -59,12 +59,12 @@ const config: Config = {
},
keyframes: {
"accordion-down": {
from: { height: 0 },
from: { height: "0" },
to: { height: "var(--radix-accordion-content-height)" },
},
"accordion-up": {
from: { height: "var(--radix-accordion-content-height)" },
to: { height: 0 },
to: { height: "0" },
},
},
animation: {