我以为大多数框架(例如keras / tensorflow / …)会自动使用所有CPU内核,但实际上似乎并非如此。我只是发现很少的资源可以导致我们在深度学习过程中使用CPU的全部容量。我找到了一篇有关
from multiprocessing import Pool import psutil import ray
另一方面,基于在多个过程中使用keras模型的答案,没有上述库的踪迹。是否有更优雅的方式利用Keras的 多处理功能 ,因为它在实施中非常受欢迎。
例如,如何在学习过程中通过简单的RNN实施进行修改,以达到至少50%的CPU容量?
我应该使用第二模型作为多任务处理(如LSTM)吗?我的意思是我们可以通过使用更多的CPU能力同时管理运行多模型吗?
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM, SimpleRNN
from keras.layers import Activation
from keras.layers.normalization import BatchNormalization
from keras.optimizers import Adam, RMSprop
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
# Load the raw training data (no header row; columns are positional).
# Raw string avoids any backslash-escape surprises in the Windows path;
# the original also used typographic quotes, which are a syntax error.
df = pd.read_csv(r"D:\Train.csv", header=None)

# Target columns: every third column (indices 2, 5, 8, ...).
index = [i for i in range(1440) if i % 3 == 2]

Y_train = df[index]
df = df.values
def create_dataset(dataset,data_train,look_back=1): dataX,dataY = [],[] print(“Len:”,len(dataset)-look_back-1) for i in range(len(dataset)-look_back-1): a = dataset[i:(i+look_back), :] dataX.append(a) dataY.append(data_train[i + look_back, :]) return np.array(dataX), np.array(dataY)
# Convert to plain numpy arrays before windowing.
Y_train = np.array(Y_train)
df = np.array(df)

# Build supervised samples from the previous `look_back` rows.
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Hold out the last 20% as test data (shuffle=False keeps temporal order).
# BUG FIX: the original called train_test_split twice in a row, which
# re-split the already-split training portion and silently discarded the
# first test set.
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
# Simple RNN regressor: 1440 recurrent units -> 960 linear outputs,
# batch-normalized then tanh-squashed.
# The original used typographic quotes ('...') — a syntax error — and
# referenced Activation/EarlyStopping/ReduceLROnPlateau without importing
# them (fixed in the import block).
model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))

model_RNN.compile(loss='mean_squared_error', optimizer='adam')

callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
]

hist_RNN = model_RNN.fit(
    trainX, trainY,
    epochs=50, batch_size=20,
    validation_data=(testX, testY),
    verbose=1,
    callbacks=callbacks,
)
# Evaluate on both splits.
# BUG FIX: the original assigned Y_test = np.array(testX) — the test
# FEATURES, not the labels; it should mirror Y_train and use testY.
Y_train = np.array(trainY)
Y_test = np.array(testY)

Y_RNN_Train_pred = model_RNN.predict(trainX)
Y_RNN_Test_pred = model_RNN.predict(testX)

train_MSE = mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE = mean_squared_error(testY, Y_RNN_Test_pred)
# Side-by-side loss curves for the RNN and (if trained) an LSTM.
# The original used typographic quotes — a syntax error.
f, ax = plt.subplots(figsize=(20, 15))

plt.subplot(1, 2, 1)
plt.plot(hist_RNN.history['loss'], label='Train loss')
plt.plot(hist_RNN.history['val_loss'], label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title(' RNN Loss on Train and Test data')
plt.legend()

# BUG FIX: hist_LSTM is never defined in this script (only the RNN is
# trained above), so the original second subplot raised NameError.
# Guard it so the plot still renders when only the RNN was trained.
if 'hist_LSTM' in globals():
    plt.subplot(1, 2, 2)
    plt.plot(hist_LSTM.history['loss'], label='Train loss')
    plt.plot(hist_LSTM.history['val_loss'], label='Test/Validation/Prediction loss')
    plt.xlabel('Training steps (Epochs = 50)')
    plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
    plt.title('LSTM Loss on Train and Test data')
    plt.legend()

plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90,
                    hspace=0.37, wspace=0.28)
