inference_sora.py
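A distributed SoRA-style video-generation inference entry point for MindSpeed-MM: it initializes Megatron, shards the prompt list across data-parallel ranks, runs the diffusion pipeline batch by batch, and gathers the generated videos on rank 0.
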
import os
import torch
import mindspeed.megatron_adaptor
import torch.distributed as dist
from megatron.core import mpu
from megatron.training.checkpointing import load_checkpoint
from megatron.training.initialize import initialize_megatron
from megatron.training import get_args
from mindspeed_mm.configs.config import merge_mm_args, mm_extra_args_provider
from mindspeed_mm.arguments import extra_args_provider_decorator
from mindspeed_mm.tasks.inference.pipeline import sora_pipeline_dict
from mindspeed_mm.tasks.inference.pipeline.utils.sora_utils import (
    save_videos,
    save_video_grid,
    load_prompts,
    load_images,
    load_conditional_pixel_values,
)
from mindspeed_mm.models.predictor import PredictModel
from mindspeed_mm.models.diffusion import DiffusionModel
from mindspeed_mm.models.ae import AEModel
from mindspeed_mm.models.text_encoder import TextEncoder
from mindspeed_mm import Tokenizer
from mindspeed_mm.utils.utils import get_dtype, get_device, is_npu_available
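
# On Ascend devices, torch_npu and transfer_to_npu transparently redirect
# CUDA-style torch calls to the NPU backend.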
if is_npu_available():
    import torch_npu
    from torch_npu.contrib import transfer_to_npu

    # keep tensors in standard (non-internal) NPU memory formats
    torch.npu.config.allow_internal_format = False
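

# Assemble the SoRA inference pipeline from its sub-models as named in the
# multimodal config: VAE, text encoder, diffusion predictor, noise scheduler,
# and tokenizer.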
def prepare_pipeline(args, device):
    ori_args = get_args()
    vae = AEModel(args.ae).get_model().to(device, args.ae.dtype).eval()
    text_encoder = TextEncoder(args.text_encoder).get_model().to(device).eval()
    predict_model = PredictModel(args.predictor).get_model()
    if ori_args.load is not None:
        # restore predictor weights from the checkpoint given via --load
        load_checkpoint([predict_model], None, None, strict=False)
    predict_model = predict_model.to(device, args.predictor.dtype).eval()
    scheduler = DiffusionModel(args.diffusion).get_model()
    tokenizer = Tokenizer(args.tokenizer).get_tokenizer()
    if not hasattr(vae, 'dtype'):
        vae.dtype = args.ae.dtype
    sora_pipeline_class = sora_pipeline_dict[args.pipeline_class]
    sora_pipeline = sora_pipeline_class(vae=vae, text_encoder=text_encoder, tokenizer=tokenizer,
                                        scheduler=scheduler, predict_model=predict_model,
                                        config=args.pipeline_config)
    return sora_pipeline
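

# Entry point: parse the Megatron and multimodal configs, shard prompts across
# data-parallel ranks, generate videos batch by batch, then gather everything
# on global rank 0 for the final grid.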
def main():
    initialize_megatron(extra_args_provider=extra_args_provider_decorator(mm_extra_args_provider),
                        args_defaults={})
    args = get_args()
    merge_mm_args(args)
    if not hasattr(args, "dist_train"):
        args.dist_train = False
    args = args.mm.model
    # prepare arguments
    torch.set_grad_enabled(False)
    dtype = get_dtype(args.dtype)
    device = get_device(args.device)
    prompts = load_prompts(args.prompt)
    images = load_images(args.image) if hasattr(args, "image") else None
    conditional_pixel_values_path = (load_conditional_pixel_values(args.conditional_pixel_values_path)
                                     if hasattr(args, "conditional_pixel_values_path") else None)
    mask_type = args.mask_type if hasattr(args, "mask_type") else None
    crop_for_hw = args.crop_for_hw if hasattr(args, "crop_for_hw") else None
    max_hxw = args.max_hxw if hasattr(args, "max_hxw") else None
    if images is not None and len(prompts) != len(images):
        raise AssertionError(f'The number of images {len(images)} and the number of prompts {len(prompts)} do not match')
    if len(prompts) % args.micro_batch_size != 0:
        raise AssertionError(f'The number of prompts {len(prompts)} is not divisible by the batch size {args.micro_batch_size}')
    save_fps = args.fps // args.frame_interval
    # prepare pipeline
    sora_pipeline = prepare_pipeline(args, device)

    # == Iter over all samples ==
    video_grids = []
    start_idx = 0
    rank = mpu.get_data_parallel_rank()
    world_size = mpu.get_data_parallel_world_size()
    # Each data-parallel rank handles a strided slice of the prompt list:
    # rank r takes batches r, r + world_size, r + 2 * world_size, ...
    for i in range(rank * args.micro_batch_size, len(prompts), args.micro_batch_size * world_size):
        # == prepare batch prompts ==
        batch_prompts = prompts[i: i + args.micro_batch_size]
        kwargs = {}
        if conditional_pixel_values_path:
            batch_pixel_values_path = conditional_pixel_values_path[i: i + args.micro_batch_size]
            kwargs.update({"conditional_pixel_values_path": batch_pixel_values_path,
                           "mask_type": mask_type,
                           "crop_for_hw": crop_for_hw,
                           "max_hxw": max_hxw})
        if images is not None:
            batch_images = images[i: i + args.micro_batch_size]
        else:
            batch_images = None
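
        # Run diffusion sampling for this batch of prompts and any optional
        # image / pixel-value conditioning.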
        videos = sora_pipeline(prompt=batch_prompts,
                               image=batch_images,
                               fps=save_fps,
                               use_prompt_preprocess=args.use_prompt_preprocess,
                               device=device,
                               dtype=dtype,
                               **kwargs)
        # write per-batch videos once per model-parallel group
        if mpu.get_context_parallel_rank() == 0 and mpu.get_tensor_model_parallel_rank() == 0:
            save_videos(videos, i, args.save_path, save_fps)
        start_idx += len(batch_prompts) * world_size
        video_grids.append(videos)
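
    # Gather all generated videos onto global rank 0 with paired point-to-point
    # sends: a shape tensor first, then the payload, since per-rank batch counts
    # (and hence tensor shapes) may differ.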
    if len(video_grids) > 0:
        video_grids = torch.cat(video_grids, dim=0).to(device)

    # Only one global rank per data-parallel group holds videos: every
    # (tp_world_size * cp_world_size)-th rank.
    if len(prompts) < args.micro_batch_size * world_size:
        active_ranks = range(len(prompts) // args.micro_batch_size)
    else:
        active_ranks = range(world_size)
    active_ranks = [x * mpu.get_tensor_model_parallel_world_size() * mpu.get_context_parallel_world_size()
                    for x in active_ranks]

    dist.barrier()
    gathered_videos = []
    rank = dist.get_rank()
    if rank == 0:
        for r in active_ranks:
            if r != 0:  # main process does not need to receive from itself
                # receive tensor shape (videos are 5-D tensors)
                shape_tensor = torch.empty(5, dtype=torch.int, device=device)
                dist.recv(shape_tensor, src=r)
                shape_videos = shape_tensor.tolist()
                # create receiving buffer based on received shape
                received_videos = torch.empty(shape_videos, dtype=video_grids.dtype, device=device)
                dist.recv(received_videos, src=r)
                gathered_videos.append(received_videos.cpu())
            else:
                gathered_videos.append(video_grids.cpu())
    elif rank in active_ranks:
        # send tensor shape first
        shape_tensor = torch.tensor(video_grids.shape, dtype=torch.int, device=device)
        dist.send(shape_tensor, dst=0)
        # send the tensor
        dist.send(video_grids, dst=0)
    dist.barrier()

    if rank == 0:
        video_grids = torch.cat(gathered_videos, dim=0)
        save_video_grid(video_grids, args.save_path, save_fps)
        print("Inference finished.")
        print("Saved %s samples to %s" % (video_grids.shape[0], args.save_path))


if __name__ == "__main__":
    main()
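
# Typical launch (sketch, not verbatim): the script assumes a Megatron-style
# distributed launch, e.g. torchrun with the Megatron and multimodal-config
# arguments used by the MindSpeed-MM example inference scripts; see the
# repository's examples for the exact command lines.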