article thumbnail image
Published 2022. 3. 31. 11:49

📌 이 글은 권철민 님의 딥러닝 CNN 완벽가이드를 바탕으로 작성한 포스팅입니다.

 


 

목차

  1. AlexNet 개요 
  2. AlexNet 구성
  3. AlexNet 구현
  4. CIFAR10 데이터셋을 이용한 AlexNet 학습 성능 및 테스트

AlexNet 개요 

AlexNet Structure

 

  • Convolution layer 5개 + Fully Connected Layer 3개로 구성
  • Activation 함수로 ReLU 함수를 처음으로 사용
  • MaxPooling 으로 Pooling 적용 및 Overlapping Pooling 적용
  • Local Response Normalization(LRN) 사용
  • Overfitting을 개선하기 위해서 Drop out Layer와 Weight의 Decay 기법 적용
  • Data Augmentation 적용(좌우 반전, Crop, PCA 변환 등)

AlexNet 구성

 

  • 11x11, 5x5 사이즈의 큰 사이즈의 Kernel 적용. 이후 3x3 Kernel을 3번 이어서 적용
  • Receptive Field가 큰 사이즈를 초기 Feature map에 적용하는 것이 보다 많은 feature 정보를 만드는데 효율적이라고 판단
  • 하지만 많은 weight parameter 갯수로 인하여 컴퓨팅 연산량이 크게 증가 함. 이를 극복하기 위하여 병렬 GPU를 활용할 수 있도록 CNN 모델을 병렬화하였다.

AlexNet 구현

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense , Conv2D , Dropout , Flatten , Activation, MaxPooling2D , GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam , RMSprop 
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau , EarlyStopping , ModelCheckpoint , LearningRateScheduler
from tensorflow.keras import regularizers

# input shape, classes 개수, kernel_regularizer등을 인자로 가져감. 
def create_alexnet(in_shape=(227, 227, 3), n_classes=10, kernel_regular=None):
    # 첫번째 CNN->ReLU->MaxPool, kernel_size를 매우 크게 가져감(11, 11)
    input_tensor = Input(shape=in_shape)
    
    x = Conv2D(filters= 96, kernel_size=(11,11), strides=(4,4), padding='valid')(input_tensor)
    x = Activation('relu')(x)
    # Local Response Normalization을 대신하여 Batch Normalization 적용. 
    x = BatchNormalization()(x) # Batch size만큼 다시 정규화한다. 그래야 layer를 거치면서 분포가 크게 어긋나는 것을 예방할 수 있다.
    x = MaxPooling2D(pool_size=(3,3), strides=(2,2))(x) 


    # 두번째 CNN->ReLU->MaxPool. kernel_size=(5, 5)
    x = Conv2D(filters=256, kernel_size=(5,5), strides=(1,1), padding='same',kernel_regularizer=kernel_regular)(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(3,3), strides=(2,2))(x)


    # 3x3 Conv 2번 연속 적용. filters는 384개
    x = Conv2D(filters=384, kernel_size=(3,3), strides=(1,1), padding='same', kernel_regularizer=kernel_regular)(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)

    x = Conv2D(filters=384, kernel_size=(3,3), strides=(1,1), padding='same', kernel_regularizer=kernel_regular)(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)

    # 3x3 Conv를 적용하되 filters 수를 줄이고 maxpooling을 적용
    x = Conv2D(filters=256, kernel_size=(3,3), strides=(1,1), padding='same', kernel_regularizer=kernel_regular)(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(3,3), strides=(2,2))(x)

    # Dense 연결을 위한 Flatten
    x = Flatten()(x)

    # Dense + Dropout을 연속 적용. 
    x = Dense(units = 4096, activation = 'relu')(x)
    x = Dropout(0.5)(x) # 당시에는 굉장히 획기적

    x = Dense(units = 4096, activation = 'relu')(x)
    x = Dropout(0.5)(x)

    # 마지막 softmax 층 적용. 
    output = Dense(units = n_classes, activation = 'softmax')(x)

    model = Model(inputs=input_tensor, outputs=output)
    model.summary()
    
    return model

 

model = create_alexnet(in_shape=(227, 227, 3), n_classes=10, kernel_regular=regularizers.l2(l2=1e-4))

 

파라미터 값이 정말 많아서 당시 하드웨어로 구현하는게 쉽지 않았다.


CIFAR10 데이터셋을 이용한 AlexNet 학습 성능 및 테스트

from tensorflow.keras.datasets import cifar10

# 전체 6만개 데이터 중, 5만개는 학습 데이터용, 1만개는 테스트 데이터용으로 분리
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()
print("train dataset shape:", train_images.shape, train_labels.shape)
print("test dataset shape:", test_images.shape, test_labels.shape)

 


학습/검증/테스트 데이터 세트로 나누고 데이터 전처리 수행

  • 학습/검증/테스트 데이터로 분할. 검증 데이터는 학습 데이터의 20% 할당.
  • 레이블의 원-핫 인코딩과 이미지 픽셀값의 스케일링 적용
