"""Evaluate a trained U-Net on the validation patches and save example plots.

The script:
  1. builds a ``tf.data`` pipeline over (map patch, legend) pairs,
  2. loads the saved model and evaluates it on the whole validation set,
  3. renders a five-panel figure (map, legend, truth, prediction, error) for a
     random sample of validation patches.
"""
import os
import random
import shutil
from datetime import datetime

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import tensorflow as tf
from keras import backend as K
import segmentation_models as sm
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv2D, RandomFlip, RandomRotation
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard
# NOTE: imported after the tf.keras variant on purpose so that `load_model`
# stays bound to the `keras.models` loader, exactly as in the original script.
from keras.models import load_model  # noqa: F811

from unet_util import (
    dice_coef_loss,
    dice_coef,
    jacard_coef,
    Residual_CNN_block,
    multiplication,
    attention_up_and_concatenate,
    multiplication2,
    attention_up_and_concatenate2,
    UNET_224,
    evaluate_prediction_result,
)

################################################################
################# Load Validation Data #########################
################################################################

# Validation data locations. LEGEND_DIR points at the *unprocessed* legends;
# the processed legends used in other runs live in
# '/projects/bbym/nathanj/validation/legend'.
MAP_DIR = '/projects/bbym/shared/all_patched_data/validation/poly/map_patches'
LEGEND_DIR = '/projects/bbym/shared/all_patched_data/validation/poly/legend'
SEG_DIR = '/projects/bbym/shared/all_patched_data/validation/poly/seg_patches'


def load_img(filename, map_dir, legend_dir, seg_dir):
    """Load one (network input, segmentation mask) pair.

    Args:
        filename: string tensor of two elements; ``filename[0]`` is the map
            patch file name (also used for the segmentation patch) and
            ``filename[1]`` is the legend file name.
        map_dir: directory containing the map patches.
        legend_dir: directory containing the legend images.
        seg_dir: directory containing the segmentation patches.

    Returns:
        ``(map_img, seg_img)``. ``map_img`` is the map and legend PNGs
        concatenated along the channel axis, scaled to [-1, 1] and resized to
        256x256. ``seg_img`` is the decoded segmentation PNG resized to
        256x256; its values are not rescaled here.
    """
    mapName = tf.strings.join([map_dir, filename[0]], separator='/')
    legendName = tf.strings.join([legend_dir, filename[1]], separator='/')

    map_img = tf.io.read_file(mapName)
    map_img = tf.cast(tf.io.decode_png(map_img), dtype=tf.float32) / 255.0

    legend_img = tf.io.read_file(legendName)
    legend_img = tf.cast(tf.io.decode_png(legend_img), dtype=tf.float32) / 255.0

    # Concatenation happens before resizing, so the map patch and the legend
    # must already share the same height and width.
    map_img = tf.concat(axis=2, values=[map_img, legend_img])
    map_img = map_img*2.0 - 1.0  # [0, 1] -> [-1, 1]
    map_img = tf.image.resize(map_img, [256, 256])

    segName = tf.strings.join([seg_dir, filename[0]], separator='/')
    seg_img = tf.io.read_file(segName)
    seg_img = tf.io.decode_png(seg_img)
    seg_img = tf.image.resize(seg_img, [256, 256])

    return map_img, seg_img


def load_validation_img(filename):
    """Load a validation pair using the fixed validation directories."""
    return load_img(filename, MAP_DIR, LEGEND_DIR, SEG_DIR)


def _legend_name_for(map_file):
    """Return the legend file name for a map patch file name.

    The last two '_'-separated tokens of the patch name are dropped and
    '.png' is appended.
    """
    return '_'.join(map_file.split('_')[0:-2]) + '.png'


validate_map_file = os.listdir(MAP_DIR)

# Pre-filter map files based on existence of corresponding legend files
validate_map_legend_names = [
    (map_file, _legend_name_for(map_file))
    for map_file in validate_map_file
    if os.path.exists(os.path.join(LEGEND_DIR, _legend_name_for(map_file)))
]

validate_dataset = tf.data.Dataset.from_tensor_slices(validate_map_legend_names)
validate_dataset = validate_dataset.map(load_validation_img)
validate_dataset = validate_dataset.batch(50)

print("Load Data Done!")

################################################################
##### Prepare the model configurations #########################
################################################################

# You can change the id for each run so that all models and stats are saved
# separately. (The spelling is part of the existing on-disk paths; do not
# "fix" it without renaming the directories too.)
name_id = 'unproecessed_legends'
input_data = './samples/'
prediction_path = './predicts_'+name_id+'/'
log_path = './logs_'+name_id+'/'
model_path = './models_'+name_id+'/'
save_model_path = './models_'+name_id+'/'

