我们从 Python 开源项目中提取了以下 26 个代码示例，用于说明如何使用 keras.callbacks 模块。
def start_train(self):
    """Train the model with the configured parameters, then evaluate it.

    Reads ``self.Patience``, ``self.rnn_type``, ``self.dataset``,
    ``self.BatchSize``, ``self.MaxEpoch``, ``self.train`` and ``self.test``.
    After fitting, the checkpointed weights are loaded back into
    ``self.model`` and ``self.evaluate_on_test()`` is called.
    """
    # One path shared by the checkpoint callback and the reload below.
    weights_path = self.rnn_type + '_' + self.dataset + '.check'

    # 1. Callbacks: early stopping, LR reduction, CSV log, best-only weights.
    early_stop = EarlyStopping(patience=self.Patience)
    lr_decay = ReduceLROnPlateau(patience=5, verbose=1)
    csv_log = CSVLogger(filename=self.rnn_type + 'log.csv')
    best_weights = ModelCheckpoint(weights_path,
                                   save_best_only=True,
                                   save_weights_only=True)

    # 2. Fit; the test split doubles as validation data.
    train_inputs = [self.train[0], self.train[1]]
    val_inputs = [self.test[0], self.test[1]]
    self.model.fit(x=train_inputs,
                   y=self.train[2],
                   batch_size=self.BatchSize,
                   epochs=self.MaxEpoch,
                   validation_data=(val_inputs, self.test[2]),
                   callbacks=[early_stop, lr_decay, csv_log, best_weights])

    # 3. Revert to the best checkpoint before evaluating.
    self.model.load_weights(weights_path)
    self.evaluate_on_test()
def lengthy_test(model, testrange=(5, 10, 20, 40, 80), epochs=100, verboose=True):
    """Print accuracy per sequence length before and after training ``model``.

    Args:
        model: model exposing ``name`` and ``batch_size``; it is passed on to
            ``test_model`` and ``train_model``.
        testrange: iterable of sequence lengths to evaluate. The default is a
            tuple so no mutable object is shared between calls.
        epochs: number of epochs forwarded to ``train_model``.
        verboose: verbosity flag forwarded to ``test_model``/``train_model``
            (spelling kept because callers pass it by keyword).

    Returns:
        None. Results are printed; TensorBoard logs and checkpoints are
        written under ``LOG_PATH_BASE + <timestamp> + "_-_" + model.name``.
    """
    ts = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
    log_path = LOG_PATH_BASE + ts + "_-_" + model.name

    # write_graph stays off: it eats a lot of space. histogram_freq is also
    # left at its default; enable either with caution.
    tensorboard = TensorBoard(log_dir=log_path,
                              write_graph=False,
                              write_images=True,
                              batch_size=model.batch_size,
                              write_grads=True)
    model_saver = ModelCheckpoint(log_path + "/model.ckpt.{epoch:04d}.hdf5",
                                  monitor='loss', period=1)
    callbacks = [tensorboard, TerminateOnNaN(), model_saver]

    def report_accuracies():
        # Evaluate every requested sequence length and print the result.
        for length in testrange:
            acc = test_model(model, sequence_length=length, verboose=verboose)
            print("the accuracy for length {0} was: {1}%".format(length, acc))

    report_accuracies()  # baseline before training
    train_model(model, epochs=epochs, callbacks=callbacks, verboose=verboose)
    report_accuracies()  # same lengths after training
def train_model(model, epochs=10, min_size=5, max_size=20, callbacks=None, verboose=False):
    """Train ``model`` on batches produced by ``get_sample``.

    In verbose mode the model is fitted one epoch at a time (10 steps each)
    and ``test_model`` is run for lengths 5, 10, 20 and 40 after every epoch.
    Otherwise a single ``fit_generator`` call covers all ``epochs``.
    """
    batches = get_sample(in_bits=model.input_dim,
                         out_bits=model.output_dim,
                         batch_size=model.batch_size,
                         max_size=max_size,
                         min_size=min_size)

    if not verboose:
        model.fit_generator(batches, steps_per_epoch=10, epochs=epochs,
                            callbacks=callbacks)
    else:
        for finished in range(1, epochs + 1):
            # Resume from the previous epoch so callbacks see a continuous
            # epoch counter.
            model.fit_generator(batches, steps_per_epoch=10, epochs=finished,
                                callbacks=callbacks, initial_epoch=finished - 1)
            print("currently at epoch {0}".format(finished))
            for length in (5, 10, 20, 40):
                test_model(model, sequence_length=length, verboose=True)

    print("done training")