import tensorflow as tf
import numpy as np
import pandas as pd

import random as python_random
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.datasets import cifar10

def zero_one_scaler(image):
    return image/255.0

def get_preprocessed_ohe(images, labels, pre_func=None):
    # preprocessing 함수가 입력되면 이를 이용하여 image array를 scaling 적용.
    if pre_func is not None:
        images = pre_func(images)
    # OHE 적용    
    oh_labels = to_categorical(labels) # 넘파이에 바로 적용하려면 to_categorical
    return images, oh_labels

# 학습/검증/테스트 데이터 세트에 전처리 및 OHE 적용한 뒤 반환 
def get_train_valid_test_set(train_images, train_labels, test_images, test_labels, valid_size=0.15, random_state=2021):
    # 학습 및 테스트 데이터 세트를  0 ~ 1사이값 float32로 변경 및 OHE 적용. 
    train_images, train_oh_labels = get_preprocessed_ohe(train_images, train_labels)
    test_images, test_oh_labels = get_preprocessed_ohe(test_images, test_labels)
    
    # 학습 데이터를 검증 데이터 세트로 다시 분리
    tr_images, val_images, tr_oh_labels, val_oh_labels = train_test_split(train_images, train_oh_labels, test_size=valid_size, random_state=random_state)
    
    return (tr_images, tr_oh_labels), (val_images, val_oh_labels), (test_images, test_oh_labels )

# CIFAR10 데이터 재 로딩 및 Scaling/OHE 전처리 적용하여 학습/검증/데이터 세트 생성. 
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()
print(train_images.shape, train_labels.shape, test_images.shape, test_labels.shape)
(tr_images, tr_oh_labels), (val_images, val_oh_labels), (test_images, test_oh_labels) = \
    get_train_valid_test_set(train_images, train_labels, test_images, test_labels, valid_size=0.2, random_state=2021)

print(tr_images.shape, tr_oh_labels.shape, val_images.shape, val_oh_labels.shape, test_images.shape, test_oh_labels.shape)

 

# 이미지 사이즈가 너무 작으면 모델의 MaxPooling에서 오류 발생. 
model = create_alexnet(in_shape=(128, 128, 3), n_classes=10, kernel_regular=regularizers.l2(l2=1e-4))

인풋텐서의 사이즈를 128x128 로 해주자


CIFAR10 원본 이미지 크기 32x32 를 128x128로 증가 시키는 Sequence Dataset 생성

  • 128x128로 CIFAR10 모든 이미지 배열값을 증가시키면 RAM 부족 발생.
  • 배치 크기 만큼의 개수만 원본 이미지를 128x128로 증가 시킨 뒤(opencv의 resize()), 이를 모델에 입력하는 로직으로 Sequence Dataset 구성.

 

IMAGE_SIZE = 128
BATCH_SIZE = 64

 

from tensorflow.keras.utils import Sequence
import cv2
import sklearn

