Resnet등을 이용하는 해설이 있는 Transfer Learning¶
- 여기서는 케라스를 이용한 다음의 캐글 노트북에 한국어 해설을 추가. (https://www.kaggle.com/akensert/resnet50-keras-baseline-model)
- Input은 영상의학 CT 뇌 이미지이며, 뇌출혈을 판단하고자 함.
- Output은 뇌출혈의 종류 6가지를 맞히는 것으로, 각 종류에 대한 확률값을 출력.
- Input은 PNG또는 DICOM형태의 이미지
데이터 준비¶
- linux에서는 allow_growth를 설정하지 않으면, GPU 메모리 소비가 가용 메모리를 넘어버려 다음과 같은 CuDNN 관련 에러가 발생
- <E tensorflow/stream_executor/cuda/cuda_dnn.cc:329] Could not create cudnn handle: CUDNN_STATUS_INTERNAL_ERROR>
In [1]:
import numpy as np
import pandas as pd
import pydicom
import os
import matplotlib.pyplot as plt
import collections
from tqdm import tqdm_notebook as tqdm
from datetime import datetime
from math import ceil, floor
import cv2
import tensorflow as tf
import keras
import sys
from keras_applications.resnet import ResNet50
from sklearn.model_selection import ShuffleSplit
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
from sklearn.model_selection import train_test_split
# Enable allow_growth on the GPU options so TensorFlow does not claim more
# memory than is available (see the CuDNN error described above).
config = ConfigProto()
config.gpu_options.allow_growth = True
# Create the session with the configuration above.
session = InteractiveSession(config=config)
DICOM이미지 준비¶
- 제공되는 원본파일
In [2]:
# Directories holding the originally provided DICOM files (test / train).
test_images_dir = '../input/stage_1_test_images/'
train_images_dir = '../input/stage_1_train_images/'
PNG 이미지 준비¶
- PNG는 224픽셀로 줄인 이미지로, DICOM 이미지보다 용량이 50배 작음
In [ ]:
# Directory of the training images pre-converted to 224px PNG files.
train_224_dir = '../input/rsna-train-stage-1-images-png-224x/stage_1_train_png_224x/'
In [ ]:
def _get_first_of_dicom_field_as_int(x):
    """Return a DICOM field value as an ``int``.

    A multi-valued field (``pydicom.multival.MultiValue``) is reduced to its
    first element; any other value is converted directly with ``int()``.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of MultiValue.
    if isinstance(x, pydicom.multival.MultiValue):
        return int(x[0])
    return int(x)
- window사이즈가 중요함. greyscale로 적절하게 병변을 나타내는것이 중요
In [ ]:
def _get_windowing(data):
    """Collect the windowing parameters of a DICOM dataset as integers.

    Returns a list ``[window center, window width, rescale slope,
    rescale intercept]``, each reduced to a single ``int``.
    """
    raw_fields = (
        data.WindowCenter,
        data.WindowWidth,
        data.RescaleSlope,
        data.RescaleIntercept,
    )
    return list(map(_get_first_of_dicom_field_as_int, raw_fields))
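- window 설정(center, width)에 따라 픽셀 값을 잘라냄: RescaleSlope/RescaleIntercept를 적용한 뒤 [center - width/2, center + width/2] 범위로 clip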
In [ ]:
def _window_image(img, window_center, window_width, slope, intercept):
    """Rescale pixel values and clip them to the requested window.

    The image is first transformed as ``img * slope + intercept`` and then
    clipped to ``window_center -/+ window_width // 2``.
    """
    half_width = window_width // 2
    rescaled = img * slope + intercept
    return np.clip(rescaled, window_center - half_width, window_center + half_width)
- 표준화 작업 한번 더: 픽셀 값을 [-1, 1] 범위로 min-max 정규화
In [ ]:
def _normalize(img):
    """Min-max scale an image to the range [-1, 1].

    A constant image (max equals min) cannot be scaled and is returned as an
    all-zero array of the same shape.
    """
    highest, lowest = img.max(), img.min()
    if highest == lowest:
        return np.zeros(img.shape)
    return 2 * (img - lowest) / (highest - lowest) - 1
- 이미지를 읽되, 사이즈에 민감하므로 여러 사이즈를 시도해볼 필요 있음
In [4]:
def _read(path, desired_size=(224, 224)):
    """Load one DICOM file as a windowed, normalized single-channel image.

    Will be used in DataGenerator.

    Args:
        path: Path of the DICOM file to read.
        desired_size: Target ``(rows, cols)`` of the returned image. Image
            size matters for this task, so both lower and higher
            resolutions are worth trying.

    Returns:
        Array of shape ``(*desired_size, 1)`` scaled by ``_normalize``.
        If the pixel data cannot be decoded or windowed, an all-zero image
        is returned instead of raising.
    """
    dcm = pydicom.dcmread(path)
    window_params = _get_windowing(dcm)  # (center, width, slope, intercept)
    try:
        img = _window_image(dcm.pixel_array, *window_params)
    except Exception:
        # Best effort: a file with unusable pixel data becomes a blank image.
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        img = np.zeros(desired_size)
    img = _normalize(img)
    # Resize whenever the actual image shape differs from the target, rather
    # than assuming every source image is 512x512.
    target = tuple(desired_size)
    if img.shape[:2] != target:
        # cv2.resize expects dsize as (width, height), i.e. (cols, rows).
        img = cv2.resize(img, (target[1], target[0]), interpolation=cv2.INTER_LINEAR)
    return img[:, :, np.newaxis]
Data Generator로 데이터 생성¶
- 모든 이미지를 한꺼번에 메모리에 올려서 훈련하지 않을 것이라면, 반드시 만들어야 하는 클래스
- 파일을 실시간으로 읽어서 데이터를 훈련용으로 필요할때 생성한다.
In [5]:
class DataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that reads image batches from disk on demand.

    Files are read at the moment a batch is requested, so the whole dataset
    never has to be held in memory.

    Args:
        list_IDs: Indexable collection of image IDs (file names without
            extension).
        labels: DataFrame indexed by image ID with six label columns, or
            ``None`` for prediction (no targets are produced).
        batch_size: Maximum number of images per batch.
        img_size: ``(rows, cols)`` of the images fed to the model.
        img_dir: Directory prefix that the ID and extension are appended to.
        from_png: When labels are given, read ``<ID>.png`` files instead of
            ``<ID>.dcm`` files. Without labels the DICOM files are always
            used.
    """

    def __init__(self, list_IDs, labels=None, batch_size=1, img_size=(224, 224),
                 img_dir=train_images_dir, from_png=True, *args, **kwargs):
        self.list_IDs = list_IDs
        self.labels = labels
        self.batch_size = batch_size
        self.img_size = img_size
        self.img_dir = img_dir
        self.from_png = from_png
        # Build the initial index list only once every attribute is set.
        self.on_epoch_end()

    def __len__(self):
        """Number of steps per epoch; the final batch may be partial."""
        return int(ceil(len(self.indices) / self.batch_size))

    def __getitem__(self, index):
        """Load batch number ``index``.

        Returns ``(X, Y)`` when labels are available, otherwise only ``X``.
        """
        indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        list_IDs_temp = [self.list_IDs[k] for k in indices]
        if self.labels is not None and self.from_png:
            # Use the pre-processed PNG images.
            return self.__data_generation_from_png(list_IDs_temp)
        return self.__data_generation(list_IDs_temp)

    def __read_png(self, path):
        """Read one PNG as a normalized ``(rows, cols, 1)`` image.

        NOTE(review): assumes the PNG files are single-channel renderings
        that can be scaled the same way as the DICOM images - confirm
        against the PNG dataset.
        """
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread reports a missing or unreadable file by returning None.
            raise IOError("could not read PNG image: %s" % path)
        # Convert to float first: scaling uint8 data in place would overflow.
        img = _normalize(img.astype(np.float64))
        target = tuple(self.img_size)
        if img.shape[:2] != target:
            # cv2.resize expects dsize as (width, height), i.e. (cols, rows).
            img = cv2.resize(img, (target[1], target[0]), interpolation=cv2.INTER_LINEAR)
        return img[:, :, np.newaxis]

    def __data_generation_from_png(self, list_IDs_temp):
        """Build one labelled batch from the PNG files."""
        # Size the arrays by the IDs actually in this batch so that a final
        # partial batch carries no uninitialized rows.
        count = len(list_IDs_temp)
        X = np.empty((count, *self.img_size, 1))
        Y = np.empty((count, 6), dtype=np.float32)
        for i, ID in enumerate(list_IDs_temp):
            X[i,] = self.__read_png(self.img_dir + ID + ".png")
            Y[i,] = self.labels.loc[ID].values
        return X, Y

    def __data_generation(self, list_IDs_temp):
        """Build one batch from the DICOM files.

        Returns ``(X, Y)`` in the training phase (labels present) and only
        ``X`` in the test phase.
        """
        count = len(list_IDs_temp)
        X = np.empty((count, *self.img_size, 1))
        for i, ID in enumerate(list_IDs_temp):
            X[i,] = _read(self.img_dir + ID + ".dcm", self.img_size)
        if self.labels is None:  # test phase
            return X
        Y = np.empty((count, 6), dtype=np.float32)  # training phase
        for i, ID in enumerate(list_IDs_temp):
            Y[i,] = self.labels.loc[ID].values
        return X, Y

    def on_epoch_end(self):
        """Rebuild the list of sample indices used for the next epoch.

        With labels, the data set is imbalanced, so it is undersampled
        [TUNING POINT]: rows whose first label column (any) is 0 are kept
        with probability 0.5, rows where it is 1 are always kept, and the
        kept indices are shuffled. Without labels every index is used in
        order.
        """
        if self.labels is not None:
            # keep probability of any=0 and any=1
            keep_prob = self.labels.iloc[:, 0].map({0: 0.5, 1: 1.0})
            keep = keep_prob.values > np.random.rand(len(keep_prob))
            self.indices = np.arange(len(self.list_IDs))[keep]
            np.random.shuffle(self.indices)
        else:
            self.indices = np.arange(len(self.list_IDs))
In [6]:
from keras import backend as K
## competition이 Any에 대한 가중치가 있기 때문에 식이 이렇게 되어있음
def weighted_log_loss(y_true, y_pred):
    """Per-sample binary cross-entropy with the first class weighted double.

    The competition weights the first label (any) more heavily, hence the
    class weights ``[2, 1, 1, 1, 1, 1]``. Predictions are clipped away from
    0 and 1 before taking logarithms. Returns the mean over the last axis.
    """
    class_weights = np.array([2., 1., 1., 1., 1., 1.])
    eps = K.epsilon()
    clipped = K.clip(y_pred, eps, 1.0 - eps)
    positive_term = y_true * K.log(clipped) * class_weights
    negative_term = (1.0 - y_true) * K.log(1.0 - clipped) * class_weights
    return K.mean(-(positive_term + negative_term), axis=-1)
def _normalized_weighted_average(arr, weights=None):
    """Average ``arr`` along axis 1, optionally weighted.

    With ``weights`` the result is ``arr . weights`` divided by the sum of
    the weights; without them it is the plain mean along axis 1.
    """
    if weights is None:
        return K.mean(arr, axis=1)
    total_weight = K.sum(weights)
    column_weights = K.expand_dims(weights, axis=1)
    return K.sum(K.dot(arr, column_weights), axis=1) / total_weight
def weighted_loss(y_true, y_pred):
    """Log loss with normalized class weights, used as the compile() metric.

    Same idea as ``weighted_log_loss`` but the class weights
    ``[2, 1, 1, 1, 1, 1]`` are normalized by their sum, which should be very
    close to the official competition metric
    (https://www.kaggle.com/kambarakun/lb-probe-weights-n-of-positives-scoring),
    i.e. ``sklearn.metrics.log_loss`` with sample weights.
    """
    class_weights = K.variable([2., 1., 1., 1., 1., 1.])
    eps = K.epsilon()
    clipped = K.clip(y_pred, eps, 1.0 - eps)
    positive_term = y_true * K.log(clipped)
    negative_term = (1.0 - y_true) * K.log(1.0 - clipped)
    per_sample = _normalized_weighted_average(-(positive_term + negative_term), class_weights)
    return K.mean(per_sample)
# 시합용 Metric
def weighted_log_loss_metric(trues, preds):
    """NumPy version of the competition metric.

    Computes the binary log loss per label, averages it per sample with
    class weights ``[2, 1, 1, 1, 1, 1]`` and returns the mean over samples.
    Predictions are clipped to ``[1e-7, 1 - 1e-7]`` first.
    """
    class_weights = [2., 1., 1., 1., 1., 1.]
    epsilon = 1e-7
    clipped = np.clip(preds, epsilon, 1 - epsilon)
    log_likelihood = trues * np.log(clipped) + (1 - trues) * np.log(1 - clipped)
    per_sample = np.average(log_likelihood, axis=1, weights=class_weights)
    return -per_sample.mean()
Checkpoint생성¶
- 중간중간 확인할 체크포인트를 형성한다
In [7]:
class PredictionCheckpoint(keras.callbacks.Callback):
    """Callback that predicts on the test and validation sets after each epoch.

    The per-epoch predictions are accumulated in ``test_predictions`` and
    ``valid_predictions``, and the validation loss of their weighted
    ensemble is printed after every epoch.

    Args:
        test_df: DataFrame whose index holds the test image IDs.
        valid_df: DataFrame of validation labels indexed by image ID.
        test_images_dir: Directory of the test images.
        valid_images_dir: Directory of the validation images.
        batch_size: Batch size used for prediction.
        input_size: ``(rows, cols)`` of the images fed to the model.
    """

    def __init__(self, test_df, valid_df,
                 test_images_dir=test_images_dir,
                 valid_images_dir=train_images_dir,
                 batch_size=32, input_size=(224, 224)):
        # Let the Keras base class initialize its own state.
        super().__init__()
        self.test_df = test_df
        self.valid_df = valid_df
        self.test_images_dir = test_images_dir
        self.valid_images_dir = valid_images_dir
        self.batch_size = batch_size
        self.input_size = input_size

    def on_train_begin(self, logs=None):
        """Reset the stored predictions at the start of training."""
        self.test_predictions = []
        self.valid_predictions = []

    def _predict(self, frame, images_dir):
        """Predict for every ID in ``frame.index`` using images from ``images_dir``."""
        generator = DataGenerator(frame.index, None, self.batch_size,
                                  self.input_size, images_dir)
        # Trim to the frame length in case the generator pads the last batch.
        return self.model.predict_generator(generator, verbose=2)[:len(frame)]

    def on_epoch_end(self, batch, logs=None):
        """Store this epoch's predictions and print the validation loss.

        Keras passes the epoch index as the first argument; the parameter
        keeps its original name ``batch`` and is not used.
        """
        self.test_predictions.append(self._predict(self.test_df, self.test_images_dir))
        self.valid_predictions.append(self._predict(self.valid_df, self.valid_images_dir))

        # Ensemble all epochs so far, weighting epoch i by 2**i.
        epoch_weights = [2**i for i in range(len(self.valid_predictions))]
        ensemble = np.average(self.valid_predictions, axis=0, weights=epoch_weights)
        print("validation loss: %.4f" %
              weighted_log_loss_metric(self.valid_df.values, ensemble))
        # here you could save the predictions with np.save()
In [8]:
class MyDeepModel:
    """Transfer-learning classifier: a backbone plus a 6-way sigmoid head.

    Args:
        engine: Backbone constructor (for example ``ResNet50``).
        input_dims: ``(rows, cols)`` of the single-channel input images.
        batch_size: Training batch size.
        num_epochs: Number of training epochs.
        learning_rate: Initial learning rate.
        decay_rate: Factor applied to the learning rate every
            ``decay_steps`` epochs.
        decay_steps: Number of epochs between learning-rate decays.
        weights: Pretrained weights passed to the backbone.
        verbose: Verbosity passed to Keras during training.
    """

    def __init__(self, engine, input_dims, batch_size=5, num_epochs=4, learning_rate=1e-3,
                 decay_rate=1.0, decay_steps=1, weights="imagenet", verbose=1):
        self.engine = engine  # backbone constructor
        self.input_dims = input_dims
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.learning_rate = learning_rate
        self.decay_rate = decay_rate
        self.decay_steps = decay_steps
        self.weights = weights
        self.verbose = verbose
        self._build()

    def _build(self):
        """Assemble and compile the Keras model into ``self.model``."""
        # Single-channel input; a 1x1 convolution expands it to the three
        # channels the backbone is built for.
        inputs = keras.layers.Input((*self.input_dims, 1))
        x = keras.layers.Conv2D(filters=3, kernel_size=(1, 1), strides=(1, 1),
                                name="initial_conv2d")(inputs)
        x = keras.layers.BatchNormalization(axis=3, epsilon=1.001e-5, name='initial_bn')(x)
        x = keras.layers.Activation('relu', name='initial_relu')(x)

        # Backbone (ResNet etc.) without its classification top.
        engine = self.engine(include_top=False, weights=self.weights,
                             input_shape=(*self.input_dims, 3),
                             backend=keras.backend, layers=keras.layers,
                             models=keras.models, utils=keras.utils)
        x = engine(x)

        # Global average pooling followed by the six sigmoid outputs.
        x = keras.layers.GlobalAveragePooling2D(name='avg_pool')(x)
        out = keras.layers.Dense(6, activation="sigmoid", name='dense_output')(x)
        self.model = keras.models.Model(inputs=inputs, outputs=out)

        # Start from the configured learning rate so the model also trains
        # when it is fitted without the scheduler used in fit_and_predict.
        self.model.compile(loss=weighted_log_loss,
                           optimizer=keras.optimizers.Adam(self.learning_rate),
                           metrics=[weighted_loss])

    def fit_and_predict(self, train_df, valid_df, test_df):
        """Train on ``train_df`` and predict on validation/test every epoch.

        Returns the ``PredictionCheckpoint`` callback, which holds the
        per-epoch predictions.

        NOTE: weights are not written to disk during training; call
        ``save()`` afterwards if they are needed.
        """
        # Predict at the same resolution the model was built for.
        pred_history = PredictionCheckpoint(test_df, valid_df, input_size=self.input_dims)

        def _scheduled_rate(epoch):
            # Step decay: multiply by decay_rate every decay_steps epochs.
            return self.learning_rate * pow(self.decay_rate, floor(epoch / self.decay_steps))

        scheduler = keras.callbacks.LearningRateScheduler(_scheduled_rate)

        self.model.fit_generator(
            DataGenerator(
                train_df.index,
                train_df,
                self.batch_size,
                self.input_dims,
                train_images_dir,
                False,  # read the DICOM files, not the PNG files
            ),
            epochs=self.num_epochs,
            verbose=self.verbose,
            use_multiprocessing=True,
            workers=4,
            callbacks=[pred_history, scheduler]
        )
        return pred_history

    def save(self, path):
        """Write the model weights to ``path``."""
        self.model.save_weights(path)

    def load(self, path):
        """Load model weights from ``path``."""
        self.model.load_weights(path)
In [ ]:
def read_testset(filename="../input/stage_1_sample_submission.csv"):
    """Load the sample submission as one row per image.

    Each ``ID`` is split into its first 12 characters (``Image``) and the
    text after position 13 (``Diagnosis``). The result is indexed by
    ``Image`` with one ``Label`` column per diagnosis.
    """
    raw = pd.read_csv(filename)
    raw["Image"] = raw["ID"].str[:12]
    raw["Diagnosis"] = raw["ID"].str[13:]
    long_form = raw[["Label", "Diagnosis", "Image"]]
    return long_form.set_index(["Image", "Diagnosis"]).unstack(level=-1)
In [9]:
def read_trainset(filename="../input/stage_1_train.csv"):
    """Load the training labels as one row per image.

    Each ``ID`` is split into its first 12 characters (``Image``) and the
    text after position 13 (``Diagnosis``). A fixed list of row labels
    (four runs of six consecutive rows, treated as duplicates) is dropped
    before the table is reshaped to one ``Label`` column per diagnosis.
    ``DataFrame.drop`` raises ``KeyError`` if those rows are not present.
    """
    raw = pd.read_csv(filename)
    raw["Image"] = raw["ID"].str[:12]
    raw["Diagnosis"] = raw["ID"].str[13:]

    # First row label of each six-row run of duplicates.
    duplicate_starts = (1598538, 312468, 2708700, 3032994)
    duplicates_to_remove = [
        row for start in duplicate_starts for row in range(start, start + 6)
    ]
    deduplicated = raw.drop(index=duplicates_to_remove).reset_index(drop=True)

    long_form = deduplicated[["Label", "Diagnosis", "Image"]]
    return long_form.set_index(["Image", "Diagnosis"]).unstack(level=-1)
# Load the test and training tables from their default CSV paths.
test_df = read_testset()
df = read_trainset()
In [ ]:
# Random 80 / 20 train/validation split of the rows of df.
ss = ShuffleSplit(n_splits=5, test_size=0.2, random_state=42).split(df.index)
# ShuffleSplit yields a different random split on every next() call; the
# first split is discarded here and the second one is used (TUNING POINT).
next(ss)
train_idx, valid_idx = next(ss)
# Build the model (the pretrained weights and the engine may be worth changing).
model = MyDeepModel(engine=ResNet50, input_dims=(224, 224), batch_size=16, learning_rate=1e-3,
                    num_epochs=6, decay_rate=1, decay_steps=1, weights="imagenet", verbose=1)
# Start training; the returned callback holds the per-epoch predictions.
history = model.fit_and_predict(df.iloc[train_idx], df.iloc[valid_idx], test_df)
In [ ]:
# Ensemble the per-epoch test predictions, weighting epoch i by 2**i so that
# later epochs count more.
test_df.iloc[:, :] = np.average(history.test_predictions, axis=0, weights=[2**i for i in range(len(history.test_predictions))])
# Back to one row per (Image, Diagnosis) pair.
test_df = test_df.stack().reset_index()
# Rebuild the "<Image>_<Diagnosis>" ID as the first column.
test_df.insert(loc=0, column='ID', value=test_df['Image'].astype(str) + "_" + test_df['Diagnosis'])
test_df = test_df.drop(["Image", "Diagnosis"], axis=1)
# Write the submission file without the row index.
test_df.to_csv('submission.csv', index=False)
In [ ]: