- 원본 링크 : https://keras.io/guides/distributed_training_with_torch/
- 최종 수정일 : 2024-09-18
PyTorch를 사용한 멀티 GPU 분산 트레이닝 (Multi-GPU distributed training with PyTorch)
목차
저자: fchollet
생성일: 2023/06/29
최종편집일: 2023/06/29
설명: PyTorch로 Keras 모델을 사용하여, 멀티 GPU 트레이닝을 진행하는 가이드입니다.
소개
일반적으로 여러 디바이스에 계산을 분산시키는 방법에는 두 가지가 있습니다:
- 데이터 병렬 처리
- 데이터 병렬 처리에서는 하나의 모델이 여러 장치나 여러 머신에 복제됩니다.
- 각 장치는 서로 다른 배치의 데이터를 처리한 후, 결과를 병합합니다.
- 이 설정에는 다양한 변형이 있으며, 서로 다른 모델 복제본이 결과를 병합하는 방식이나, 각 배치마다 동기화되는지 여부 등에 차이가 있습니다.
- 모델 병렬 처리
- 모델 병렬 처리에서는 하나의 모델의 다른 부분이 서로 다른 장치에서 실행되어, 하나의 데이터 배치를 함께 처리합니다.
- 이는 여러 가지 브랜치를 특징으로 하는 자연스럽게 병렬화된 아키텍처를 가진 모델에 가장 적합합니다.
이 가이드는 데이터 병렬 처리, 특히 동기식 데이터 병렬 처리에 중점을 둡니다. 여기서 모델의 서로 다른 복제본은 각 배치를 처리한 후 동기화됩니다. 동기화는 모델의 수렴 동작을 단일 장치에서의 트레이닝과 동일하게 유지시킵니다.
이 가이드는 PyTorch의 DistributedDataParallel
모듈 래퍼를 사용하여, Keras를 여러 GPU(일반적으로 2~16개)에서 트레이닝하는 방법을 가르칩니다. 이 설정은 단일 머신에 설치된 여러 GPU를 사용하는 싱글 호스트, 멀티 디바이스 트레이닝으로, 연구자들과 소규모 산업 워크플로우에서 가장 일반적으로 사용됩니다. 이 과정에서 코드를 최소한으로 변경하여 사용할 수 있습니다.
셋업
먼저, 우리가 트레이닝할 모델을 생성하는 함수와, 트레이닝할 데이터셋(MNIST)을 생성하는 함수를 정의해봅시다.
import os
os.environ["KERAS_BACKEND"] = "torch"
import torch
import numpy as np
import keras
def get_model():
# 배치 정규화와 드롭아웃이 포함된, 간단한 컨브넷(convnet)을 만듭니다.
inputs = keras.Input(shape=(28, 28, 1))
x = keras.layers.Rescaling(1.0 / 255.0)(inputs)
x = keras.layers.Conv2D(filters=12, kernel_size=3, padding="same", use_bias=False)(x)
x = keras.layers.BatchNormalization(scale=False, center=True)(x)
x = keras.layers.ReLU()(x)
x = keras.layers.Conv2D(filters=24, kernel_size=6, use_bias=False, strides=2)(x)
x = keras.layers.BatchNormalization(scale=False, center=True)(x)
x = keras.layers.ReLU()(x)
x = keras.layers.Conv2D(filters=32, kernel_size=6, padding="same", strides=2, name="large_k")(x)
x = keras.layers.BatchNormalization(scale=False, center=True)(x)
x = keras.layers.ReLU()(x)
x = keras.layers.GlobalAveragePooling2D()(x)
x = keras.layers.Dense(256, activation="relu")(x)
x = keras.layers.Dropout(0.5)(x)
outputs = keras.layers.Dense(10)(x)
model = keras.Model(inputs, outputs)
return model
def get_dataset():
# 데이터를 불러오고, 트레이닝과 테스트 세트로 나눕니다.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# 이미지를 [0, 1] 범위로 스케일링합니다.
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
# 이미지가 (28, 28, 1) shape을 가지도록 합니다.
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
print("x_train shape:", x_train.shape)
# TensorDataset을 생성합니다.
dataset = torch.utils.data.TensorDataset(
torch.from_numpy(x_train), torch.from_numpy(y_train)
)
return dataset
이제 GPU를 대상으로 하는 간단한 PyTorch 트레이닝 루프를 정의해보겠습니다. (.cuda()
호출에 주목하세요)
def train_model(model, dataloader, num_epochs, optimizer, loss_fn):
for epoch in range(num_epochs):
running_loss = 0.0
running_loss_count = 0
for batch_idx, (inputs, targets) in enumerate(dataloader):
inputs = inputs.cuda(non_blocking=True)
targets = targets.cuda(non_blocking=True)
# 순방향 패스 (Forward pass)
outputs = model(inputs)
loss = loss_fn(outputs, targets)
# 역방향 패스 및 최적화 (Backward and optimize)
optimizer.zero_grad()
loss.backward()
optimizer.step()
running_loss += loss.item()
running_loss_count += 1
# 손실 통계 출력
print(
f"Epoch {epoch + 1}/{num_epochs}, "
f"Loss: {running_loss / running_loss_count}"
)
단일 호스트, 다중 장치 동기 트레이닝
이 설정에서는, 여러 개의 GPU가 있는 하나의 머신(일반적으로 2~16개의 GPU)에서 트레이닝을 진행합니다. 각 디바이스는 복제본(replica)이라고 불리는 모델의 사본을 실행합니다. 간단히 설명하기 위해, 다음 내용에서는 8개의 GPU를 사용하는 것으로 가정하겠습니다. 이는 일반성을 잃지 않습니다.
작동 방식
트레이닝의 각 단계에서:
- 현재 데이터 배치(글로벌 배치)는 8개의 서로 다른 하위 배치(로컬 배치)로 나뉩니다. 예를 들어, 글로벌 배치에 512개의 샘플이 있으면, 8개의 로컬 배치 각각에는 64개의 샘플이 포함됩니다.
- 8개의 복제본 각각은 로컬 배치를 독립적으로 처리합니다: 순전파를 실행한 후, 역전파를 수행하여, 모델 손실에 대한 가중치의 그래디언트를 출력합니다.
- 로컬 그래디언트로부터 발생한 가중치 업데이트는 8개의 복제본 간에 효율적으로 병합됩니다. 이 병합은 각 스텝이 끝날 때 이루어지기 때문에, 복제본은 항상 동기화된 상태를 유지합니다.
실제로, 모델 레플리카의 가중치를 동기적으로 업데이트하는 과정은 각 개별 가중치 변수 레벨에서 처리됩니다. 이는 미러드 변수(mirrored variable) 객체를 통해 이루어집니다.
사용 방법
단일 호스트에서 여러 장치로 동기식 트레이닝을 수행하려면, torch.nn.parallel.DistributedDataParallel
모듈 래퍼를 사용합니다. 아래는 그 동작 방식입니다:
torch.multiprocessing.start_processes
를 사용하여 장치별로 하나의 프로세스를 시작합니다. 각 프로세스는per_device_launch_fn
함수를 실행합니다.per_device_launch_fn
함수는 다음과 같은 작업을 수행합니다:torch.distributed.init_process_group
과torch.cuda.set_device
를 사용하여, 해당 프로세스에서 사용할 장치를 설정합니다.torch.utils.data.distributed.DistributedSampler
와torch.utils.data.DataLoader
를 사용하여, 데이터를 분산 데이터 로더로 변환합니다.torch.nn.parallel.DistributedDataParallel
을 사용하여, 모델을 분산된 PyTorch 모듈로 변환합니다.- 그런 다음
train_model
함수를 호출합니다.
train_model
함수는 각 프로세스에서 실행되며, 각 프로세스에서 모델은 별도의 장치를 사용합니다.
다음은 각 단계를 유틸리티 함수로 나눈 흐름입니다:
# 설정
num_gpu = torch.cuda.device_count()
num_epochs = 2
batch_size = 64
print(f"{num_gpu}개의 GPU에서 실행 중입니다")
def setup_device(current_gpu_index, num_gpus):
# 장치 설정
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "56492"
device = torch.device("cuda:{}".format(current_gpu_index))
torch.distributed.init_process_group(
backend="nccl",
init_method="env://",
world_size=num_gpus,
rank=current_gpu_index,
)
torch.cuda.set_device(device)
def cleanup():
torch.distributed.destroy_process_group()
def prepare_dataloader(dataset, current_gpu_index, num_gpus, batch_size):
sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=num_gpus,
rank=current_gpu_index,
shuffle=False,
)
dataloader = torch.utils.data.DataLoader(
dataset,
sampler=sampler,
batch_size=batch_size,
shuffle=False,
)
return dataloader
def per_device_launch_fn(current_gpu_index, num_gpu):
# 프로세스 그룹 설정
setup_device(current_gpu_index, num_gpu)
dataset = get_dataset()
model = get_model()
# 데이터 로더 준비
dataloader = prepare_dataloader(dataset, current_gpu_index, num_gpu, batch_size)
# torch 최적화 도구 생성
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# torch 손실 함수 생성
loss_fn = torch.nn.CrossEntropyLoss()
# 모델을 장치에 배치
model = model.to(current_gpu_index)
ddp_model = torch.nn.parallel.DistributedDataParallel(
model, device_ids=[current_gpu_index], output_device=current_gpu_index
)
train_model(ddp_model, dataloader, num_epochs, optimizer, loss_fn)
cleanup()
결과를 보려면 클릭하세요.
Running on 0 GPUs
/opt/conda/envs/keras-torch/lib/python3.10/site-packages/torch/cuda/__init__.py:611: UserWarning: Can't initialize NVML
warnings.warn("Can't initialize NVML")
멀티 프로세스를 시작하는 시간입니다:
if __name__ == "__main__":
# notebooks을 지원하기 위해, "spawn" 대신 "fork" 방식을 사용합니다.
torch.multiprocessing.start_processes(
per_device_launch_fn,
args=(num_gpu,),
nprocs=num_gpu,
join=True,
start_method="fork",
)
이제 끝났습니다!