Moon-Kee Bahk(박문기)
33 min readNov 9, 2020

[Keras/Tensorflow환경에서 다중 CPU, NVIDIA-GPU, Google-TUP, Graphcore IPU 분산(병렬)처리 설정법]

[How to use/setup/config Multi CPUs, NVIDIA-GPUs, Google-TPUs, Graphcore-IPUs Distributed(Parallel) Processing on Kerea/Tensorflow]

본 문서에서는 Keras/Tensorflow환경에서 다중의 CPU, GPU, TPU, IPU를 통해 Deep Learning 분산처리/병렬프로세싱을 설정하는 방법에 대해서 기술합니다.

[설치환경]

  1. O/S: Ubuntu 18.04 LTS

2. 각종 Driver설치법, 각 벤더에 최적화된 Tensorflow, Tensorflow-gpu, Tensorflow-IPU등의 설치방법은 기술하지 않았습니다.

3. 각 수행환경을 확인하는 방법은 기술했습니다.

4. 각 분산(병렬)수행 환경에서 결과와 시간을 비교측정했습니다. 그러나 Code의 변환에 촛점을 맞추었지, 각 벤더의 환경에 최적화된 Code를 비교하지 않았습니다.

5. Intel-CPU는 2소켓, 총40 코어입니다.

6. NVIDIA-GPU는 V100 4대를 사용했습니다.

7. Graphcore-IPU는 GC-2 8개중 2개를 사용했습니다.

[분산처리없는 CPU 수행환경]

MNIST파일을 다중 CPU 분산을 수행하지 않는 코드입니다.

# 0. 모듈 인포트

from tensorflow import keras

from tensorflow.keras import utils

from tensorflow.keras.datasets import mnist

from tensorflow.keras.models import Sequential

from tensorflow.keras.layers import Dense, Activation

import time

start = time.time() #시작시간 저장

# 1. 데이타셋 준비하기

(X_train, Y_train), (X_test, Y_test) = mnist.load_data()

X_train = X_train.reshape(60000, 28*28).astype(‘float32’) / 255.0

X_test = X_test.reshape(10000, 28*28).astype(‘float32’) / 255.0

Y_train = utils.to_categorical(Y_train)

Y_test = utils.to_categorical(Y_test)

# Overfitting을 찾아보기 위해 Traing Data세트중에서 Validation Set으로 추가분할

X_val = X_train[50000:60000]

Y_val = Y_train[50000:60000]

X_train = X_train[0:50000]

Y_train = Y_train[0:50000]

# 2. 모델 구성하기

model = Sequential()

model.add(Dense(64, input_dim=28*28, activation=’relu’, kernel_initializer=’glorot_uniform’, bias_initializer=’zeros’))

model.add(Dense(32, activation=’relu’))

model.add(Dense(16, activation=’relu’))

model.add(Dense(10, activation=’softmax’))

# 3. 모델 엮기

model.compile(optimizer=keras.optimizers.SGD(learning_rate=0.1), loss=’categorical_crossentropy’, metrics=[‘accuracy’])

model.summary()

# 4. 학습Training

model.fit(x=X_train, y=Y_train, epochs=10, batch_size=8, validation_data=(X_val, Y_val) )

# 5. 추론Inference

loss_and_metrics = model.evaluate(x=X_test, y=Y_test, batch_size=8)

print(“loss_and_metrics : “ + str(loss_and_metrics))

print(“Running Time :”, round(time.time() — start, 2), “(Sec.)”) # 현재시간-시작시간= 실행시간

수행의 결과는 아래와 같습니다.

수행 시 CPU는 분산처리가 되지 않아 다음과 같이 각 CPU를 골고루 사용하지 않습니다. 수행시간은 약 82.95 초 걸렸습니다.

GitHub Code URL:

아래 keras 서술형에 분산전략을 설정했으나 CPU분산이 되지 않았습니다. 원인을 찾고있는 중입니다.

# Module Import

import tensorflow as tf

from tensorflow import keras

# Load MNIST DataSet

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# one-hot enconding 수행

# 5 → 0 0 0 0 0 1 0 0 0 0

# 1 → 0 1 0 0 0 0 0 0 0 0

