In [None]:
import numpy as np

# Configuration

In [None]:
configuration = {}
configuration['image_size'] = 25
configuration['n_classes'] = 6
configuration['n_channels'] = 3
configuration['epochs'] = 20
configuration['stopper_patience'] = 1
configuration['val_split'] = 0.3
configuration['model_name'] = 'histonet'
configuration['checkpoint_file'] = 'models/{}.h5'.format(
    configuration['model_name'])

# Utilities

In [None]:
from os import listdir

from scipy.misc import imread
from scipy.misc import imresize

from keras.utils import np_utils

def loadImages(path):
    imagesList = listdir(path)
    n_images = len(imagesList)
    dim = configuration['image_size']
    n_channels = configuration['n_channels']
    
    loadedImages = np.empty((n_images, dim, dim, n_channels))

    counter = 0
    for image in imagesList:
        img = imread(path + image)
        img = imresize(img, dim)
        loadedImages[counter, :, :] = img
        counter = counter + 1

    return loadedImages

def organise_data_from_folders(ref_path, folders, n_images) :
    dim = configuration['image_size']
    n_channels = configuration['n_channels']

    prev = 0
    x_train = np.empty((n_images, dim, dim, n_channels))
    for i in range(len(folders)) :
        data = loadImages(ref_path.format(folders[i]))
        length = len(data)
        x_train[prev : prev + length, :, :, :] = data
        prev = prev + length
    
    return x_train

def create_label_vector(images_per_class) :
    classes = range(configuration['n_classes'])
    y_train = np.array([i for i in classes for j in range(images_per_class)])
    return np_utils.to_categorical(y_train)

# Architectures

In [None]:
from keras.applications.inception_v3 import InceptionV3
from keras.applications.vgg16 import VGG16
from keras.applications.vgg19 import VGG19
from keras.applications.resnet50 import ResNet50
from keras.preprocessing import image
from keras.models import Model
from keras.layers import Dense, GlobalAveragePooling2D
from keras import backend as K

K.set_image_dim_ordering('tf')

def generate_model(num_classes, base_model) :
    x = base_model.output
    x = GlobalAveragePooling2D()(x)

    x = Dense(1024, activation='relu')(x)
    
    x = Dense(256, activation='relu')(x)

    predictions = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=predictions)

    for layer in base_model.layers:
        layer.trainable = False

    model.compile(
        optimizer='Adam', loss='categorical_crossentropy')

    return model

def generate_VGG_16_model(num_classes) :
    return generate_model(
        num_classes, VGG16(weights='imagenet', include_top=False))

def generate_VGG_19_model(num_classes) :
    return generate_model(
        num_classes, VGG19(weights='imagenet', include_top=False))

def generate_ResNet_50_model(num_classes) :
    return generate_model(
        num_classes, ResNet50(weights='imagenet', include_top=False))

def generate_inception_model(num_classes) :
    return generate_model(
        num_classes, InceptionV3(weights='imagenet', include_top=False))

from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Flatten
from keras.layers import average
from keras.layers import Input
from keras.layers.convolutional import Conv2D
from keras.layers.convolutional import MaxPooling2D
from keras.layers.convolutional import ZeroPadding2D

K.set_image_dim_ordering('th')

def generate_histonet_model(num_classes):
    r_input = Input(shape=(1, 25, 25))
    g_input = Input(shape=(1, 25, 25))
    b_input = Input(shape=(1, 25, 25))

    red = Conv2D(16, kernel_size=(5, 5), padding='valid', activation='relu')(r_input)
    red = ZeroPadding2D(padding=(1, 1))(red)
    red = MaxPooling2D(pool_size=(2, 2))(red)
    red = Conv2D(24, kernel_size=(3, 3), padding='valid', activation='relu')(red)
    red = ZeroPadding2D(padding=(1, 1))(red)
    red = MaxPooling2D(pool_size=(2, 2))(red)
    red = Conv2D(48, kernel_size=(3, 3), padding='valid', activation='relu')(red)
    red = Flatten()(red)
    red = Dropout(0.3)(red)

    green = Conv2D(16, kernel_size=(5, 5), padding='valid', activation='relu')(g_input)
    green = ZeroPadding2D(padding=(1, 1))(green)
    green = MaxPooling2D(pool_size=(2, 2))(green)
    green = Conv2D(24, kernel_size=(3, 3), padding='valid', activation='relu')(green)
    green = ZeroPadding2D(padding=(1, 1))(green)
    green = MaxPooling2D(pool_size=(2, 2))(green)
    green = Conv2D(48, kernel_size=(3, 3), padding='valid', activation='relu')(green)
    green = Flatten()(green)
    green = Dropout(0.3)(green)

    blue = Conv2D(16, kernel_size=(5, 5), padding='valid', activation='relu')(b_input)
    blue = ZeroPadding2D(padding=(1, 1))(blue)
    blue = MaxPooling2D(pool_size=(2, 2))(blue)
    blue = Conv2D(24, kernel_size=(3, 3), padding='valid', activation='relu')(blue)
    blue = ZeroPadding2D(padding=(1, 1))(blue)
    blue = MaxPooling2D(pool_size=(2, 2))(blue)
    blue = Conv2D(48, kernel_size=(3, 3), padding='valid', activation='relu')(blue)
    blue = Flatten()(blue)
    blue = Dropout(0.3)(blue)

    comb = average([red, green, blue])
    prediction = Dense(256, activation='relu')(comb)
    prediction = Dense(num_classes, activation='softmax')(prediction)

    model = Model(inputs=[r_input, g_input, b_input], outputs=[prediction])
    model.compile(optimizer='Adam', loss='categorical_crossentropy')
    return model

