Merge pull request #21 from hodanov/feature/refactoring

Separate setup.py
hodanov 2023-07-08 20:39:28 +09:00 committed by GitHub
commit 24733b3bf7
4 changed files with 300 additions and 296 deletions

Makefile

@@ -1,5 +1,5 @@
deploy:
modal deploy ./setup_files/setup.py
cd ./setup_files && modal deploy main.py
# `--upscaler` is the name of the upscaler you want to use.
# You can use the upscalers below:

README.md

@@ -53,6 +53,7 @@ Images are generated and output to the `outputs/` directory.
├── Dockerfile # To build a base image.
├── config.yml # To set a model, vae and some tools.
├── requirements.txt
├── main.py # The main script to run inference.
└── setup.py # Build an application to deploy on Modal.
```
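For orientation, the keys that `main.py` and `setup.py` read from `config.yml` imply a structure along the following lines. This is a hedged sketch written as the Python dict that `yaml.safe_load` would return; every name and URL below is a placeholder, not a value from this repository:

```python
# Hypothetical shape of the parsed config.yml, inferred from the keys the
# code reads: config["model"]["name"], config.get("vae"), config.get("loras"),
# config.get("textual_inversions"), config.get("controlnets").
config = {
    "model": {"name": "some-model"},  # cache directory suffix under /vol/cache
    "vae": {"name": "some-vae"},  # optional; enables a custom AutoencoderKL
    "loras": [  # optional; each entry is downloaded and loaded as LoRA weights
        {"name": "some-lora.safetensors", "download_url": "https://example.com/lora"},
    ],
    "textual_inversions": [  # optional
        {"name": "some-embedding.pt", "download_url": "https://example.com/ti"},
    ],
    "controlnets": [  # optional; loading is marked TODO in this commit
        {"name": "control_v11f1e_sd15_tile"},
    ],
}
```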

setup_files/main.py (new file, 289 lines)

@@ -0,0 +1,289 @@
from __future__ import annotations
import io
import os
import diffusers
import PIL.Image
import torch
from modal import Secret, method
from modal.cls import ClsMixin
from setup import (BASE_CACHE_PATH, BASE_CACHE_PATH_CONTROLNET,
BASE_CACHE_PATH_LORA, BASE_CACHE_PATH_TEXTUAL_INVERSION,
stub)
@stub.cls(
gpu="A10G",
secrets=[Secret.from_dotenv(__file__)],
)
class StableDiffusion(ClsMixin):
"""
A class that wraps the Stable Diffusion pipeline and scheduler.
"""
def __enter__(self):
import yaml
config = {}
with open("/config.yml", "r") as file:
config = yaml.safe_load(file)
self.cache_path = os.path.join(BASE_CACHE_PATH, config["model"]["name"])
if os.path.exists(self.cache_path):
print(f"The directory '{self.cache_path}' exists.")
else:
print(f"The directory '{self.cache_path}' does not exist.")
torch.cuda.memory._set_allocator_settings("max_split_size_mb:256")
self.pipe = diffusers.StableDiffusionPipeline.from_pretrained(
self.cache_path,
custom_pipeline="lpw_stable_diffusion",
torch_dtype=torch.float16,
)
# TODO: Add support for other schedulers.
self.pipe.scheduler = diffusers.EulerAncestralDiscreteScheduler.from_pretrained(
# self.pipe.scheduler = diffusers.DPMSolverMultistepScheduler.from_pretrained(
self.cache_path,
subfolder="scheduler",
)
vae = config.get("vae")
if vae is not None:
self.pipe.vae = diffusers.AutoencoderKL.from_pretrained(
self.cache_path,
subfolder="vae",
)
self.pipe.to("cuda")
loras = config.get("loras")
if loras is not None:
for lora in loras:
path = os.path.join(BASE_CACHE_PATH_LORA, lora["name"])
if os.path.exists(path):
print(f"The directory '{path}' exists.")
else:
print(f"The directory '{path}' does not exist. Need to execute 'modal deploy' first.")
self.pipe.load_lora_weights(".", weight_name=path)
textual_inversions = config.get("textual_inversions")
if textual_inversions is not None:
for textual_inversion in textual_inversions:
path = os.path.join(BASE_CACHE_PATH_TEXTUAL_INVERSION, textual_inversion["name"])
if os.path.exists(path):
print(f"The directory '{path}' exists.")
else:
print(f"The directory '{path}' does not exist. Need to execute 'modal deploy' first.")
self.pipe.load_textual_inversion(path)
self.pipe.enable_xformers_memory_efficient_attention()
# TODO: Repair the controlnet loading.
controlnets = config.get("controlnets")
if controlnets is not None:
for controlnet in controlnets:
path = os.path.join(BASE_CACHE_PATH_CONTROLNET, controlnet["name"])
controlnet = diffusers.ControlNetModel.from_pretrained(path, torch_dtype=torch.float16)
self.controlnet_pipe = diffusers.StableDiffusionControlNetPipeline.from_pretrained(
self.cache_path,
controlnet=controlnet,
custom_pipeline="lpw_stable_diffusion",
scheduler=self.pipe.scheduler,
vae=self.pipe.vae,
torch_dtype=torch.float16,
)
self.controlnet_pipe.to("cuda")
self.controlnet_pipe.enable_xformers_memory_efficient_attention()
@method()
def count_token(self, p: str, n: str) -> int:
"""
Count the number of tokens in the prompt and negative prompt.
"""
from transformers import CLIPTokenizer
tokenizer = CLIPTokenizer.from_pretrained(
self.cache_path,
subfolder="tokenizer",
)
token_size_p = len(tokenizer.tokenize(p))
token_size_n = len(tokenizer.tokenize(n))
token_size = token_size_p
if token_size_p <= token_size_n:
token_size = token_size_n
max_embeddings_multiples = 1
max_length = tokenizer.model_max_length - 2
if token_size > max_length:
max_embeddings_multiples = token_size // max_length + 1
print(f"token_size: {token_size}, max_embeddings_multiples: {max_embeddings_multiples}")
return max_embeddings_multiples
@method()
def run_inference(
self,
prompt: str,
n_prompt: str,
height: int = 512,
width: int = 512,
samples: int = 1,
batch_size: int = 1,
steps: int = 30,
seed: int = 1,
upscaler: str = "",
use_face_enhancer: bool = False,
fix_by_controlnet_tile: bool = False,
) -> list[bytes]:
"""
Runs the Stable Diffusion pipeline on the given prompt and outputs images.
"""
max_embeddings_multiples = self.count_token(p=prompt, n=n_prompt)
generator = torch.Generator("cuda").manual_seed(seed)
with torch.inference_mode():
with torch.autocast("cuda"):
generated_images = self.pipe.text2img(
                    [prompt] * batch_size,  # a list: one prompt per image in the batch
                    negative_prompt=[n_prompt] * batch_size,
height=height,
width=width,
num_inference_steps=steps,
guidance_scale=7.5,
max_embeddings_multiples=max_embeddings_multiples,
generator=generator,
).images
base_images = generated_images
"""
Fix the generated images by the control_v11f1e_sd15_tile when `fix_by_controlnet_tile` is `True`.
https://huggingface.co/lllyasviel/control_v11f1e_sd15_tile
"""
if fix_by_controlnet_tile:
for image in base_images:
image = self.resize_image(image=image, scale_factor=2)
with torch.inference_mode():
with torch.autocast("cuda"):
fixed_by_controlnet = self.controlnet_pipe(
                            prompt=[prompt] * batch_size,
                            negative_prompt=[n_prompt] * batch_size,
num_inference_steps=steps,
strength=0.3,
guidance_scale=7.5,
max_embeddings_multiples=max_embeddings_multiples,
generator=generator,
image=image,
).images
generated_images.extend(fixed_by_controlnet)
base_images = fixed_by_controlnet
if upscaler != "":
upscaled = self.upscale(
base_images=base_images,
half_precision=False,
tile=700,
upscaler=upscaler,
use_face_enhancer=use_face_enhancer,
)
generated_images.extend(upscaled)
image_output = []
for image in generated_images:
with io.BytesIO() as buf:
image.save(buf, format="PNG")
image_output.append(buf.getvalue())
return image_output
@method()
def resize_image(self, image: PIL.Image.Image, scale_factor: int) -> PIL.Image.Image:
image = image.convert("RGB")
width, height = image.size
img = image.resize((width * scale_factor, height * scale_factor), resample=PIL.Image.LANCZOS)
return img
@method()
def upscale(
self,
        base_images: list[PIL.Image.Image],
half_precision: bool = False,
tile: int = 0,
tile_pad: int = 10,
pre_pad: int = 0,
upscaler: str = "",
use_face_enhancer: bool = False,
    ) -> list[PIL.Image.Image]:
"""
        Upscale the generated images with the selected upscaler.
        The upscaler can be one of the following:
- `RealESRGAN_x4plus`
- `RealESRNet_x4plus`
- `RealESRGAN_x4plus_anime_6B`
- `RealESRGAN_x2plus`
https://github.com/xinntao/Real-ESRGAN
"""
import numpy
from basicsr.archs.rrdbnet_arch import RRDBNet
from gfpgan import GFPGANer
from realesrgan import RealESRGANer
from tqdm import tqdm
model_name = upscaler
if model_name == "RealESRGAN_x4plus":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4)
netscale = 4
elif model_name == "RealESRNet_x4plus":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4)
netscale = 4
elif model_name == "RealESRGAN_x4plus_anime_6B":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=6, num_grow_ch=32, scale=4)
netscale = 4
elif model_name == "RealESRGAN_x2plus":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=2)
netscale = 2
else:
raise NotImplementedError("Model name not supported")
upsampler = RealESRGANer(
scale=netscale,
model_path=os.path.join(BASE_CACHE_PATH, "esrgan", f"{model_name}.pth"),
dni_weight=None,
model=upscale_model,
tile=tile,
tile_pad=tile_pad,
pre_pad=pre_pad,
half=half_precision,
gpu_id=None,
)
if use_face_enhancer:
face_enhancer = GFPGANer(
model_path=os.path.join(BASE_CACHE_PATH, "esrgan", "GFPGANv1.3.pth"),
upscale=netscale,
arch="clean",
channel_multiplier=2,
bg_upsampler=upsampler,
)
upscaled_imgs = []
with tqdm(total=len(base_images)) as progress_bar:
for img in base_images:
img = numpy.array(img)
if use_face_enhancer:
_, _, enhance_result = face_enhancer.enhance(
img,
has_aligned=False,
only_center_face=False,
paste_back=True,
)
else:
enhance_result, _ = upsampler.enhance(img)
upscaled_imgs.append(PIL.Image.fromarray(enhance_result))
progress_bar.update(1)
return upscaled_imgs
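For context, here is a minimal sketch of how the deployed class might be driven locally. This is not part of the commit; it assumes the mid-2023 Modal API, where `@method()`-decorated functions are invoked remotely with `.call()` and apps are run with `modal run`, and the prompt values and output path are placeholders:

```python
# Hypothetical local entrypoint, not part of this commit. Assumes the
# mid-2023 Modal API: @stub.local_entrypoint() and remote calls via .call().
from main import StableDiffusion
from setup import stub


@stub.local_entrypoint()
def entrypoint(prompt: str = "a photo of a cat", upscaler: str = ""):
    sd = StableDiffusion()
    images = sd.run_inference.call(
        prompt=prompt,
        n_prompt="low quality, worst quality",
        seed=42,
        upscaler=upscaler,
    )
    # run_inference returns PNG-encoded bytes; write them under outputs/,
    # as the README describes.
    for i, png_bytes in enumerate(images):
        with open(f"outputs/{i}.png", "wb") as f:
            f.write(png_bytes)
```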

setup_files/setup.py

@@ -1,13 +1,9 @@
from __future__ import annotations
import io
import os
from urllib.request import Request, urlopen
import diffusers
import yaml
from modal import Image, Mount, Secret, Stub, method
from modal.cls import ClsMixin
from modal import Image, Mount, Secret, Stub
BASE_CACHE_PATH = "/vol/cache"
BASE_CACHE_PATH_LORA = "/vol/cache/lora"
@@ -19,6 +15,8 @@ def download_file(url, file_name, file_path):
"""
Download files.
"""
from urllib.request import Request, urlopen
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
downloaded = urlopen(req).read()
dir_names = os.path.join(file_path, file_name)
@@ -70,6 +68,8 @@ def build_image():
"""
Build the Docker image.
"""
import yaml
token = os.environ["HUGGING_FACE_TOKEN"]
config = {}
with open("/config.yml", "r") as file:
@@ -109,302 +109,16 @@
stub = Stub("stable-diffusion-cli")
base_stub = Image.from_dockerfile(
path="./setup_files/Dockerfile",
context_mount=Mount.from_local_file("./setup_files/requirements.txt"),
path="Dockerfile",
context_mount=Mount.from_local_file("requirements.txt"),
)
stub.image = base_stub.extend(
dockerfile_commands=[
"FROM base",
"COPY ./config.yml /",
"COPY config.yml /",
],
context_mount=Mount.from_local_file("./setup_files/config.yml"),
context_mount=Mount.from_local_file("config.yml"),
).run_function(
build_image,
secrets=[Secret.from_dotenv(__file__)],
)
@stub.cls(
gpu="A10G",
secrets=[Secret.from_dotenv(__file__)],
)
class StableDiffusion(ClsMixin):
"""
A class that wraps the Stable Diffusion pipeline and scheduler.
"""
def __enter__(self):
import torch
config = {}
with open("/config.yml", "r") as file:
config = yaml.safe_load(file)
self.cache_path = os.path.join(BASE_CACHE_PATH, config["model"]["name"])
if os.path.exists(self.cache_path):
print(f"The directory '{self.cache_path}' exists.")
else:
print(f"The directory '{self.cache_path}' does not exist.")
torch.cuda.memory._set_allocator_settings("max_split_size_mb:256")
self.pipe = diffusers.StableDiffusionPipeline.from_pretrained(
self.cache_path,
custom_pipeline="lpw_stable_diffusion",
torch_dtype=torch.float16,
)
# TODO: Add support for other schedulers.
self.pipe.scheduler = diffusers.EulerAncestralDiscreteScheduler.from_pretrained(
# self.pipe.scheduler = diffusers.DPMSolverMultistepScheduler.from_pretrained(
self.cache_path,
subfolder="scheduler",
)
vae = config.get("vae")
if vae is not None:
self.pipe.vae = diffusers.AutoencoderKL.from_pretrained(
self.cache_path,
subfolder="vae",
)
self.pipe.to("cuda")
loras = config.get("loras")
if loras is not None:
for lora in loras:
path = os.path.join(BASE_CACHE_PATH_LORA, lora["name"])
if os.path.exists(path):
print(f"The directory '{path}' exists.")
else:
print(f"The directory '{path}' does not exist. Download it...")
download_file(lora["download_url"], lora["name"], BASE_CACHE_PATH_LORA)
self.pipe.load_lora_weights(".", weight_name=path)
textual_inversions = config.get("textual_inversions")
if textual_inversions is not None:
for textual_inversion in textual_inversions:
path = os.path.join(BASE_CACHE_PATH_TEXTUAL_INVERSION, textual_inversion["name"])
if os.path.exists(path):
print(f"The directory '{path}' exists.")
else:
print(f"The directory '{path}' does not exist. Download it...")
download_file(
textual_inversion["download_url"],
textual_inversion["name"],
BASE_CACHE_PATH_TEXTUAL_INVERSION,
)
self.pipe.load_textual_inversion(path)
self.pipe.enable_xformers_memory_efficient_attention()
# TODO: Repair the controlnet loading.
controlnets = config.get("controlnets")
if controlnets is not None:
for controlnet in controlnets:
path = os.path.join(BASE_CACHE_PATH_CONTROLNET, controlnet["name"])
controlnet = diffusers.ControlNetModel.from_pretrained(path, torch_dtype=torch.float16)
self.controlnet_pipe = diffusers.StableDiffusionControlNetPipeline.from_pretrained(
self.cache_path,
controlnet=controlnet,
custom_pipeline="lpw_stable_diffusion",
scheduler=self.pipe.scheduler,
vae=self.pipe.vae,
torch_dtype=torch.float16,
)
self.controlnet_pipe.to("cuda")
self.controlnet_pipe.enable_xformers_memory_efficient_attention()
@method()
def count_token(self, p: str, n: str) -> int:
"""
Count the number of tokens in the prompt and negative prompt.
"""
from transformers import CLIPTokenizer
tokenizer = CLIPTokenizer.from_pretrained(
self.cache_path,
subfolder="tokenizer",
)
token_size_p = len(tokenizer.tokenize(p))
token_size_n = len(tokenizer.tokenize(n))
token_size = token_size_p
if token_size_p <= token_size_n:
token_size = token_size_n
max_embeddings_multiples = 1
max_length = tokenizer.model_max_length - 2
if token_size > max_length:
max_embeddings_multiples = token_size // max_length + 1
print(f"token_size: {token_size}, max_embeddings_multiples: {max_embeddings_multiples}")
return max_embeddings_multiples
@method()
def run_inference(
self,
prompt: str,
n_prompt: str,
height: int = 512,
width: int = 512,
samples: int = 1,
batch_size: int = 1,
steps: int = 30,
seed: int = 1,
upscaler: str = "",
use_face_enhancer: bool = False,
fix_by_controlnet_tile: bool = False,
) -> list[bytes]:
"""
Runs the Stable Diffusion pipeline on the given prompt and outputs images.
"""
import torch
max_embeddings_multiples = self.count_token(p=prompt, n=n_prompt)
generator = torch.Generator("cuda").manual_seed(seed)
with torch.inference_mode():
with torch.autocast("cuda"):
generated_images = self.pipe.text2img(
prompt * batch_size,
negative_prompt=n_prompt * batch_size,
height=height,
width=width,
num_inference_steps=steps,
guidance_scale=7.5,
max_embeddings_multiples=max_embeddings_multiples,
generator=generator,
).images
base_images = generated_images
"""
Fix the generated images by the control_v11f1e_sd15_tile when `fix_by_controlnet_tile` is `True`.
https://huggingface.co/lllyasviel/control_v11f1e_sd15_tile
"""
if fix_by_controlnet_tile:
for image in base_images:
image = self.resize_image(image=image, scale_factor=2)
with torch.inference_mode():
with torch.autocast("cuda"):
fixed_by_controlnet = self.controlnet_pipe(
prompt=prompt * batch_size,
negative_prompt=n_prompt * batch_size,
num_inference_steps=steps,
strength=0.3,
guidance_scale=7.5,
max_embeddings_multiples=max_embeddings_multiples,
generator=generator,
image=image,
).images
generated_images.extend(fixed_by_controlnet)
base_images = fixed_by_controlnet
if upscaler != "":
upscaled = self.upscale(
base_images=base_images,
half_precision=False,
tile=700,
upscaler=upscaler,
use_face_enhancer=use_face_enhancer,
)
generated_images.extend(upscaled)
image_output = []
for image in generated_images:
with io.BytesIO() as buf:
image.save(buf, format="PNG")
image_output.append(buf.getvalue())
return image_output
@method()
def resize_image(self, image: Image.Image, scale_factor: int) -> Image.Image:
from PIL import Image
image = image.convert("RGB")
width, height = image.size
img = image.resize((width * scale_factor, height * scale_factor), resample=Image.LANCZOS)
return img
@method()
def upscale(
self,
base_images: list[Image.Image],
half_precision: bool = False,
tile: int = 0,
tile_pad: int = 10,
pre_pad: int = 0,
upscaler: str = "",
use_face_enhancer: bool = False,
) -> list[Image.Image]:
"""
Upscale the generated images by the upscaler when `upscaler` is selected.
The upscaler can be selected from the following list:
- `RealESRGAN_x4plus`
- `RealESRNet_x4plus`
- `RealESRGAN_x4plus_anime_6B`
- `RealESRGAN_x2plus`
https://github.com/xinntao/Real-ESRGAN
"""
import numpy
from basicsr.archs.rrdbnet_arch import RRDBNet
from PIL import Image
from realesrgan import RealESRGANer
from tqdm import tqdm
model_name = upscaler
if model_name == "RealESRGAN_x4plus":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4)
netscale = 4
elif model_name == "RealESRNet_x4plus":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4)
netscale = 4
elif model_name == "RealESRGAN_x4plus_anime_6B":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=6, num_grow_ch=32, scale=4)
netscale = 4
elif model_name == "RealESRGAN_x2plus":
upscale_model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=2)
netscale = 2
else:
raise NotImplementedError("Model name not supported")
upsampler = RealESRGANer(
scale=netscale,
model_path=os.path.join(BASE_CACHE_PATH, "esrgan", f"{model_name}.pth"),
dni_weight=None,
model=upscale_model,
tile=tile,
tile_pad=tile_pad,
pre_pad=pre_pad,
half=half_precision,
gpu_id=None,
)
from gfpgan import GFPGANer
if use_face_enhancer:
face_enhancer = GFPGANer(
model_path=os.path.join(BASE_CACHE_PATH, "esrgan", "GFPGANv1.3.pth"),
upscale=netscale,
arch="clean",
channel_multiplier=2,
bg_upsampler=upsampler,
)
upscaled_imgs = []
with tqdm(total=len(base_images)) as progress_bar:
for img in base_images:
img = numpy.array(img)
if use_face_enhancer:
_, _, enhance_result = face_enhancer.enhance(
img,
has_aligned=False,
only_center_face=False,
paste_back=True,
)
else:
enhance_result, _ = upsampler.enhance(img)
upscaled_imgs.append(Image.fromarray(enhance_result))
progress_bar.update(1)
return upscaled_imgs