def train(self, train_data, validation_data, folder):
    """Fit the model on context/question/answer inputs with checkpointing.

    Args:
        train_data: 4-tuple ``(context, question, answer, y)`` for training.
        validation_data: 4-tuple with the same layout for validation.
        folder: base output directory; files go to ``<folder>/structures``.
            A trailing separator is optional.

    Side effects:
        Writes the model architecture as JSON, prints the model summary and
        saves a checkpoint whenever ``val_acc`` improves.
    """
    import os  # local import: the module-level import list is not visible here
    import numpy as np

    context_data, question_data, answer_data, y_train = train_data
    context_data_v, question_data_v, answer_data_v, y_val = validation_data
    print("Model Fitting")

    # Build both output paths the same way. The original concatenated
    # "structures/..." for the checkpoint but "/structures/..." for the JSON,
    # so the checkpoint path was wrong when `folder` had no trailing slash.
    structures_dir = os.path.join(folder, "structures")
    filepath = os.path.join(
        structures_dir,
        "cos-lstm-nn" + VERSION + "-final-{epoch:02d}-{val_acc:.2f}.hdf5")
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=0,
                                 save_best_only=True, mode='max')

    model_json = self.model.to_json()
    json_path = os.path.join(structures_dir, "cos-lstm-model" + VERSION + ".json")
    with open(json_path, "w") as json_file:
        json_file.write(model_json)
    self.model.summary()

    # Truncate every context sequence to MAX_SEQUENCE_LENGTH_C items.
    context_data = np.array([seq[:MAX_SEQUENCE_LENGTH_C] for seq in context_data])
    context_data_v = np.array([seq[:MAX_SEQUENCE_LENGTH_C] for seq in context_data_v])

    self.model.fit(
        {'context': context_data, 'question': question_data, 'answer': answer_data},
        y_train,
        validation_data=({'context': context_data_v,
                          'question': question_data_v,
                          'answer': answer_data_v}, y_val),
        epochs=50, batch_size=256, callbacks=[checkpoint], verbose=2)
def train_with_data_augmentation(self, batch_size, num_epoch, lr_schedule):
    """Placeholder for augmented autoencoder training; currently always fails.

    The autoencoder is compiled with SGD (learning rate ``lr_schedule(0)``)
    and an MSE loss, then the method aborts because the augmentation path is
    disabled.

    Args:
        batch_size: unused while the training call is disabled.
        num_epoch: unused while the training call is disabled.
        lr_schedule: callable mapping an epoch index to a learning rate.

    Raises:
        AssertionError: always, with the original guard message.
    """
    # `datagen` and `callback_list` are only needed by the disabled call below;
    # they are kept so the call can be restored without further changes.
    datagen = ImageDataGenerator(
        width_shift_range=0.125,   # randomly shift images horizontally, fraction
        height_shift_range=0.125,  # randomly shift images vertically, fraction
        horizontal_flip=True)
    opt = keras.optimizers.SGD(lr=lr_schedule(0), momentum=0.9, nesterov=True)
    callback_list = [LearningRateScheduler(lr_schedule)]
    self.ae.compile(optimizer=opt, loss='mse')

    # Raised explicitly instead of `assert False`: asserts are stripped under
    # `python -O`, which let execution fall through to a line that read an
    # undefined `history`. Exception type and message are unchanged.
    #
    # Disabled implementation, kept for reference:
    # history = self.ae.fit_generator(
    #     datagen.flow(
    #         self.dataset.train_xs,
    #         self.dataset.train_xs,
    #         nb_epoch=num_epoch,
    #         batch_size=batch_size,
    #         validation_data=(self.dataset.test_xs, self.dataset.test_xs),
    #         shuffle=True, callbacks=callback_list)
    # self.history = history.history
    raise AssertionError('seems that y is not augmented.')
def train_on_memory(model, memory_container, episode):
    """Fit ``model`` on the stored observations and reward vectors.

    Args:
        model: object with a Keras-style ``fit`` method.
        memory_container: mapping with ``'observations'`` (inputs X) and
            ``'rewards'`` (targets y, one reward vector per observation).
        episode: not used by this function.

    Returns:
        The same ``model`` object after fitting.
    """
    observations = np.array(memory_container['observations'])
    reward_vectors = np.array(memory_container['rewards'])

    # 30% of the samples are held out; training stops once val_loss has not
    # improved for 2 consecutive epochs (upper bound: 3000 epochs).
    stop_when_stalled = EarlyStopping(monitor='val_loss', min_delta=0,
                                      verbose=1, patience=2)
    model.fit(observations, reward_vectors,
              epochs=3000,
              validation_split=0.3,
              callbacks=[stop_when_stalled])
    return model
def main(_):
    """Train the model built by ``genChipModel`` and evaluate it on the test split.

    Creates the checkpoint and log directories named by ``FLAGS`` if missing,
    reads ``../../data/finalData.txt``, one-hot encodes the labels into 79
    classes, trains with RMSprop and finally evaluates on the test data.
    The single positional argument is ignored.
    """
    pp.pprint(flags.FLAGS.__flags)
    # Reopen stdout with buffering disabled (third argument 0).
    # NOTE(review): Python 3 rejects unbuffered text-mode streams - confirm
    # the interpreter version this script targets.
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

    for directory in (FLAGS.checkpoint, FLAGS.log):
        if not os.path.isdir(directory):
            os.mkdir(directory)

    model = genChipModel()
    model.summary()
    optimizer = keras.optimizers.rmsprop(lr=0.001, decay=1e-6)
    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    data_file = '../../data/finalData.txt'
    x, y = readData(data_file)
    x_train, y_train, x_test, y_test = init(x, y)
    y_train_labels = to_categorical(y_train, num_classes=79)
    y_test_labels = to_categorical(y_test, num_classes=79)

    weights_file = os.path.join(FLAGS.checkpoint, "weights.hdf5")
    training_callbacks = [
        ModelCheckpoint(filepath=weights_file, monitor="val_acc",
                        save_best_only=True, save_weights_only=True),
        TensorBoard(log_dir=FLAGS.log),
        ReduceLROnPlateau(monitor='val_acc', factor=0.5, patience=2),
    ]

    hist = model.fit(x_train, y_train_labels,
                     epochs=FLAGS.epoch,
                     batch_size=100,
                     validation_data=(x_test, y_test_labels),
                     callbacks=training_callbacks)
    loss, accuracy = model.evaluate(x_test, y_test_labels,
                                    batch_size=100, verbose=1)