y_train = keras.utils.to_categorical(y=y_train, num_classes = 10)

y_test = keras.utils.to_categorical(y=y_test, num_classes=10)

#Reshaping DataSet

x_train = x_train.reshape(60000, 28*28)

x_test = x_test.reshape(10000, 28*28)

print(y_train.shape, y_test.shape)

print(x_train.shape, x_test.shape)

#모델생성

strategy = tf.distribute.get_strategy()

with strategy.scope():

model = keras.Sequential([

keras.layers.Dense(32, activation=’sigmoid’, input_shape=(28*28,)),

keras.layers.Dense(32, activation=’sigmoid’),

keras.layers.Dense(10, activation=’sigmoid’)])

#모델컴파일

model.compile(optimizer=keras.optimizers.SGD(learning_rate=0.1), loss=”categorical_crossentropy”, metrics=[‘accuracy’])

model.summary()

#모델훈련

model.fit(x=x_train, y=y_train, batch_size=128, epochs=10, validation_data=(x_test, y_test))

#모델 평가

model.evaluate(x_test, y_test )

#end of with

수행시 CPU 사용형태입니다.

결과는 아래와 같습니다.

코드는 아래에 있습니다.

[다중 CPU 코어 분산처리 설정]

코드는 아래와 같습니다.

# Copyright 2020 Graphcore Ltd.

import tensorflow as tf

from tensorflow import keras

import time

print(tf.config.list_physical_devices(“CPU”), “\n\n”)

if tf.__version__[0] != ‘2’:

raise ImportError(“TensorFlow 2 is required for this example”)

# The input data and labels.

mnist = tf.keras.datasets.mnist

start = time.time() # 시작 시간 저장

(x_train, y_train), (x_test, y_test) = mnist.load_data()

(x_train, x_test) = (x_train / 255.0, x_test / 255.0)

# Add a channels dimension.

x_train = x_train[…, tf.newaxis]

x_test = x_test[…, tf.newaxis]

def create_train_dataset():

print(“==============================Processing Training DataSet==============================\n\n”)

train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).batch(1, drop_remainder=True)

train_ds = train_ds.map(lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)))

return train_ds.repeat()

#end of def

def create_test_dataset():

print(“==============================Processing Test DataSet==============================\n\n”)

test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).shuffle(10000).batch(1, drop_remainder=True)

test_ds = test_ds.map(lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)))

return test_ds.repeat()

#end of def

# Create the model using the IPU-specific Sequential class instead of the

# standard tf.keras.Sequential class

def create_model():

model = keras.Sequential([

keras.layers.Flatten(),

keras.layers.Dense(128, activation=’relu’),

keras.layers.Dense(256, activation=’relu’),

keras.layers.Dense(128, activation=’relu’),

keras.layers.Dense(10, activation=’softmax’)])

return model

#end of def

strategy = tf.distribute.get_strategy()

def main():

# Get the training dataset.

print(“==============================Getting Training DataSet==============================\n\n”)

ds1 = create_train_dataset()

print(“==============================Getting Test DataSet==============================\n\n”)

ds2 = create_test_dataset()

# Create an instance of the model.

print(“==============================Building Model==============================\n\n”)

model = create_model()

# Train the model.

print(“==============================Start Model Training==============================\n\n”)

with strategy.scope():

model.compile(loss = tf.keras.losses.SparseCategoricalCrossentropy(), optimizer = tf.keras.optimizers.Adam(), metrics=[‘sparse_categorical_accuracy’])

model.fit(ds1, steps_per_epoch=2000, epochs=50)

print(“\n\n==============================Checking the result==============================\n\n”)

(loss, accuracy) = model.evaluate(ds2, steps=1000)

print(“Validation loss: {}”.format(loss))

print(“Validation accuracy: {}%”.format(100.0 * accuracy))

print(“\n\n==============================Job Done by Intel CPU with 2 Sockets, 40 Cores !!!==============================”)

#end of with:

#end of main

if __name__ == ‘__main__’:

main()

print(“Running Time :”, round(time.time() — start, 2),”(Sec.)”) # 현재시각 — 시작시간 = 실행 시간

