477199357f
never refer to lora by an alias if multiple loras have same alias or the alias is called none
457 lines
16 KiB
Python
457 lines
16 KiB
Python
import glob
|
|
import os
|
|
import re
|
|
import torch
|
|
from typing import Union
|
|
|
|
from modules import shared, devices, sd_models, errors, scripts
|
|
|
|
metadata_tags_order = {"ss_sd_model_name": 1, "ss_resolution": 2, "ss_clip_skip": 3, "ss_num_train_images": 10, "ss_tag_frequency": 20}
|
|
|
|
re_digits = re.compile(r"\d+")
|
|
re_x_proj = re.compile(r"(.*)_([qkv]_proj)$")
|
|
re_compiled = {}
|
|
|
|
suffix_conversion = {
|
|
"attentions": {},
|
|
"resnets": {
|
|
"conv1": "in_layers_2",
|
|
"conv2": "out_layers_3",
|
|
"time_emb_proj": "emb_layers_1",
|
|
"conv_shortcut": "skip_connection",
|
|
}
|
|
}
|
|
|
|
|
|
def convert_diffusers_name_to_compvis(key, is_sd2):
|
|
def match(match_list, regex_text):
|
|
regex = re_compiled.get(regex_text)
|
|
if regex is None:
|
|
regex = re.compile(regex_text)
|
|
re_compiled[regex_text] = regex
|
|
|
|
r = re.match(regex, key)
|
|
if not r:
|
|
return False
|
|
|
|
match_list.clear()
|
|
match_list.extend([int(x) if re.match(re_digits, x) else x for x in r.groups()])
|
|
return True
|
|
|
|
m = []
|
|
|
|
if match(m, r"lora_unet_down_blocks_(\d+)_(attentions|resnets)_(\d+)_(.+)"):
|
|
suffix = suffix_conversion.get(m[1], {}).get(m[3], m[3])
|
|
return f"diffusion_model_input_blocks_{1 + m[0] * 3 + m[2]}_{1 if m[1] == 'attentions' else 0}_{suffix}"
|
|
|
|
if match(m, r"lora_unet_mid_block_(attentions|resnets)_(\d+)_(.+)"):
|
|
suffix = suffix_conversion.get(m[0], {}).get(m[2], m[2])
|
|
return f"diffusion_model_middle_block_{1 if m[0] == 'attentions' else m[1] * 2}_{suffix}"
|
|
|
|
if match(m, r"lora_unet_up_blocks_(\d+)_(attentions|resnets)_(\d+)_(.+)"):
|
|
suffix = suffix_conversion.get(m[1], {}).get(m[3], m[3])
|
|
return f"diffusion_model_output_blocks_{m[0] * 3 + m[2]}_{1 if m[1] == 'attentions' else 0}_{suffix}"
|
|
|
|
if match(m, r"lora_unet_down_blocks_(\d+)_downsamplers_0_conv"):
|
|
return f"diffusion_model_input_blocks_{3 + m[0] * 3}_0_op"
|
|
|
|
if match(m, r"lora_unet_up_blocks_(\d+)_upsamplers_0_conv"):
|
|
return f"diffusion_model_output_blocks_{2 + m[0] * 3}_{2 if m[0]>0 else 1}_conv"
|
|
|
|
if match(m, r"lora_te_text_model_encoder_layers_(\d+)_(.+)"):
|
|
if is_sd2:
|
|
if 'mlp_fc1' in m[1]:
|
|
return f"model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc1', 'mlp_c_fc')}"
|
|
elif 'mlp_fc2' in m[1]:
|
|
return f"model_transformer_resblocks_{m[0]}_{m[1].replace('mlp_fc2', 'mlp_c_proj')}"
|
|
else:
|
|
return f"model_transformer_resblocks_{m[0]}_{m[1].replace('self_attn', 'attn')}"
|
|
|
|
return f"transformer_text_model_encoder_layers_{m[0]}_{m[1]}"
|
|
|
|
return key
|
|
|
|
|
|
class LoraOnDisk:
|
|
def __init__(self, name, filename):
|
|
self.name = name
|
|
self.filename = filename
|
|
self.metadata = {}
|
|
|
|
_, ext = os.path.splitext(filename)
|
|
if ext.lower() == ".safetensors":
|
|
try:
|
|
self.metadata = sd_models.read_metadata_from_safetensors(filename)
|
|
except Exception as e:
|
|
errors.display(e, f"reading lora {filename}")
|
|
|
|
if self.metadata:
|
|
m = {}
|
|
for k, v in sorted(self.metadata.items(), key=lambda x: metadata_tags_order.get(x[0], 999)):
|
|
m[k] = v
|
|
|
|
self.metadata = m
|
|
|
|
self.ssmd_cover_images = self.metadata.pop('ssmd_cover_images', None) # those are cover images and they are too big to display in UI as text
|
|
self.alias = self.metadata.get('ss_output_name', self.name)
|
|
|
|
|
|
class LoraModule:
|
|
def __init__(self, name):
|
|
self.name = name
|
|
self.multiplier = 1.0
|
|
self.modules = {}
|
|
self.mtime = None
|
|
|
|
|
|
class LoraUpDownModule:
|
|
def __init__(self):
|
|
self.up = None
|
|
self.down = None
|
|
self.alpha = None
|
|
|
|
|
|
def assign_lora_names_to_compvis_modules(sd_model):
|
|
lora_layer_mapping = {}
|
|
|
|
for name, module in shared.sd_model.cond_stage_model.wrapped.named_modules():
|
|
lora_name = name.replace(".", "_")
|
|
lora_layer_mapping[lora_name] = module
|
|
module.lora_layer_name = lora_name
|
|
|
|
for name, module in shared.sd_model.model.named_modules():
|
|
lora_name = name.replace(".", "_")
|
|
lora_layer_mapping[lora_name] = module
|
|
module.lora_layer_name = lora_name
|
|
|
|
sd_model.lora_layer_mapping = lora_layer_mapping
|
|
|
|
|
|
def load_lora(name, filename):
|
|
lora = LoraModule(name)
|
|
lora.mtime = os.path.getmtime(filename)
|
|
|
|
sd = sd_models.read_state_dict(filename)
|
|
|
|
keys_failed_to_match = {}
|
|
is_sd2 = 'model_transformer_resblocks' in shared.sd_model.lora_layer_mapping
|
|
|
|
for key_diffusers, weight in sd.items():
|
|
key_diffusers_without_lora_parts, lora_key = key_diffusers.split(".", 1)
|
|
key = convert_diffusers_name_to_compvis(key_diffusers_without_lora_parts, is_sd2)
|
|
|
|
sd_module = shared.sd_model.lora_layer_mapping.get(key, None)
|
|
|
|
if sd_module is None:
|
|
m = re_x_proj.match(key)
|
|
if m:
|
|
sd_module = shared.sd_model.lora_layer_mapping.get(m.group(1), None)
|
|
|
|
if sd_module is None:
|
|
keys_failed_to_match[key_diffusers] = key
|
|
continue
|
|
|
|
lora_module = lora.modules.get(key, None)
|
|
if lora_module is None:
|
|
lora_module = LoraUpDownModule()
|
|
lora.modules[key] = lora_module
|
|
|
|
if lora_key == "alpha":
|
|
lora_module.alpha = weight.item()
|
|
continue
|
|
|
|
if type(sd_module) == torch.nn.Linear:
|
|
module = torch.nn.Linear(weight.shape[1], weight.shape[0], bias=False)
|
|
elif type(sd_module) == torch.nn.modules.linear.NonDynamicallyQuantizableLinear:
|
|
module = torch.nn.Linear(weight.shape[1], weight.shape[0], bias=False)
|
|
elif type(sd_module) == torch.nn.MultiheadAttention:
|
|
module = torch.nn.Linear(weight.shape[1], weight.shape[0], bias=False)
|
|
elif type(sd_module) == torch.nn.Conv2d and weight.shape[2:] == (1, 1):
|
|
module = torch.nn.Conv2d(weight.shape[1], weight.shape[0], (1, 1), bias=False)
|
|
elif type(sd_module) == torch.nn.Conv2d and weight.shape[2:] == (3, 3):
|
|
module = torch.nn.Conv2d(weight.shape[1], weight.shape[0], (3, 3), bias=False)
|
|
else:
|
|
print(f'Lora layer {key_diffusers} matched a layer with unsupported type: {type(sd_module).__name__}')
|
|
continue
|
|
assert False, f'Lora layer {key_diffusers} matched a layer with unsupported type: {type(sd_module).__name__}'
|
|
|
|
with torch.no_grad():
|
|
module.weight.copy_(weight)
|
|
|
|
module.to(device=devices.cpu, dtype=devices.dtype)
|
|
|
|
if lora_key == "lora_up.weight":
|
|
lora_module.up = module
|
|
elif lora_key == "lora_down.weight":
|
|
lora_module.down = module
|
|
else:
|
|
assert False, f'Bad Lora layer name: {key_diffusers} - must end in lora_up.weight, lora_down.weight or alpha'
|
|
|
|
if len(keys_failed_to_match) > 0:
|
|
print(f"Failed to match keys when loading Lora {filename}: {keys_failed_to_match}")
|
|
|
|
return lora
|
|
|
|
|
|
def load_loras(names, multipliers=None):
|
|
already_loaded = {}
|
|
|
|
for lora in loaded_loras:
|
|
if lora.name in names:
|
|
already_loaded[lora.name] = lora
|
|
|
|
loaded_loras.clear()
|
|
|
|
loras_on_disk = [available_lora_aliases.get(name, None) for name in names]
|
|
if any([x is None for x in loras_on_disk]):
|
|
list_available_loras()
|
|
|
|
loras_on_disk = [available_lora_aliases.get(name, None) for name in names]
|
|
|
|
for i, name in enumerate(names):
|
|
lora = already_loaded.get(name, None)
|
|
|
|
lora_on_disk = loras_on_disk[i]
|
|
if lora_on_disk is not None:
|
|
if lora is None or os.path.getmtime(lora_on_disk.filename) > lora.mtime:
|
|
try:
|
|
lora = load_lora(name, lora_on_disk.filename)
|
|
except Exception as e:
|
|
errors.display(e, f"loading Lora {lora_on_disk.filename}")
|
|
continue
|
|
|
|
if lora is None:
|
|
print(f"Couldn't find Lora with name {name}")
|
|
continue
|
|
|
|
lora.multiplier = multipliers[i] if multipliers else 1.0
|
|
loaded_loras.append(lora)
|
|
|
|
|
|
def lora_calc_updown(lora, module, target):
|
|
with torch.no_grad():
|
|
up = module.up.weight.to(target.device, dtype=target.dtype)
|
|
down = module.down.weight.to(target.device, dtype=target.dtype)
|
|
|
|
if up.shape[2:] == (1, 1) and down.shape[2:] == (1, 1):
|
|
updown = (up.squeeze(2).squeeze(2) @ down.squeeze(2).squeeze(2)).unsqueeze(2).unsqueeze(3)
|
|
elif up.shape[2:] == (3, 3) or down.shape[2:] == (3, 3):
|
|
updown = torch.nn.functional.conv2d(down.permute(1, 0, 2, 3), up).permute(1, 0, 2, 3)
|
|
else:
|
|
updown = up @ down
|
|
|
|
updown = updown * lora.multiplier * (module.alpha / module.up.weight.shape[1] if module.alpha else 1.0)
|
|
|
|
return updown
|
|
|
|
|
|
def lora_restore_weights_from_backup(self: Union[torch.nn.Conv2d, torch.nn.Linear, torch.nn.MultiheadAttention]):
|
|
weights_backup = getattr(self, "lora_weights_backup", None)
|
|
|
|
if weights_backup is None:
|
|
return
|
|
|
|
if isinstance(self, torch.nn.MultiheadAttention):
|
|
self.in_proj_weight.copy_(weights_backup[0])
|
|
self.out_proj.weight.copy_(weights_backup[1])
|
|
else:
|
|
self.weight.copy_(weights_backup)
|
|
|
|
|
|
def lora_apply_weights(self: Union[torch.nn.Conv2d, torch.nn.Linear, torch.nn.MultiheadAttention]):
|
|
"""
|
|
Applies the currently selected set of Loras to the weights of torch layer self.
|
|
If weights already have this particular set of loras applied, does nothing.
|
|
If not, restores orginal weights from backup and alters weights according to loras.
|
|
"""
|
|
|
|
lora_layer_name = getattr(self, 'lora_layer_name', None)
|
|
if lora_layer_name is None:
|
|
return
|
|
|
|
current_names = getattr(self, "lora_current_names", ())
|
|
wanted_names = tuple((x.name, x.multiplier) for x in loaded_loras)
|
|
|
|
weights_backup = getattr(self, "lora_weights_backup", None)
|
|
if weights_backup is None:
|
|
if isinstance(self, torch.nn.MultiheadAttention):
|
|
weights_backup = (self.in_proj_weight.to(devices.cpu, copy=True), self.out_proj.weight.to(devices.cpu, copy=True))
|
|
else:
|
|
weights_backup = self.weight.to(devices.cpu, copy=True)
|
|
|
|
self.lora_weights_backup = weights_backup
|
|
|
|
if current_names != wanted_names:
|
|
lora_restore_weights_from_backup(self)
|
|
|
|
for lora in loaded_loras:
|
|
module = lora.modules.get(lora_layer_name, None)
|
|
if module is not None and hasattr(self, 'weight'):
|
|
self.weight += lora_calc_updown(lora, module, self.weight)
|
|
continue
|
|
|
|
module_q = lora.modules.get(lora_layer_name + "_q_proj", None)
|
|
module_k = lora.modules.get(lora_layer_name + "_k_proj", None)
|
|
module_v = lora.modules.get(lora_layer_name + "_v_proj", None)
|
|
module_out = lora.modules.get(lora_layer_name + "_out_proj", None)
|
|
|
|
if isinstance(self, torch.nn.MultiheadAttention) and module_q and module_k and module_v and module_out:
|
|
updown_q = lora_calc_updown(lora, module_q, self.in_proj_weight)
|
|
updown_k = lora_calc_updown(lora, module_k, self.in_proj_weight)
|
|
updown_v = lora_calc_updown(lora, module_v, self.in_proj_weight)
|
|
updown_qkv = torch.vstack([updown_q, updown_k, updown_v])
|
|
|
|
self.in_proj_weight += updown_qkv
|
|
self.out_proj.weight += lora_calc_updown(lora, module_out, self.out_proj.weight)
|
|
continue
|
|
|
|
if module is None:
|
|
continue
|
|
|
|
print(f'failed to calculate lora weights for layer {lora_layer_name}')
|
|
|
|
setattr(self, "lora_current_names", wanted_names)
|
|
|
|
|
|
def lora_forward(module, input, original_forward):
|
|
"""
|
|
Old way of applying Lora by executing operations during layer's forward.
|
|
Stacking many loras this way results in big performance degradation.
|
|
"""
|
|
|
|
if len(loaded_loras) == 0:
|
|
return original_forward(module, input)
|
|
|
|
input = devices.cond_cast_unet(input)
|
|
|
|
lora_restore_weights_from_backup(module)
|
|
lora_reset_cached_weight(module)
|
|
|
|
res = original_forward(module, input)
|
|
|
|
lora_layer_name = getattr(module, 'lora_layer_name', None)
|
|
for lora in loaded_loras:
|
|
module = lora.modules.get(lora_layer_name, None)
|
|
if module is None:
|
|
continue
|
|
|
|
module.up.to(device=devices.device)
|
|
module.down.to(device=devices.device)
|
|
|
|
res = res + module.up(module.down(input)) * lora.multiplier * (module.alpha / module.up.weight.shape[1] if module.alpha else 1.0)
|
|
|
|
return res
|
|
|
|
|
|
def lora_reset_cached_weight(self: Union[torch.nn.Conv2d, torch.nn.Linear]):
|
|
setattr(self, "lora_current_names", ())
|
|
setattr(self, "lora_weights_backup", None)
|
|
|
|
|
|
def lora_Linear_forward(self, input):
|
|
if shared.opts.lora_functional:
|
|
return lora_forward(self, input, torch.nn.Linear_forward_before_lora)
|
|
|
|
lora_apply_weights(self)
|
|
|
|
return torch.nn.Linear_forward_before_lora(self, input)
|
|
|
|
|
|
def lora_Linear_load_state_dict(self, *args, **kwargs):
|
|
lora_reset_cached_weight(self)
|
|
|
|
return torch.nn.Linear_load_state_dict_before_lora(self, *args, **kwargs)
|
|
|
|
|
|
def lora_Conv2d_forward(self, input):
|
|
if shared.opts.lora_functional:
|
|
return lora_forward(self, input, torch.nn.Conv2d_forward_before_lora)
|
|
|
|
lora_apply_weights(self)
|
|
|
|
return torch.nn.Conv2d_forward_before_lora(self, input)
|
|
|
|
|
|
def lora_Conv2d_load_state_dict(self, *args, **kwargs):
|
|
lora_reset_cached_weight(self)
|
|
|
|
return torch.nn.Conv2d_load_state_dict_before_lora(self, *args, **kwargs)
|
|
|
|
|
|
def lora_MultiheadAttention_forward(self, *args, **kwargs):
|
|
lora_apply_weights(self)
|
|
|
|
return torch.nn.MultiheadAttention_forward_before_lora(self, *args, **kwargs)
|
|
|
|
|
|
def lora_MultiheadAttention_load_state_dict(self, *args, **kwargs):
|
|
lora_reset_cached_weight(self)
|
|
|
|
return torch.nn.MultiheadAttention_load_state_dict_before_lora(self, *args, **kwargs)
|
|
|
|
|
|
def list_available_loras():
|
|
available_loras.clear()
|
|
available_lora_aliases.clear()
|
|
forbidden_lora_aliases.clear()
|
|
forbidden_lora_aliases.update({"none": 1})
|
|
|
|
os.makedirs(shared.cmd_opts.lora_dir, exist_ok=True)
|
|
|
|
candidates = list(shared.walk_files(shared.cmd_opts.lora_dir, allowed_extensions=[".pt", ".ckpt", ".safetensors"]))
|
|
for filename in sorted(candidates, key=str.lower):
|
|
if os.path.isdir(filename):
|
|
continue
|
|
|
|
name = os.path.splitext(os.path.basename(filename))[0]
|
|
entry = LoraOnDisk(name, filename)
|
|
|
|
available_loras[name] = entry
|
|
|
|
if entry.alias in available_lora_aliases:
|
|
forbidden_lora_aliases[entry.alias.lower()] = 1
|
|
|
|
available_lora_aliases[name] = entry
|
|
available_lora_aliases[entry.alias] = entry
|
|
|
|
|
|
re_lora_name = re.compile(r"(.*)\s*\([0-9a-fA-F]+\)")
|
|
|
|
|
|
def infotext_pasted(infotext, params):
|
|
if "AddNet Module 1" in [x[1] for x in scripts.scripts_txt2img.infotext_fields]:
|
|
return # if the other extension is active, it will handle those fields, no need to do anything
|
|
|
|
added = []
|
|
|
|
for k, v in params.items():
|
|
if not k.startswith("AddNet Model "):
|
|
continue
|
|
|
|
num = k[13:]
|
|
|
|
if params.get("AddNet Module " + num) != "LoRA":
|
|
continue
|
|
|
|
name = params.get("AddNet Model " + num)
|
|
if name is None:
|
|
continue
|
|
|
|
m = re_lora_name.match(name)
|
|
if m:
|
|
name = m.group(1)
|
|
|
|
multiplier = params.get("AddNet Weight A " + num, "1.0")
|
|
|
|
added.append(f"<lora:{name}:{multiplier}>")
|
|
|
|
if added:
|
|
params["Prompt"] += "\n" + "".join(added)
|
|
|
|
available_loras = {}
|
|
available_lora_aliases = {}
|
|
forbidden_lora_aliases = {}
|
|
loaded_loras = []
|
|
|
|
list_available_loras()
|