def fit(self):
    """Fit ``self.model`` on the configured dataset and return the fit result.

    Training and test arrays, verbosity, batch size, epoch count and most
    callbacks come from ``self.config``; a TensorBoard callback writing to
    ``self.config.tf_log_path`` is appended. Start and elapsed time are logged
    through the module-level ``logger``.
    """
    import time

    started = time.time()
    logger.info("Start train model {}\n".format(self.config.name))

    dataset = self.config.dataset
    train_x, train_y = dataset['train_x'], dataset['train_y']
    # The test split is used for validation (no validation_split).
    held_out = (dataset['test_x'], dataset['test_y'])
    verbosity = self.config.verbose
    batch = self.config.batch_size
    n_epochs = self.config.epochs

    # TensorBoard runs with defaults apart from the log directory.
    fit_callbacks = [
        self.config.lr_reducer,
        self.config.csv_logger,
        self.config.early_stopper,
        TensorBoard(log_dir=self.config.tf_log_path),
    ]
    # TODO: do we need earlystop?
    hist = self.model.fit(train_x, train_y,
                          validation_data=held_out,
                          verbose=verbosity,
                          batch_size=batch,
                          epochs=n_epochs,
                          callbacks=fit_callbacks)

    logger.info("Fit model {} Consume {}:".format(self.config.name,
                                                  time.time() - started))
    return hist
def main():
    """Train or load the model depending on the first command-line argument.

    ``sys.argv[1] == "TRAIN"`` builds the model on 4096-dimensional image
    features and trains it for 250 epochs with ``fit_generator``.
    ``sys.argv[1] == "TEST"`` loads ``snapshots/epoch_49.hdf5``.
    The Keras backend session is cleared before returning.

    Raises:
        IndexError: if no command-line argument is supplied.
    """
    RUN_TIME = sys.argv[1]

    if RUN_TIME == "TRAIN":
        image_features = Input(shape=(4096,))
        model = build_model(image_features)
        # print() with one argument behaves the same on Python 2 and 3; the
        # original print statements were syntax errors on Python 3.
        print(model.summary())

        # number of training images
        _num_train = get_num_train_images()

        # Callbacks
        # remote_cb = RemoteMonitor(root='http://localhost:9000')
        tensorboard = TensorBoard(log_dir="logs/{}".format(time()))
        # NOTE(review): epoch_cb is created but never passed to fit_generator,
        # although the TEST branch loads snapshots/epoch_49.hdf5 - confirm
        # whether it should be in the callbacks list.
        epoch_cb = EpochCheckpoint(folder="./snapshots/")
        valid_cb = ValidCallBack()

        # fit generator
        steps_per_epoch = math.ceil(_num_train/float(BATCH))
        # Two spaces after the colon reproduce the output of the original
        # `print "...: ", value` statement.
        print("Steps per epoch i.e number of iterations:  {}".format(steps_per_epoch))

        # NOTE(review): steps are derived from BATCH while the generator is
        # given INCORRECT_BATCH as batch_size - verify this is intended.
        train_datagen = data_generator(batch_size=INCORRECT_BATCH,
                                       image_class_ranges=TRAINING_CLASS_RANGES)
        history = model.fit_generator(
            train_datagen,
            steps_per_epoch=steps_per_epoch,
            epochs=250,
            callbacks=[tensorboard, valid_cb]
        )
        print(history.history.keys())

    elif RUN_TIME == "TEST":
        from keras.models import load_model
        model = load_model("snapshots/epoch_49.hdf5",
                           custom_objects={"hinge_rank_loss": hinge_rank_loss})

    K.clear_session()
def train(self):
    """Compile the model, dump its architecture to JSON and fit it.

    Side effects:
        Writes ``model.json`` in the working directory, saves the best model
        (lowest ``val_loss``) to ``./best.model`` and prints dataset sizes.
    """
    checkpointer = ModelCheckpoint(filepath='./best.model', verbose=1,
                                   monitor='val_loss', save_best_only=True)
    earlystop = EarlyStopping(monitor='val_loss', patience=10, verbose=1,
                              mode='auto')
    self.model.compile(loss='categorical_crossentropy', optimizer='adadelta',
                       metrics=['accuracy'])

    json_model = self.model.to_json()
    # Context manager closes the file even if the write fails.
    with open('model.json', 'w') as fjson:
        fjson.write(json_model)
    # print() with one argument behaves the same on Python 2 and 3; the
    # original print statements were syntax errors on Python 3.
    print('model_json_saved!')

    # NOTE(review): training and validation data are read from the same
    # path - confirm that a separate validation set is not intended.
    train_x, train_y = pre_x_y(path='kgs-19-2017-01-new/')
    valid_x, valid_y = pre_x_y(path='kgs-19-2017-01-new/')
    # Two spaces after the colon reproduce the output of the original
    # `print '...: ', value` statements.
    print('train_data_len:  {}'.format(len(train_x)))
    print('valid_data_len:  {}'.format(len(valid_x)))

    self.model.fit({'x': train_x}, {'out': train_y},
                   batch_size=32, nb_epoch=1, shuffle=True, verbose=1,
                   callbacks=[checkpointer, earlystop],
                   validation_data=({'x': valid_x}, {'out': valid_y}))