# 입력 인자 images_array labels는 모두 numpy array로 들어옴. 
# 인자로 입력되는 images_array는 전체 32x32 image array임. 
class CIFAR_Dataset(Sequence):
    def __init__(self, images_array, labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=None):
        '''
        파라미터 설명
        images_array: 원본 32x32 만큼의 image 배열값. 
        labels: 해당 image의 label들
        batch_size: __getitem__(self, index) 호출 시 마다 가져올 데이터 batch 건수
        augmentor: albumentations 객체
        shuffle: 학습 데이터의 경우 epoch 종료시마다 데이터를 섞을지 여부
        '''
        # 객체 생성 인자로 들어온 값을 객체 내부 변수로 할당. 
        # 인자로 입력되는 images_array는 전체 32x32 image array임.
        self.images_array = images_array
        self.labels = labels
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.pre_func = pre_func
        # train data의 경우 
        self.shuffle = shuffle
        if self.shuffle:
            # 객체 생성시에 한번 데이터를 섞음. 
            #self.on_epoch_end()
            pass
    
    # Sequence를 상속받은 Dataset은 batch_size 단위로 입력된 데이터를 처리함. 
    # __len__()은 전체 데이터 건수가 주어졌을 때 batch_size단위로 몇번 데이터를 반환하는지 나타남
    def __len__(self):
        # batch_size단위로 데이터를 몇번 가져와야하는지 계산하기 위해 전체 데이터 건수를 batch_size로 나누되, 정수로 정확히 나눠지지 않을 경우 1회를 더한다. 
        return int(np.ceil(len(self.labels) / self.batch_size))
    
    # batch_size 단위로 image_array, label_array 데이터를 가져와서 변환한 뒤 다시 반환함
    # 인자로 몇번째 batch 인지를 나타내는 index를 입력하면 해당 순서에 해당하는 batch_size 만큼의 데이타를 가공하여 반환
    # batch_size 갯수만큼 변환된 image_array와 label_array 반환. 
    def __getitem__(self, index):
        # index는 몇번째 batch인지를 나타냄. 
        # batch_size만큼 순차적으로 데이터를 가져오려면 array에서 index*self.batch_size:(index+1)*self.batch_size 만큼의 연속 데이터를 가져오면 됨
        # 32x32 image array를 self.batch_size만큼 가져옴. 
        images_fetch = self.images_array[index*self.batch_size:(index+1)*self.batch_size]
        if self.labels is not None:
            label_batch = self.labels[index*self.batch_size:(index+1)*self.batch_size]
        
        # 만일 객체 생성 인자로 albumentation으로 만든 augmentor가 주어진다면 아래와 같이 augmentor를 이용하여 image 변환
        # albumentations은 개별 image만 변환할 수 있으므로 batch_size만큼 할당된 image_name_batch를 한 건씩 iteration하면서 변환 수행. 
        # 변환된 image 배열값을 담을 image_batch 선언. image_batch 배열은 float32 로 설정. 
        image_batch = np.zeros((images_fetch.shape[0], IMAGE_SIZE, IMAGE_SIZE, 3), dtype='float32')
        
        # batch_size에 담긴 건수만큼 iteration 하면서 opencv image load -> image augmentation 변환(augmentor가 not None일 경우)-> image_batch에 담음. 
        for image_index in range(images_fetch.shape[0]):
            #image = cv2.cvtColor(cv2.imread(image_name_batch[image_index]), cv2.COLOR_BGR2RGB) # image array가 들어오니까 할 필요 없다
            # 원본 image를 IMAGE_SIZE x IMAGE_SIZE 크기로 변환
            image = cv2.resize(images_fetch[image_index], (IMAGE_SIZE, IMAGE_SIZE))
            # 만약 augmentor가 주어졌다면 이를 적용. 
            if self.augmentor is not None:
                image = self.augmentor(image=image)['image']
                
            # 만약 scaling 함수가 입력되었다면 이를 적용하여 scaling 수행. 
            if self.pre_func is not None:
                image = self.pre_func(image)
            
            # image_batch에 순차적으로 변환된 image를 담음.               
            image_batch[image_index] = image
        
        return image_batch, label_batch
    
    # epoch가 한번 수행이 완료 될 때마다 모델의 fit()에서 호출됨. 
    def on_epoch_end(self):
        if(self.shuffle):
            #print('epoch end')
            # 원본 image배열과 label를 쌍을 맞춰서 섞어준다. scikt learn의 utils.shuffle에서 해당 기능 제공
            self.images_array, self.labels = sklearn.utils.shuffle(self.images_array, self.labels)
        else:
            pass

 

def zero_one_scaler(image):
    return image/255.0

tr_ds = CIFAR_Dataset(tr_images, tr_oh_labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=True, pre_func=zero_one_scaler)
val_ds = CIFAR_Dataset(val_images, val_oh_labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=zero_one_scaler)

print(next(iter(tr_ds))[0].shape, next(iter(val_ds))[0].shape)
print(next(iter(tr_ds))[1].shape, next(iter(val_ds))[1].shape)
print(next(iter(tr_ds))[0][0])

 


Input 크기가 128x128x3 인 AlexNet 모델을 생성하고 epochs는 30으로 설정하고 학습

model = create_alexnet(in_shape=(128, 128, 3), n_classes=10, kernel_regular=regularizers.l2(l2=1e-4))

model.compile(optimizer=Adam(lr=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

# 5번 iteration내에 validation loss가 향상되지 않으면 learning rate을 기존 learning rate * 0.2로 줄임.  
rlr_cb = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, mode='min', verbose=1)
ely_cb = EarlyStopping(monitor='val_loss', patience=10, mode='min', verbose=1)

history = model.fit(tr_ds, epochs=30, 
                    #steps_per_epoch=int(np.ceil(tr_images.shape[0]/BATCH_SIZE)),
                    validation_data=val_ds, 
                    #validation_steps=int(np.ceil(val_images.shape[0]/BATCH_SIZE)), 
                    callbacks=[rlr_cb, ely_cb]
                   )

test_ds = CIFAR_Dataset(test_images, test_oh_labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=zero_one_scaler)
model.evaluate(test_ds)

정확도가 81% 가까이 나온다.

learning_rate를 0.0001로 해보면 더 좋은 결과가 나온다.

'🖼 Computer Vision > CNN' 카테고리의 다른 글

CNN - GoogLeNet  (0) 2022.04.01
CNN - VGGNet  (0) 2022.04.01
CNN - Image data scaling preprocessing  (0) 2022.03.27
OpenCV 이미지 로딩시 BGR을 RGB로 변환해야 하는 이유  (0) 2022.03.27
CNN - Data Augmentation  (0) 2022.03.04
복사했습니다!