# plt.savefig('All_Losses_history_.png')
plt.show()
请注意 ,仅当我访问没有VGA的功能强大的服务器时,我才访问 CUDA 。我的目标是利用多处理和多线程来最大程度地利用CPU而不是30%的容量,这意味着只有四核才能拥有一个核!任何建议将不胜感激。我已经上传了格式化的csv数据集。
更新: 我的硬件配置如下:
训练一个模型不会占用您全部100%的CPU是一件好事!现在,我们有空间并行训练多个模型,并加快您的总体训练时间。
注意:如果您只是想加速此模型,请查看GPU或更改超参数,例如批大小和神经元数量(层大小)。
这是用于multiprocessing同时训练多个模型的方法(使用在计算机的每个单独CPU内核上并行运行的进程)。
multiprocessing
multiprocessing.Pool 会创建一个待完成的作业池。各个进程会拾取并运行这些作业;每当一个作业完成,该进程就会从池中领取下一个作业。
multiprocessing.Pool
import time import signal import multiprocessing def init_worker(): ''' Add KeyboardInterrupt exception to mutliprocessing workers ''' signal.signal(signal.SIGINT, signal.SIG_IGN) def train_model(layer_size): ''' This code is parallelised and runs on each process It trains a model with different layer sizes (hyperparameters) It saves the model and returns the score (error) ''' import keras from keras.models import Sequential from keras.layers import Dense print(f'Training a model with layer size {layer_size}') # build your model here model_RNN = Sequential() model_RNN.add(Dense(layer_size)) # fit the model (the bit that takes time!) model_RNN.fit(...) # lets demonstrate with a sleep timer time.sleep(5) # save trained model to a file model_RNN.save(...) # you can also return values eg. the eval score return model_RNN.evaluate(...) num_workers = 4 hyperparams = [800, 960, 1100] pool = multiprocessing.Pool(num_workers, init_worker) scores = pool.map(train_model, hyperparams) print(scores)
输出:
Training a model with layer size 800 Training a model with layer size 960 Training a model with layer size 1100 [{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]
代码中的 time.sleep 很容易演示这一点。您会看到所有 3 个进程都开始了训练作业,然后几乎同时完成。如果是单进程处理,则必须等待每个作业完成后才能开始下一个(打哈欠!)。
time.sleep
EDIT OP还需要完整的代码。这在Stack Overflow上很困难,因为我无法在您的环境和您的代码中进行测试。我已经自由地将代码复制并粘贴到上面的模板中。您可能需要添加一些导入,但这与您获得“可运行的”和“完整的”代码非常接近。
import time
import signal
import numpy as np
import pandas as pd
import multiprocessing

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    '''Ignore SIGINT in workers so KeyboardInterrupt is handled by the parent.'''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    Runs in a worker process: builds, trains and evaluates one model
    ('rnn' or 'lstm') and returns its train/test MSE.

    NOTE(review): trainX/trainY/testX/testY are read as module globals;
    with spawn-based start (Windows) they are re-created in each worker
    when the module is re-imported.
    '''
    # keras is imported inside the worker so each process gets its own session
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()
    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    # (BUG FIX: this comment was split mid-line in the original paste,
    # leaving "the eval score" as bare tokens — a syntax error.)
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------
# Raw string avoids backslash-escape surprises in the Windows path.
df = pd.read_csv(r"D:\Train.csv", header=None)

index = [i for i in range(1440) if i % 3 == 2]
Y_train = df[index]
df = df.values


# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------
# BUG FIX: the __main__ guard is required with the spawn start method
# (the default on Windows, which the D:\ path suggests) — without it the
# pool creation re-executes on import and spawns processes recursively.
if __name__ == '__main__':
    num_workers = 2
    model_types = ['rnn', 'lstm']

    pool = multiprocessing.Pool(num_workers, init_worker)
    try:
        scores = pool.map(train_model, model_types)
    finally:
        # release the worker processes (the original leaked the pool)
        pool.close()
        pool.join()

    print(scores)
程序输出:
[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]