def fit(self, data_stream, nvis=20, nbatch=128, niter=1000, opt=None, save_dir='./'):
    """Train ``self.autoencoder`` on batches from ``data_stream``.

    Args:
        data_stream: zero-argument callable returning a fresh iterator of
            data batches.
        nvis: number of samples from the first batch used for the
            reconstruction images.
        nbatch: forwarded to ``fit_generator`` as ``samples_per_epoch``.
        niter: forwarded to ``fit_generator`` as ``nb_epoch``.
        opt: optimizer; defaults to ``Adam(lr=0.0001)`` when ``None``.
        save_dir: output directory for images and weights (created if
            missing).

    Side effects:
        Writes ``sample.png`` and ``sample_generate.png`` up front, one
        ``<epoch>.png`` per epoch, and the weights every 50th epoch.
    """
    if opt is None:
        opt = Adam(lr=0.0001)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    ae = self.autoencoder
    ae.compile(optimizer=opt, loss='mse')

    # next(iterator) works on Python 2 and 3; iterator.next() is Python 2 only.
    vis_grid(next(data_stream()), (1, 20), '{}/sample.png'.format(save_dir))

    sampleX = transform(next(data_stream())[:nvis])
    vis_grid(inverse_transform(np.concatenate([sampleX, ae.predict(sampleX)], axis=0)),
             (2, 20), '{}/sample_generate.png'.format(save_dir))

    def vis_grid_f(epoch, logs):
        # Per-epoch image of the fixed samples next to their reconstructions.
        vis_grid(inverse_transform(np.concatenate([sampleX, ae.predict(sampleX)], axis=0)),
                 (2, 20), '{}/{}.png'.format(save_dir, epoch))
        if epoch % 50 == 0:
            ae.save_weights('{}/{}_ae_params.h5'.format(save_dir, epoch),
                            overwrite=True)

    def transform_wrapper():
        # Autoencoder target equals its input, so yield the batch twice.
        for data in data_stream():
            yield transform(data), transform(data)

    ae.fit_generator(transform_wrapper(),
                     samples_per_epoch=nbatch,
                     nb_epoch=niter,
                     verbose=1,
                     callbacks=[LambdaCallback(on_epoch_end=vis_grid_f)])
def make_lr_scheduler(base_lr, decay_rate, epoch_rate):
    """Build a Keras ``LearningRateScheduler`` with a stepped decay.

    The schedule returns ``base_lr`` while ``epoch + 1 < epoch_rate`` and
    ``base_lr / (decay_rate * floor((epoch + 1) / epoch_rate))`` afterwards.

    Args:
        base_lr: initial learning rate.
        decay_rate: divisor applied once per completed ``epoch_rate`` block.
        epoch_rate: number of epochs between decay steps.

    Returns:
        A ``keras.callbacks.LearningRateScheduler`` wrapping the schedule.
    """
    def lr_schedule(epoch):
        completed = epoch + 1  # Keras passes a 0-based epoch index
        if completed < epoch_rate:
            return base_lr
        # The original read `np.floor(epoch + 1 / rate_epochs)`: `rate_epochs`
        # is undefined (NameError at the first decay step) and the missing
        # parentheses divided only the literal 1. The step count is the
        # number of whole `epoch_rate` blocks completed, which is >= 1 here.
        return base_lr / (decay_rate * np.floor(completed / epoch_rate))

    return keras.callbacks.LearningRateScheduler(lr_schedule)
def train(self, train_data, validation_data, folder):
    """Fit the model on context/question/answer inputs with checkpointing.

    Args:
        train_data: 4-tuple ``(context, question, answer, y)`` for training.
        validation_data: 4-tuple with the same layout for validation.
        folder: base output directory; files go to ``<folder>/structures``.
            A trailing separator is optional.

    Side effects:
        Writes the model architecture as JSON, prints the model summary and
        saves a checkpoint whenever ``val_acc`` improves.
    """
    import os  # local import: the module-level import list is not visible here

    context_data, question_data, answer_data, y_train = train_data
    context_data_v, question_data_v, answer_data_v, y_val = validation_data
    print("Model Fitting")

    # Build both output paths the same way. The original concatenated
    # "structures/..." for the checkpoint but "/structures/..." for the JSON,
    # so the checkpoint path was wrong when `folder` had no trailing slash.
    structures_dir = os.path.join(folder, "structures")
    filepath = os.path.join(
        structures_dir,
        "lstm-nn" + VERSION + "-final-{epoch:02d}-{val_acc:.2f}.hdf5")
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=0,
                                 save_best_only=True, mode='max')

    model_json = self.model.to_json()
    json_path = os.path.join(structures_dir, "lstm-model" + VERSION + ".json")
    with open(json_path, "w") as json_file:
        json_file.write(model_json)
    self.model.summary()

    self.model.fit(
        {'context': context_data, 'question': question_data, 'answer': answer_data},
        y_train,
        validation_data=({'context': context_data_v,
                          'question': question_data_v,
                          'answer': answer_data_v}, y_val),
        epochs=50, batch_size=256, callbacks=[checkpoint], verbose=2)