프로그램 코드를 함수형태로 변경하고,

main()함수 위에 “strategy = tf.distribute.get_strategy()” 명령을 이용해 다중 CPU, 단일 GPU 일때 사용하는 기본분산전략을 수행합니다.

아래와 같이 분산전략이 잘 수행됩니다. CPU도 분산처리가 잘되고, Process도 CPU 코어 숫자만큼 만들어 집니다.

결과는 아래와 같습니다

중간생략

검증손실: 0.11, 검증정확도: 97.10, 총수행시간은 약 97.1초 걸렸습니다.

코드는 아래에 있습니다.

[다중 NVIDIA GPU 분산처리 설정]

현재 테스트환경 GPU는 NVIDIA V100 모델이면 총 숫자는 4개입니다.

우분투에서 NVIDIA 드라이버 설치 방법 성공적으로 되었는지 확인 하기 위해서 아래의 명령어를 실행합니다.

$ sudo lspci -k | grep NVIDIA -A5

아래의 /proc/driver/nvidia/version 파일의 내용을 확인하면 드라이버 버전을 확인 할 수 있습니다.

$ sudo cat /proc/driver/nvidia/version

다음 명령으로 GPU관련 시스템 정보를 확인합니다.

$nvidia-smi

하드웨어 및 드라이버가 이상없이 동작하고, 위 명령의 윗부분에 Driver Version, CUDA Version등이 확인됩니다.

아래 명령으로 CUDA 세부버젼을 확인합니다.

$nvcc -V

아래와 같은 다중 GPU를 사용할 수 있도록 분산전략을 가진 코드를 작성하였습니다.

import tensorflow as tf

from tensorflow import keras

import time

import os

if tf.__version__[0] != ‘2’:

raise ImportError(“TensorFlow 2 is required for this example”)

print(“Tensorflow version “ + tf.__version__)

print(tf.config.list_physical_devices(“CPU”))

print(tf.config.list_physical_devices(“GPU”))

gpus = tf.config.experimental.list_logical_devices(“GPU”)

if len(gpus) > 1:

strategy = tf.distribute.MirroredStrategy([gpu.name for gpu in gpus])

print(‘\n\nRunning on multiple GPUs ‘, [gpu.name for gpu in gpus])

else:

strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU

print(‘\n\nRunning on single GPU ‘, gpus[0].name)

print(“\n\nNumber of accelerators: “, strategy.num_replicas_in_sync,”\n\n”)

# The input data and labels.

mnist = tf.keras.datasets.mnist

#start = time.time() # 시작 시간 저장

(x_train, y_train), (x_test, y_test) = mnist.load_data()

(x_train, x_test) = (x_train / 255.0, x_test / 255.0)

# Add a channels dimension.

x_train = x_train[…, tf.newaxis]

x_test = x_test[…, tf.newaxis]

def create_train_dataset():

print(“==============================Processing Training DataSet==============================\n\n”)

train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).batch(1, drop_remainder=True)

train_ds = train_ds.map(lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)))

return train_ds.repeat()

def create_test_dataset():

print(“==============================Processing Test DataSet==============================\n\n”)

test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).shuffle(10000).batch(1, drop_remainder=True)

test_ds = test_ds.map(lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)))

return test_ds.repeat()

# standard tf.keras.Sequential class

def create_model():

model = tf.keras.Sequential([

keras.layers.Flatten(),

keras.layers.Dense(128, activation=’relu’),

keras.layers.Dense(256, activation=’relu’),

keras.layers.Dense(128, activation=’relu’),

keras.layers.Dense(10, activation=’softmax’)])

model.compile(loss = tf.keras.losses.SparseCategoricalCrossentropy(),

optimizer = tf.keras.optimizers.Adam(),

metrics=[‘sparse_categorical_accuracy’])

return model

start = time.time() # 시작 시간 저장

def main():

with strategy.scope():

# Get the training dataset.

print(“==============================Getting Training DataSet==============================\n\n”)

ds1 = create_train_dataset()

print(“==============================Getting Test DataSet==============================\n\n”)

ds2 = create_test_dataset()

# Create an instance of the model.