def build_model(model_name, num_classes) :
    if model_name == 'vgg16' :
        return generate_VGG_16_model(num_classes)
    elif model_name == 'vgg19' :
        return generate_VGG_19_model(num_classes)
    elif model_name == 'resnet' :
        return generate_ResNet_50_model(num_classes)
    elif model_name == 'inception' :
        return generate_inception_model(num_classes)
    elif model_name == 'histonet' :
        return generate_histonet_model(num_classes)
    else :
        raise NameError('Incorrect model name')

# 1. Training

## Reading training dat

In [None]:
ref_path_train = 'training_set/{}/'

train_folders_1 = ['ArteriaElasticaMCL600', 'CorazonMCL600', 'ArteriaMuscularMCL600',
                'RegionesLuz600', 'ArteriaVenaCorazonCL600', 'VenaGranCalibreMCL600']

train_folders_2 = ['ArteriaElasticaMCLTest600', 'CorazonMCLTest600', 'ArteriaMuscularMCLTest600',
                'RegionesLuzTest600', 'ArteriaVenaCorazonCLTest600', 'VenaGranCalibreMCLTest600']

samples_per_folder_1 = 700
samples_per_folder_2 = 300

n_images_folder_1 = samples_per_folder_1 * configuration['n_classes']
n_images_folder_2 = samples_per_folder_2 * configuration['n_classes']

X_train_RGB = np.vstack((
    organise_data_from_folders(ref_path_train, train_folders_1, n_images_folder_1),
    organise_data_from_folders(ref_path_train, train_folders_2, n_images_folder_2)))

y_train = np.vstack((
    create_label_vector(samples_per_folder_1),
    create_label_vector(samples_per_folder_2)))

## Normalisation

In [None]:
mean_R = X_train_RGB[:, :, :, 0].mean()
mean_G = X_train_RGB[:, :, :, 1].mean()
mean_B = X_train_RGB[:, :, :, 2].mean()
std_R = X_train_RGB[:, :, :, 0].std()
std_G = X_train_RGB[:, :, :, 1].std()
std_B = X_train_RGB[:, :, :, 2].std()
X_train_RGB[:, :, :, 0] = (X_train_RGB[:, :, :, 0] - mean_R) / std_R
X_train_RGB[:, :, :, 1] = (X_train_RGB[:, :, :, 1] - mean_G) / std_G
X_train_RGB[:, :, :, 2] = (X_train_RGB[:, :, :, 2] - mean_B) / std_B

## Training the model

In [None]:
from keras.callbacks import ModelCheckpoint, EarlyStopping

model = build_model(
    configuration['model_name'], configuration['n_classes'])

stopper = EarlyStopping(
    patience=configuration['stopper_patience'])
checkpointer = ModelCheckpoint(
    filepath=configuration['checkpoint_file'], verbose=0, save_best_only=True)

if configuration['model_name'] == 'histonet' :
    dim = configuration['image_size']
    X_train_R = X_train_RGB[:, :, :, 0].reshape((-1, 1, dim, dim))
    X_train_G = X_train_RGB[:, :, :, 1].reshape((-1, 1, dim, dim))
    X_train_B = X_train_RGB[:, :, :, 2].reshape((-1, 1, dim, dim))

    model.fit(
        [X_train_R, X_train_G, X_train_B], y_train,
        epochs = configuration['epochs'],
        validation_split = configuration['val_split'],
        verbose = 2,
        callbacks = [checkpointer, stopper])
else :
    model.fit(
        X_train_RGB, y_train,
        epochs = configuration['epochs'],
        validation_split = configuration['val_split'],
        verbose = 2,
        callbacks = [checkpointer, stopper])

# 2. Testing

## Loading trained model

In [None]:
model = build_model(
    configuration['model_name'], configuration['n_classes'])

model.load_weights(configuration['checkpoint_file'])

## Reading test data

In [None]:
ref_path_test = 'test_set/{}/'

test_folders = ['AE/2', 'AE/4', 'AE/6', 'AE/7', 'AE/8', 'AE/10', 
                'AM/1', 'AM/2', 'AM/4', 'AM/3', 'AM/7',
                'CO/1', 'CO/2', 'CO/3','CO/4','CO/5','CO/6','CO/7', 'CO/8', 'CO/9', 'CO/10',
                'VGC/1', 'VGC/7', 'VGC/9', 'VGC/10']

samples_per_test_folder = 300
n_images_test_folder = len(test_folders) * samples_per_test_folder

X_test_RGB = organise_data_from_folders(
    ref_path_test, test_folders, n_images_test_folder)

## Preprocessing

In [None]:
X_test_RGB[:, :, :, 0] = (X_test_RGB[:, :, :, 0] - mean_R) / std_R
X_test_RGB[:, :, :, 1] = (X_test_RGB[:, :, :, 1] - mean_G) / std_G
X_test_RGB[:, :, :, 2] = (X_test_RGB[:, :, :, 2] - mean_B) / std_B

## Classification

In [None]:
if configuration['model_name'] == 'histonet' :
    dim = configuration['image_size']
    X_test_R = X_test_RGB[:, :, :, 0].reshape((-1, 1, dim, dim))
    X_test_G = X_test_RGB[:, :, :, 1].reshape((-1, 1, dim, dim))
    X_test_B = X_test_RGB[:, :, :, 2].reshape((-1, 1, dim, dim))

    y_obtained = model.predict([X_test_R, X_test_G, X_test_B], verbose=2)

else :
    y_obtained = model.predict(X_test_RGB, verbose=2)

y_pred = np.argmax(y_obtained, axis=1)