def train(self, train_data, validation_data, folder):
    """Fit the layered LSTM model on context/question/answer inputs.

    Args:
        train_data: 4-tuple ``(context, question, answer, y)`` for training.
        validation_data: 4-tuple with the same layout for validation.
        folder: base output directory; files go to ``<folder>/structures``.
            A trailing separator is optional.

    Side effects:
        Writes the model architecture as JSON, prints the model summary and
        saves a checkpoint whenever ``val_acc`` improves.
    """
    import os  # local import: the module-level import list is not visible here

    context_data, question_data, answer_data, y_train = train_data
    context_data_v, question_data_v, answer_data_v, y_val = validation_data
    print("Model Fitting")

    # Build both output paths the same way. The original concatenated
    # "structures/..." for the checkpoint but "/structures/..." for the JSON,
    # so the checkpoint path was wrong when `folder` had no trailing slash.
    structures_dir = os.path.join(folder, "structures")
    filepath = os.path.join(
        structures_dir,
        "layered-lstm-nn" + VERSION + "-final-{epoch:02d}-{val_acc:.2f}.hdf5")
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=0,
                                 save_best_only=True, mode='max')

    model_json = self.model.to_json()
    json_path = os.path.join(structures_dir,
                             "layered-lstm-model" + VERSION + ".json")
    with open(json_path, "w") as json_file:
        json_file.write(model_json)
    self.model.summary()

    self.model.fit(
        {'context': context_data, 'question': question_data, 'answer': answer_data},
        y_train,
        validation_data=({'context': context_data_v,
                          'question': question_data_v,
                          'answer': answer_data_v}, y_val),
        epochs=50, batch_size=256, callbacks=[checkpoint], verbose=2)
def train(self, train_data, validation_data, folder):
    """Fit the no-context LSTM model on question/answer inputs only.

    Args:
        train_data: 4-tuple ``(context, question, answer, y)``; the context
            element is ignored by this model.
        validation_data: 4-tuple with the same layout for validation.
        folder: base output directory; files go to ``<folder>/structures``.
            A trailing separator is optional.

    Side effects:
        Writes the model architecture as JSON, prints the model summary and
        saves a checkpoint whenever ``val_acc`` improves.
    """
    import os  # local import: the module-level import list is not visible here

    # The tuples keep the shared 4-element layout; context is not fed here.
    _, question_data, answer_data, y_train = train_data
    _, question_data_v, answer_data_v, y_val = validation_data
    print("Model Fitting")

    # Build both output paths the same way. The original concatenated
    # "structures/..." for the checkpoint but "/structures/..." for the JSON,
    # so the checkpoint path was wrong when `folder` had no trailing slash.
    structures_dir = os.path.join(folder, "structures")
    filepath = os.path.join(
        structures_dir,
        "no-context-lstm-nn" + VERSION + "-final-{epoch:02d}-{val_acc:.2f}.hdf5")
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=0,
                                 save_best_only=True, mode='max')

    model_json = self.model.to_json()
    json_path = os.path.join(structures_dir,
                             "no-context-lstm-model" + VERSION + ".json")
    with open(json_path, "w") as json_file:
        json_file.write(model_json)
    self.model.summary()

    self.model.fit(
        {'question': question_data, 'answer': answer_data},
        y_train,
        validation_data=({'question': question_data_v,
                          'answer': answer_data_v}, y_val),
        epochs=50, batch_size=256, callbacks=[checkpoint], verbose=2)
def train(self, train_data, validation_data, folder):
    """Fit the LSTM-CNN model on context/question/answer inputs.

    Args:
        train_data: 4-tuple ``(context, question, answer, y)`` for training.
        validation_data: 4-tuple with the same layout for validation.
        folder: base output directory; files go to ``<folder>/structures``.
            A trailing separator is optional.

    Side effects:
        Writes the model architecture as JSON, prints the model summary and
        saves a checkpoint whenever ``val_acc`` improves.
    """
    import os  # local import: the module-level import list is not visible here

    context_data, question_data, answer_data, y_train = train_data
    context_data_v, question_data_v, answer_data_v, y_val = validation_data
    print("Model Fitting")

    # Build both output paths the same way. The original concatenated
    # "structures/..." for the checkpoint but "/structures/..." for the JSON,
    # so the checkpoint path was wrong when `folder` had no trailing slash.
    structures_dir = os.path.join(folder, "structures")
    filepath = os.path.join(
        structures_dir,
        "lstm-cnn-nn" + VERSION + "-final-{epoch:02d}-{val_acc:.2f}.hdf5")
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=0,
                                 save_best_only=True, mode='max')

    model_json = self.model.to_json()
    json_path = os.path.join(structures_dir,
                             "lstm-cnn-model" + VERSION + ".json")
    with open(json_path, "w") as json_file:
        json_file.write(model_json)
    self.model.summary()

    self.model.fit(
        {'context': context_data, 'question': question_data, 'answer': answer_data},
        y_train,
        validation_data=({'context': context_data_v,
                          'question': question_data_v,
                          'answer': answer_data_v}, y_val),
        epochs=50, batch_size=256, callbacks=[checkpoint], verbose=2)
def train(self, batch_size, num_epoch, lr_schedule):
    """Train the autoencoder ``self.ae`` to reconstruct its own input.

    Args:
        batch_size: mini-batch size passed to ``fit``.
        num_epoch: number of epochs (passed as ``nb_epoch``).
        lr_schedule: callable mapping an epoch index to a learning rate; it
            also supplies the initial rate via ``lr_schedule(0)``.

    Side effects:
        Stores the per-epoch history dict in ``self.history``.
    """
    sgd = keras.optimizers.SGD(lr=lr_schedule(0), momentum=0.9, nesterov=True)
    schedule_callbacks = [LearningRateScheduler(lr_schedule)]
    self.ae.compile(optimizer=sgd, loss='mse')

    # Inputs double as targets for both the training and validation data.
    fit_result = self.ae.fit(
        self.dataset.train_xs,
        self.dataset.train_xs,
        nb_epoch=num_epoch,
        batch_size=batch_size,
        validation_data=(self.dataset.test_xs, self.dataset.test_xs),
        shuffle=True,
        callbacks=schedule_callbacks)
    self.history = fit_result.history