print(“==============================Building Model & Compile ==============================\n\n”)

model = create_model()

print(“==============================Model Training ==============================\n\n”)

model.fit(ds1, steps_per_epoch=2000, epochs=10)

print(“\n\n==============================Checking the result==============================\n\n”)

loss, accuracy = model.evaluate(ds2, steps=1000)

print(“Validation loss: {}”.format(loss))

print(“Validation accuracy: {}%”.format(100.0 * accuracy))

print(“\n\n==============================Finished Training by….==============================”)

#end of with:

if __name__ == ‘__main__’:

main()

print(“실행시간 :”, round(time.time() — start, 2),”(초)”) # 현재시각 — 시작시간 = 실행 시간

위의 코드에서 GPU가 2개 이상인 경우 “tf.distribute.MirroredStrategy([gpu.name for gpu in gpus])” 전략을 사용하고, 1개밖에 없는 경우는 경우 “tf.distribute.get_strategy()” 를 사용합니다.

수행한 결과는 아래와 같습니다. “watch -n 1 nvidia-smi” 명령으로 다중 GPU사용을 모니터링 합니다.

위와 같이 GPU-Util 이 최고 100%을 사용하지 못하고 있지만, 분산처리를 수행되고 있습니다.

수행화면은 아래와 같습니다.

검증손실: 0.35, 검증정확도: 90.89, 전체 수행시간은 69.45정도 수행되었습니다.

코드는 아래에 있습니다.

[다중 Google TPU 코어 분산처리 설정]

TPU는 colab.research.google.com을 이용해 작성했습니다. colab.research.google.com에서 “[런타임][런타임 유형변경][하드웨어 가속기] 에서 “TPU”를 선택하세요.

전체코드는 아래와 같습니다.

git에서 위쪽부분에 “Open in Colab” 단추를 눌러 google colab에서 오픈해서 보시면 편합니다.

TPU를 사용하도록 설정하는 과정은 아래의 코드입니다.

TPU는 초기화 과정이 오래 걸립니다.

분산전략 적용부분은 아래와 같이 설정합니다.

수행결과는 아래와 같습니다.

검증손실: 0.16, 검증정확도: 98.28, 수행시간은 235.56 초 입니다.

전체 코드는 아래에서 확인하세요.

[다중 Graphcore IPU 분산처리 설정]

Graphcore IPU의 Device Driver나 Poplar Library(NVIDIA CUDA, AMD ROCm,..과 같음), 그리고 PopART(Graphcore Deep Learning Framework, Tensorflow, PyTorch와 같음)를 설치하고, Graphcore IPU에 최적화된 Tensorflow-ipu를 설정하는 방법은 아래의 git에 있습니다. AMD와 Intel도 각 회사의 장비에 최적화된 Tensorflow나 PyTorch를 제공합니다.

여기서는 NVIDIA GPU처럼 Device Driver, Poplar Lib등이 제대로 설치되어 있는가를 확인합니다. 참고로 모든 소프트웨어들을 Python Virtual 환경에 분리해서 설치해 두었습니다.

PCIe 컨트롤러가 IPU를 인식한 것을 확인합니다.

$ sudo lspci -k | grep ipu -A5

Graphcore IPU Device Driver가 설치된 것을 확인합니다.

$ modinfo ipu-driver

poplar library정상 설치 확인

$ popc — version

popart 정상설치 확인명령

$python3

>>>import popart

>>>print(popart.__vesion__)

설치된 ipu 갯수와 동작을 확인하는 명령

$gc-monitor

총 7개의 Graphcore IPU가 보입니다. gc-monitor는 nvidia-smi, amd-smi 명령과 같습니다.

tensorflow-ipu 버젼도 정상적으로 설치 및 동작을 확인합니다.

$python3

>>>import tensorflow as tf

>>>print(tf.__version__)

Graphcore IPU 분산코드는 아래와 같습니다.

# Copyright 2020 Graphcore Ltd.

import tensorflow as tf

from tensorflow import keras

import time

from tensorflow.python import ipu

print(tf.config.list_physical_devices(“IPU”), “\n\n”)

# Configure the IPU system

cfg = ipu.utils.create_ipu_config()