# Create the folders if they do not exist
os.makedirs(input_data, exist_ok=True)
os.makedirs(model_path, exist_ok=True)
os.makedirs(prediction_path, exist_ok=True)

# Available backbones for the Unet architecture:
# 'vgg16' 'vgg19' 'resnet18' 'resnet34' 'resnet50' 'resnet101' 'resnet152' 'inceptionv3'
# 'inceptionresnetv2' 'densenet121' 'densenet169' 'densenet201' 'seresnet18' 'seresnet34'
# 'seresnet50' 'seresnet101' 'seresnet152', and 'attentionUnet'
backend = 'attentionUnet'
name = 'Unet-'+ backend

finetune = False
if finetune:
    name += "_ft"

# Start from an empty log directory for this model name.
logdir = log_path + name
if os.path.isdir(logdir):
    shutil.rmtree(logdir)
os.makedirs(logdir, exist_ok=True)
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

print('model location: '+ model_path+name+'.h5')

# Load the best model saved by the callback module. The attention U-Net
# additionally needs its custom layer functions to be deserialised.
custom_objects = {'dice_coef_loss': dice_coef_loss, 'dice_coef': dice_coef}
if backend == "attentionUnet":
    custom_objects['multiplication'] = multiplication
    custom_objects['multiplication2'] = multiplication2
model = load_model(model_path+name+'.h5', custom_objects=custom_objects)

print("Load Model Done!")


def f1_score(y_true, y_pred):
    """Return the smoothed Dice coefficient (F1) of two tensors.

    Both tensors are flattened; a smoothing term of 1 is added to numerator
    and denominator so that two empty masks do not divide by zero.
    """
    smooth = 1.
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)


model.compile(optimizer = Adam(), loss = dice_coef_loss, metrics=[dice_coef,'accuracy', f1_score])

eval_results = model.evaluate(validate_dataset, verbose=1)
print(eval_results)


def plotResult(fileName, save_dir):
    """Predict one validation patch and save a five-panel comparison figure.

    Panels: map, legend, true segmentation, predicted segmentation, error
    (pixel-wise XOR of prediction and truth).

    Args:
        fileName: ``(map_patch_name, legend_name)`` pair, as stored in
            ``validate_map_legend_names``.
        save_dir: directory the figure is written to, as
            ``<save_dir>/<map_patch_name>.png``. Created if missing.
    """
    test_dataset = tf.data.Dataset.from_tensor_slices([fileName])
    test_dataset = test_dataset.map(load_validation_img)
    test_dataset = test_dataset.batch(1)

    # Run the input pipeline once and reuse the tensor for both prediction
    # and display.
    input_test, _ = next(iter(test_dataset))
    print(input_test.shape)

    predicted = model.predict(input_test)
    print(predicted.shape)

    # Thresholding the predicted result to get binary values
    threshold = 0.5  # you can adjust this value based on your requirement
    predicted_binary = (predicted > threshold).astype(np.uint8)
    predicted_mask = predicted_binary[0, :, :, 0]

    seg_img = mpimg.imread(os.path.join(SEG_DIR, fileName[0]))

    # The network input is scaled to [-1, 1]; imshow clips float RGB data to
    # [0, 1], so map it back before displaying.
    display_input = (input_test[0].numpy() + 1.0) / 2.0

    # Set the figure size
    plt.figure(figsize=(10, 2))

    # Plot map image (first three channels of the network input)
    plt.subplot(1, 5, 1)
    plt.title("map")
    plt.imshow(display_input[:, :, :3])

    # Plot legend image (remaining channels of the network input)
    plt.subplot(1, 5, 2)
    plt.title("legend")
    plt.imshow(display_input[:, :, 3:])

    # Plot true segmentation image
    plt.subplot(1, 5, 3)
    plt.title("true segmentation")
    plt.imshow(seg_img, cmap='gray')

    # Plot predicted segmentation image
    plt.subplot(1, 5, 4)
    plt.title("predicted segmentation")
    plt.imshow(predicted_mask*255, cmap='gray')

    # Plot error image: pixels where prediction and truth disagree.
    # NOTE(review): this relies on seg_img having the same shape as the
    # 256x256 prediction - confirm for patches of other sizes.
    plt.subplot(1, 5, 5)
    plt.title("error")
    error_img = np.logical_xor(predicted_mask, seg_img)
    plt.imshow(error_img, cmap='gray')

    # Save the entire figure into the directory the caller asked for
    os.makedirs(save_dir, exist_ok=True)
    plt.savefig(os.path.join(save_dir, fileName[0] + '.png'))

    # Close the figure to release resources
    plt.close()


n=20
# random.sample raises ValueError if asked for more items than exist.
sample_size = min(n, len(validate_map_legend_names))
for fileName in random.sample(validate_map_legend_names, sample_size):
    print(fileName)
    plotResult(fileName, prediction_path)

print("Save Images Done!")