def get_callback():
    """Return a zero-argument factory that builds a ``ReduceLROnPlateau``.

    Keras is imported only when the factory is called, so obtaining the
    factory itself does not require Keras.
    """
    def return_callback():
        # Imported lazily, at factory call time.
        from keras.callbacks import ReduceLROnPlateau
        return ReduceLROnPlateau(monitor='val_loss',
                                 factor=0.2,
                                 patience=5,
                                 min_lr=0.001)

    return return_callback
def test_experiment_fit(self, get_model, get_loss_metric, get_custom_l, get_callback_fix):
    """Fit an ``Experiment`` with and without an explicit model and validation data.

    After the fits, the backend name is overwritten, the model is reloaded
    (with default and with explicit ids) and the experiment ids and params
    dump are checked to be set.
    """
    new_session()
    data, data_val = make_data(train_samples, test_samples)
    built = get_model(get_custom_l)
    model, metrics, cust_objects = prepare_model(built, get_loss_metric,
                                                 get_custom_l)
    expe = Experiment(model)

    # Every combination of (model given or not) x (validation given or not).
    for mod in (None, model):
        for data_val_loc in (None, data_val):
            expe.fit([data], [data_val_loc],
                     model=mod,
                     nb_epoch=2,
                     batch_size=batch_size,
                     metrics=metrics,
                     custom_objects=cust_objects,
                     overwrite=True,
                     callbacks=get_callback_fix)

    # NOTE(review): these checks are read as running once, after the loops -
    # confirm against the project source.
    expe.backend_name = 'another_backend'
    expe.load_model()
    expe.load_model(expe.mod_id, expe.data_id)

    assert expe.data_id is not None
    assert expe.mod_id is not None
    assert expe.params_dump is not None

    if K.backend() == 'tensorflow':
        K.clear_session()

    print(self)
def test_experiment_fit_gen(self, get_model, get_loss_metric, get_custom_l, get_callback_fix):
    """Fit an ``Experiment`` from generators, with generator and in-memory validation.

    Two runs are made: one whose validation data comes from a second
    generator, and one that validates on the data returned by ``make_data``.
    Every generator opened for a run is closed after it.
    """
    new_session()
    model, metrics, cust_objects = prepare_model(get_model(get_custom_l),
                                                 get_loss_metric,
                                                 get_custom_l)
    _, data_val_use = make_data(train_samples, test_samples)
    expe = Experiment(model)

    # The original iterated over [1, data_val_use] and rebound `val` to the
    # generator when it was 1; the later `if val == 1` cleanup check was then
    # never true, so the validation generator was never closed. An explicit
    # flag keeps the decision stable across the rebinding.
    for use_val_gen in (True, False):
        gen, data, data_stream = make_gen(batch_size)
        if use_val_gen:
            val, data_2, data_stream_2 = make_gen(batch_size)
        else:
            val = data_val_use

        expe.fit_gen([gen], [val],
                     nb_epoch=2,
                     model=model,
                     metrics=metrics,
                     custom_objects=cust_objects,
                     samples_per_epoch=64,
                     nb_val_samples=128,
                     verbose=2,
                     overwrite=True,
                     callbacks=get_callback_fix)

        close_gens(gen, data, data_stream)
        if use_val_gen:
            close_gens(val, data_2, data_stream_2)

    if K.backend() == 'tensorflow':
        K.clear_session()

    print(self)
def train(model, nnInput, labels, validation, makePlot = True, labelIndexes = RP['label_idxs']):
    """Fit ``model`` with early stopping and return the number of epochs run.

    Args:
        model: object with Keras-style ``fit`` and ``summary`` methods.
        nnInput: training inputs.
        labels: training targets.
        validation: pair ``(inputs, targets)`` used as validation data.
        makePlot: when true, plot training vs. validation loss.
        labelIndexes: not used by the active code (only by a disabled
            label-reformatting snippet that was kept as a string literal).

    Returns:
        Length of the recorded loss history, i.e. epochs actually trained.
    """
    print(' Training model...')

    # A disabled snippet that transposed labels/validation into
    # (sample, label) arrays used to live here ("needed format is orthogonal
    # to ours"); it had no runtime effect.

    stopper = keras.callbacks.EarlyStopping(monitor = 'val_loss',
                                            patience = RP['early_stop'])
    # Constructed but not passed to fit(), as in the original.
    learningRateScheduler = keras.callbacks.LearningRateScheduler(learningRateDecayer)
    modelLogger = visualization.ModelLogger()

    history = model.fit(nnInput, labels,
                        nb_epoch = RP['epochs'],
                        batch_size = RP['batch'],
                        callbacks = [stopper],
                        validation_data = (validation[0], validation[1]))

    train_losses = history.history['loss']
    if makePlot:
        # Column 0: training loss, column 1: validation loss.
        values = np.zeros((len(train_losses), 2))
        for row, loss_value in enumerate(train_losses):
            values[row][0] = loss_value
            values[row][1] = history.history['val_loss'][row]
        utility.plotLoss(values)

    # NOTE(review): read as running regardless of makePlot - confirm against
    # the project source.
    visualization.histograms(modelLogger)

    print(' Model weights:')
    print(model.summary())
    # print(model.get_weights())
    print(' ...done')
    return len(history.history['loss'])