cfg = ipu.utils.auto_select_ipus(cfg, 4)

ipu.utils.configure_ipu_system(cfg)

if tf.__version__[0] != ‘2’:

raise ImportError(“TensorFlow 2 is required for this example”)

# The input data and labels.

mnist = tf.keras.datasets.mnist

start = time.time() # 시작 시간 저장

(x_train, y_train), (x_test, y_test) = mnist.load_data()

(x_train, x_test) = (x_train / 255.0, x_test / 255.0)

# Add a channels dimension.

x_train = x_train[…, tf.newaxis]

x_test = x_test[…, tf.newaxis]

def create_train_dataset():

print(“==============================Processing Training DataSet==============================\n\n”)

train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).batch(1, drop_remainder=True)

train_ds = train_ds.map(lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)))

return train_ds.repeat()

#end of def

def create_test_dataset():

print(“==============================Processing Test DataSet==============================\n\n”)

test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).shuffle(10000).batch(1, drop_remainder=True)

test_ds = test_ds.map(lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)))

return test_ds.repeat()

#end of def

# Create the model using the IPU-specific Sequential class instead of the

# standard tf.keras.Sequential class

def create_model():

model = ipu.keras.Sequential([

keras.layers.Flatten(),

keras.layers.Dense(128, activation=’relu’),

keras.layers.Dense(256, activation=’relu’),

keras.layers.Dense(128, activation=’relu’),

keras.layers.Dense(10, activation=’softmax’)])

return model

#end of def

def main():

# Get the training dataset.

print(“==============================Getting Training DataSet==============================\n\n”)

ds1 = create_train_dataset()

print(“==============================Getting Test DataSet==============================\n\n”)

ds2 = create_test_dataset()

# Create an instance of the model.

print(“==============================Building Model==============================\n\n”)

model = create_model()

# Train the model.

print(“==============================Start Model Training==============================\n\n”)

strategy = ipu.ipu_strategy.IPUStrategy()

with strategy.scope():

model.compile(loss = tf.keras.losses.SparseCategoricalCrossentropy(), optimizer = tf.keras.optimizers.Adam(), metrics=[‘sparse_categorical_accuracy’])

model.fit(ds1, steps_per_epoch=2000, epochs=50)

print(“\n\n==============================Checking the result==============================\n\n”)

(loss, accuracy) = model.evaluate(ds2, steps=1000)

print(“Validation loss: {}”.format(loss))

print(“Validation accuracy: {}%”.format(100.0 * accuracy))

print(“\n\n==============================Job Done by Graphcore IPU with 16 Sockets, 19,456 Cores!!!==============================”)

#end of with:

#end of def

if __name__ == ‘__main__’:

main()

print(“Running Time :”, round(time.time() — start, 2),”(Sec.)”) # 현재시각 — 시작시간 = 실행 시간

기본적인 코드는 Google TPU와 유사합니다.

IPU를 인식하고 설정하는 부분은

# Configure the IPU system

cfg = ipu.utils.create_ipu_config()

cfg = ipu.utils.auto_select_ipus(cfg, 4) //마지막 “4”라는 숮자가 4개의 IPU를 사용하겠다는 의미입니다. 1~16개까지 사용이 가능합니다.

ipu.utils.configure_ipu_system(cfg)

모델을 만들 때 tf.keras.Sequential 대신에 ipu.keras.Sequential 을 사용합니다.

def create_model():

model = ipu.keras.Sequential([

keras.layers.Flatten(),

분산전략 수립과 적용은 아래와 같습니다.

strategy = ipu.ipu_strategy.IPUStrategy()

with strategy.scope():

수행결과는 아래와 같습니다. 아래 예는 2개의 IPU를 사용합니다.

중간생략

검증손실: 0.12, 검증정확도: 96.79, 수행시간 10.9초 입니다. 놀라운 속도입니다. 단 2개의 IPU만으로 NVIDIA V100 4개, Google TPU 8 Core보다 엄청난 속도로 빠르게 수행합니다.

수행 중 gc-monitor를 이용한 ipu사용 현황입니다.

전체코드는 아래 git에 있습니다.

이상. 끝.