小编典典

How can I take advantage of multiprocessing and multithreading in deep learning with Keras?

python

I assumed most frameworks (e.g. Keras / TensorFlow / ...) would automatically use all CPU cores, but in practice that does not seem to be the case. I could find only a few resources on using the full capacity of the CPU during the deep-learning process. I found an article that uses:

from multiprocessing import Pool 
import psutil
import ray

On the other hand, based on this answer about using a Keras model in multiple processes, there is no trace of the libraries mentioned above. Is there a more elegant way to take advantage of multiprocessing with Keras, given how popular it is for implementations?

  • For instance, how can I modify the simple RNN implementation below so that the learning process uses at least 50% of the CPU capacity?

  • Should I use a second model, like the LSTM, for multitasking? I mean, can we manage to run multiple models simultaneously by using more CPU capacity?

    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt

    from keras.layers.normalization import BatchNormalization
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.metrics import mean_squared_error
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from keras.layers import Dense
    from keras.layers import Dropout
    from keras.layers import LSTM, SimpleRNN, Activation
    from keras.models import Sequential
    from keras.optimizers import Adam, RMSprop
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau

    df = pd.read_csv(r"D:\Train.csv", header=None)

    index = [i for i in list(range(1440)) if i%3==2]

    Y_train= df[index]
    df = df.values

    # making history by using look-back to predict the next step

    def create_dataset(dataset, data_train, look_back=1):
        dataX, dataY = [], []
        print("Len:", len(dataset) - look_back - 1)
        for i in range(len(dataset) - look_back - 1):
            a = dataset[i:(i + look_back), :]
            dataX.append(a)
            dataY.append(data_train[i + look_back, :])
        return np.array(dataX), np.array(dataY)

    Y_train=np.array(Y_train)
    df=np.array(df)

    look_back = 10
    trainX,trainY = create_dataset(df,Y_train, look_back=look_back)
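The `create_dataset` windowing above can be sanity-checked with a standalone sketch on synthetic data (the array sizes here are made up, not the question's real 1440-column data):

```python
# Check the shapes produced by the look-back windowing on toy arrays.
import numpy as np

def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back - 1):
        dataX.append(dataset[i:(i + look_back), :])
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)

X = np.arange(40.0).reshape(10, 4)  # 10 timesteps, 4 features
Y = np.arange(20.0).reshape(10, 2)  # 10 timesteps, 2 targets

dataX, dataY = create_dataset(X, Y, look_back=3)
print(dataX.shape)  # (6, 3, 4): 10 - 3 - 1 windows, each 3 timesteps long
print(dataY.shape)  # (6, 2): the target row right after each window
```

Each sample is a window of `look_back` consecutive rows, labelled with the target row that immediately follows it, which is why the first dimension shrinks by `look_back + 1`.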

    # Split data into train & test

    trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)

    # Shape of train and test data
    print("train size: {}".format(trainX.shape))
    print("train Label size: {}".format(trainY.shape))
    print("test size: {}".format(testX.shape))
    print("test Label size: {}".format(testY.shape))

    train size: (23, 10, 1440)
    train Label size: (23, 960)
    test size: (6, 10, 1440)
    test Label size: (6, 960)

    model_RNN = Sequential()
    model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    model_RNN.add(Dense(960))
    model_RNN.add(BatchNormalization())
    model_RNN.add(Activation('tanh'))

    # Compile model

    model_RNN.compile(loss='mean_squared_error', optimizer='adam')
    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]

    # Fit the model

    hist_RNN = model_RNN.fit(trainX, trainY, epochs=50, batch_size=20, validation_data=(testX, testY), verbose=1, callbacks=callbacks)

    # predict

    Y_train = np.array(trainY)
    Y_test = np.array(testY)

    Y_RNN_Train_pred = model_RNN.predict(trainX)
    Y_RNN_Test_pred = model_RNN.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_RNN_Train_pred)
    test_MSE = mean_squared_error(testY, Y_RNN_Test_pred)

    # create and fit the simple LSTM model as a 2nd model for multitasking

    model_LSTM = Sequential()
    model_LSTM.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    model_LSTM.add(Dense(units=960))
    model_LSTM.add(BatchNormalization())
    model_LSTM.add(Activation('tanh'))
    model_LSTM.compile(loss='mean_squared_error', optimizer='adam')

    hist_LSTM = model_LSTM.fit(trainX, trainY, epochs=50, batch_size=20, validation_data=(testX, testY), verbose=1, callbacks=callbacks)

    Y_train = np.array(trainY)
    Y_test = np.array(testY)

    Y_LSTM_Train_pred = model_LSTM.predict(trainX)
    Y_LSTM_Test_pred = model_LSTM.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_LSTM_Train_pred)
    test_MSE = mean_squared_error(testY, Y_LSTM_Test_pred)

    # plot losses for RNN + LSTM

    f, ax = plt.subplots(figsize=(20, 15))
    plt.subplot(1, 2, 1)
    plt.plot(hist_RNN.history['loss'], label='Train loss')
    plt.plot(hist_RNN.history['val_loss'], label='Test/Validation/Prediction loss')
    plt.xlabel('Training steps (Epochs = 50)')
    plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
    plt.title('RNN Loss on Train and Test data')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(hist_LSTM.history['loss'], label='Train loss')
    plt.plot(hist_LSTM.history['val_loss'], label='Test/Validation/Prediction loss')
    plt.xlabel('Training steps (Epochs = 50)')
    plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
    plt.title('LSTM Loss on Train and Test data')
    plt.legend()

    plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
    #plt.savefig('All_Losses_history_.png')
    plt.show()
    

Please note that I only have access to CUDA when I can get onto a powerful server without a VGA. My goal is to take advantage of multiprocessing and multithreading to use the maximum capacity of the CPU instead of just 30%, which means a single core out of a quad-core! Any advice would be greatly appreciated. I have uploaded a formatted csv dataset.

Update: my hardware configuration is as follows:

  • CPU: AMD A8-7650K Radeon R7 10 Compute Cores 4C+6G 3.30 GHz
  • RAM: 16 GB
  • OS: Win 7
  • Python version 3.6.6
  • TensorFlow version 1.8.0
  • Keras version 2.2.4

2020-12-20

1 Answer

小编典典

It is a good thing that training one model doesn't use 100% of your CPU! Now we have room to train multiple models in parallel and speed up your overall training times.

Note: if you just want to speed up this one model, look into GPUs or changing hyperparameters like batch size and number of neurons (layer size).

Here is how you can use multiprocessing to train multiple models at the same time (using processes running in parallel on each separate CPU core of your machine).

multiprocessing.Pool basically creates a pool of jobs that need doing. Processes will pick up these jobs and run them. When a job is finished, the process will pick up another job from the pool.

import time
import signal
import multiprocessing

def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # lets demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values eg. the eval score
    return model_RNN.evaluate(...)


num_workers = 4
hyperparams = [800, 960, 1100]

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, hyperparams)

print(scores)

Output:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]

The time.sleep in the code makes this easy to see. You'll see that all 3 processes start the training job, and then they all finish at almost the same time. If this were single-processed, you'd have to wait for each one to finish before starting the next (yawn!).

EDIT: The OP also wanted full code. This is difficult on Stack Overflow because I can't test in your environment and with your code. I have taken the liberty of copying and pasting your code into my template above. You may need to add a few imports, but this is as close as you will get to "runnable" and "complete" code.

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

df = pd.read_csv(r"D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i % 3 == 2]

Y_train = df[index]
df = df.values

# making history by using look-back to predict the next step
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------

num_workers = 2
model_types = ['rnn', 'lstm']

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, model_types)

print(scores)

Program output:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
 {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
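One caveat given the Win 7 configuration in the question: Windows uses the 'spawn' start method, which re-imports the script in every worker process, so the module-level data loading and Pool creation above would run again in each child and fail. The setup should be wrapped in an entry-point guard, sketched here with a trivial stand-in for `train_model` rather than the real training function:

```python
# The multiprocessing entry-point guard required on Windows: pool creation
# only runs in the parent process, not in the re-imported worker copies.
import multiprocessing

def train_model(model_type):
    # stand-in for the real training function above
    return {'type': model_type}

def main():
    pool = multiprocessing.Pool(2)
    try:
        return pool.map(train_model, ['rnn', 'lstm'])
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':  # required on Windows ('spawn' start method)
    print(main())
```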