def train_unet(self):
    """Train the U-Net on image/mask pairs read from ``self.flag.data_path``.

    Images come from ``train/IMAGE`` and masks from ``train/GT``; both are
    loaded as grayscale through ``ImageDataGenerator`` with the same
    augmentation settings and the same random seed. The architecture is
    written to ``model.json`` in the checkpoint directory and weights are
    saved periodically during training.
    """
    batch_size = self.flag.batch_size
    epochs = self.flag.total_epoch

    datagen_args = dict(
        featurewise_center=False,             # set input mean to 0 over the dataset
        samplewise_center=False,              # set each sample mean to 0
        featurewise_std_normalization=False,  # divide inputs by std of the dataset
        samplewise_std_normalization=False,   # divide each input by its std
        zca_whitening=False,                  # apply ZCA whitening
        rotation_range=5,          # randomly rotate images in the range (degrees, 0 to 180)
        width_shift_range=0.05,    # randomly shift images horizontally (fraction of total width)
        height_shift_range=0.05,   # randomly shift images vertically (fraction of total height)
        # fill_mode='constant',
        # cval=0.,
        horizontal_flip=False,     # randomly flip images
        vertical_flip=False)       # randomly flip images

    image_datagen = ImageDataGenerator(**datagen_args)
    mask_datagen = ImageDataGenerator(**datagen_args)

    # One seed for both generators (presumably so each image and its mask
    # receive the same random transform - confirm).
    seed = random.randrange(1, 1000)
    image_generator = image_datagen.flow_from_directory(
        os.path.join(self.flag.data_path, 'train/IMAGE'),
        class_mode=None, seed=seed, batch_size=batch_size,
        color_mode='grayscale')
    mask_generator = mask_datagen.flow_from_directory(
        os.path.join(self.flag.data_path, 'train/GT'),
        class_mode=None, seed=seed, batch_size=batch_size,
        color_mode='grayscale')

    config = tf.ConfigProto()
    # config.gpu_options.per_process_gpu_memory_fraction = 0.9
    config.gpu_options.allow_growth = True
    set_session(tf.Session(config=config))

    model = get_unet(self.flag)
    # Identity comparison with None (was `!= None`).
    if self.flag.pretrained_weight_path is not None:
        model.load_weights(self.flag.pretrained_weight_path)

    ckpt_path = os.path.join(self.flag.ckpt_dir, self.flag.ckpt_name)
    if not os.path.exists(ckpt_path):
        mkdir_p(ckpt_path)

    model_json = model.to_json()
    with open(os.path.join(ckpt_path, 'model.json'), 'w') as json_file:
        json_file.write(model_json)

    vis = callbacks.trainCheck(self.flag)
    # Save weights every (total_epoch // 10 + 1) epochs.
    model_checkpoint = ModelCheckpoint(
        os.path.join(ckpt_path, 'weights.{epoch:03d}.h5'),
        period=self.flag.total_epoch // 10 + 1)
    learning_rate = LearningRateScheduler(self.lr_step_decay)

    model.fit_generator(
        self.train_generator(image_generator, mask_generator),
        steps_per_epoch=image_generator.n // batch_size,
        epochs=epochs,
        callbacks=[model_checkpoint, learning_rate, vis]
    )
def train(self, run_name, start_epoch, stop_epoch, img_w):
    """Build the CTC training model and fit it on generated text images.

    Args:
        run_name: sub-directory of ``OUTPUT_DIR`` used for this run's weights.
        start_epoch: epoch to resume from; when > 0 the weights saved for
            ``start_epoch - 1`` are loaded first.
        stop_epoch: value passed to ``fit_generator`` as ``epochs``.
        img_w: image width passed to the text image generator.

    Side effects:
        Sets ``self.img_gen`` and ``self.model``.
    """
    # NOTE(review): these three locals are never read - every later use goes
    # through self.words_per_epoch / self.val_words. Confirm which is intended.
    words_per_epoch = 16000
    val_split = 0.2
    val_words = int(words_per_epoch * (val_split))

    fdir = DATA_DIR
    mono_path = os.path.join(fdir, 'wordlist_mono_dhivehi.txt')
    bi_path = os.path.join(fdir, 'wordlist_bi_dhivehi.txt')
    self.img_gen = TextImageGenerator(
        monogram_file=mono_path,
        bigram_file=bi_path,
        minibatch_size=32,
        img_w=img_w,
        img_h=self.img_h,
        downsample_factor=(self.pool_size ** 2),
        val_split=self.words_per_epoch - self.val_words)

    adam = keras.optimizers.Adam(lr=1e-4, beta_1=0.9, beta_2=0.999,
                                 epsilon=1e-08)
    output_dir = os.path.join(OUTPUT_DIR, run_name)

    # Extra inputs consumed only by the CTC loss layer.
    labels = Input(name='the_labels',
                   shape=[self.img_gen.absolute_max_string_len],
                   dtype='float32')
    input_length = Input(name='input_length', shape=[1], dtype='int64')
    label_length = Input(name='label_length', shape=[1], dtype='int64')
    y_pred = Activation('softmax', name='softmax')(self.inner)
    loss_out = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')(
        [y_pred, labels, input_length, label_length])

    self.model = Model(
        inputs=[self.input_data, labels, input_length, label_length],
        outputs=loss_out)
    # The 'ctc' layer output is already the loss, so the Keras loss function
    # just returns y_pred unchanged.
    self.model.compile(loss={'ctc': lambda y_true, y_pred: y_pred},
                       optimizer=adam)

    if start_epoch > 0:
        weight_file = os.path.join(OUTPUT_DIR, run_name,
                                   'weights%02d.h5' % (start_epoch - 1))
        self.model.load_weights(weight_file)

    viz_cb = VizCallback(run_name, self.test_func, self.img_gen.next_val(),
                         self.model)

    # For test runs, the original kept a disabled variant of this call that
    # divides steps_per_epoch and validation_steps by self.minibatch_size.
    self.model.fit_generator(
        generator=self.img_gen.next_train(),
        steps_per_epoch=(self.words_per_epoch - self.val_words),
        epochs=stop_epoch,
        validation_data=self.img_gen.next_val(),
        validation_steps=self.val_words,
        callbacks=[viz_cb, self.img_gen],
        initial_epoch=start_epoch,
        verbose=1)
def train(trainset, valset, path_data, path_session, hyper_param):
    """Execute a single training task.

    Args:
        trainset: training samples; ``len(trainset)`` sets the step count.
        valset: validation samples, consumed in batches of 2.
        path_data: data location forwarded to ``_sample_generator``.
        path_session: output directory; its basename is the session id used
            as a prefix for every file written.
        hyper_param: mapping with keys ``optimizer``, ``batch_norm``,
            ``pool_type``, ``dropout_rate``, ``batch_sz``, ``epochs`` and
            ``lr_schedule`` (a Keras callback).

    Returns:
        model: path to the checkpoint file holding the best model as
            measured by validation loss
        loss: the loss computed on the validation set at the best epoch
        acc: the accuracy computed on the validation set at the best epoch

    Side effects:
        Writes the checkpoint, ``*_loss.png``, ``*_accuracy.png`` and a
        pickled history file into ``path_session``.
    """
    session_id = os.path.basename(path_session)
    # Single source of truth for the checkpoint location. The original
    # returned "<id>.hdf5" while the checkpoint wrote "<id>_model.hdf5", so
    # the returned path pointed at a file that was never created.
    best_model_path = os.path.join(path_session,
                                   "{}_model.hdf5".format(session_id))
    model_cp = keras.callbacks.ModelCheckpoint(
        best_model_path, monitor="val_loss", save_best_only=True)

    # train
    model = _get_model(hyper_param["optimizer"],
                       hyper_param["batch_norm"],
                       pool_type=hyper_param["pool_type"],
                       dropout_rate=hyper_param["dropout_rate"])
    history = model.fit_generator(
        _sample_generator(trainset, path_data, hyper_param["batch_sz"]),
        steps_per_epoch=int(len(trainset) / hyper_param["batch_sz"]),
        epochs=hyper_param["epochs"],
        validation_data=_sample_generator(valset, path_data, 2),
        validation_steps=int(len(valset) / 2),
        callbacks=[model_cp, hyper_param["lr_schedule"]],
        verbose=1,
        workers=4)

    # plot training curves
    def plot_history(metric):
        plt.ioff()
        str_metric = "accuracy" if metric == "acc" else "loss"
        plt.plot(history.history[metric])
        plt.plot(history.history["val_{}".format(metric)])
        plt.title("model {}".format(str_metric))
        plt.ylabel(str_metric)
        plt.xlabel("epoch")
        plt.legend(["train", "test"], loc="upper left")
        plt.savefig(os.path.join(path_session,
                                 "{}_{}.png".format(session_id, str_metric)))

    plot_history("loss")
    plt.cla()  # clear the axes so the accuracy plot starts empty
    plot_history("acc")

    history_path = os.path.join(path_session,
                                "{}_history.pkl".format(session_id))
    with open(history_path, 'wb') as output:
        pickle.dump(history.history, output, pickle.HIGHEST_PROTOCOL)

    # output model and performance measures
    ind_min_loss = np.argmin(history.history["val_loss"])
    return (best_model_path,
            history.history["val_loss"][ind_min_loss],
            history.history["val_acc"][ind_min_loss])
def test_experiment_fit_async(self, get_model, get_loss_metric, get_custom_l, get_callback_fix):
    """Fit an ``Experiment`` asynchronously and check the recorded metrics.

    For every combination of (explicit model or not) and (validation data or
    not), the fit thread is joined and then:
    every non-'iter' metric must hold exactly two values;
    'val' metrics must contain no ``None`` when validation data was given,
    and only NaN values when it was not.
    """
    new_session()
    data, data_val = make_data(train_samples, test_samples)
    model, metrics, cust_objects = prepare_model(get_model(get_custom_l),
                                                 get_loss_metric,
                                                 get_custom_l)
    cust_objects['test_list'] = [1, 2]
    expe = Experiment(model)
    expected_value = 2  # one entry per epoch (nb_epoch=2)

    for mod in (None, model):
        for data_val_loc in (None, data_val):
            _, thread = expe.fit_async([data], [data_val_loc],
                                       model=mod,
                                       nb_epoch=2,
                                       batch_size=batch_size,
                                       metrics=metrics,
                                       custom_objects=cust_objects,
                                       overwrite=True,
                                       verbose=2,
                                       callbacks=get_callback_fix)
            thread.join()

            for name in expe.full_res['metrics']:
                if 'iter' in name:
                    continue
                assert len(expe.full_res['metrics'][name]) == expected_value

            has_validation = data_val_loc is not None
            for name in expe.full_res['metrics']:
                if 'val' not in name or 'iter' in name:
                    continue
                if has_validation:
                    assert None not in expe.full_res['metrics'][name]
                else:
                    assert all([np.isnan(v)
                                for v in expe.full_res['metrics'][name]])

    if K.backend() == 'tensorflow':
        K.clear_session()

    print(self)