Python chainer.functions 模块,huber_loss() 实例源码


项目:chainer_frmqn    作者:okdshin    | 项目源码 | 文件源码
def calc_loss(self, state, state_dash, actions, rewards, done_list):
        assert(state.shape == state_dash.shape)
        s = state.reshape((state.shape[0], reduce(lambda x, y: x*y, state.shape[1:]))).astype(np.float32)
        s_dash = state_dash.reshape((state.shape[0], reduce(lambda x, y: x*y, state.shape[1:]))).astype(np.float32)
        q = self.model.q_function(s)

        q_dash = self.model_target.q_function(s_dash)  # Q(s',*)
        max_q_dash = np.asarray(list(map(np.max,, dtype=np.float32) # max_a Q(s',a)

        target =
        for i in range(self.replay_batch_size):
            assert(self.replay_batch_size == len(done_list))
            r = np.sign(rewards[i]) if self.clipping else rewards[i]
            if done_list[i]:
                discounted_sum = r
                discounted_sum = r + self.gamma * max_q_dash[i]
            assert(self.replay_batch_size == len(actions))
            target[i, actions[i]] = discounted_sum

        loss = F.sum(F.huber_loss(Variable(target), q, delta=1.0)) #/ self.replay_batch_size
        return loss, q
项目:chainerrl    作者:chainer    | 项目源码 | 文件源码
def compute_value_loss(y, t, clip_delta=True, batch_accumulator='mean'):
    """Compute a loss for value prediction problem.

        y (Variable or ndarray): Predicted values.
        t (Variable or ndarray): Target values.
        clip_delta (bool): Use the Huber loss function if set True.
        batch_accumulator (str): 'mean' or 'sum'. 'mean' will use the mean of
            the loss values in a batch. 'sum' will use the sum.
        (Variable) scalar loss
    assert batch_accumulator in ('mean', 'sum')
    y = F.reshape(y, (-1, 1))
    t = F.reshape(t, (-1, 1))
    if clip_delta:
        loss_sum = F.sum(F.huber_loss(y, t, delta=1.0))
        if batch_accumulator == 'mean':
            loss = loss_sum / y.shape[0]
        elif batch_accumulator == 'sum':
            loss = loss_sum
        loss_mean = F.mean_squared_error(y, t) / 2
        if batch_accumulator == 'mean':
            loss = loss_mean
        elif batch_accumulator == 'sum':
            loss = loss_mean * y.shape[0]
    return loss
项目:chainerrl    作者:chainer    | 项目源码 | 文件源码
def compute_weighted_value_loss(y, t, weights,
                                clip_delta=True, batch_accumulator='mean'):
    """Compute a loss for value prediction problem.

        y (Variable or ndarray): Predicted values.
        t (Variable or ndarray): Target values.
        weights (ndarray): Weights for y, t.
        clip_delta (bool): Use the Huber loss function if set True.
        batch_accumulator (str): 'mean' will devide loss by batchsize
        (Variable) scalar loss
    assert batch_accumulator in ('mean', 'sum')
    y = F.reshape(y, (-1, 1))
    t = F.reshape(t, (-1, 1))
    if clip_delta:
        losses = F.huber_loss(y, t, delta=1.0)
        losses = F.square(y - t) / 2
    losses = F.reshape(losses, (-1,))
    loss_sum = F.sum(losses * weights)
    if batch_accumulator == 'mean':
        loss = loss_sum / y.shape[0]
    elif batch_accumulator == 'sum':
        loss = loss_sum
    return loss
项目:DeepLearning    作者:Wanwannodao    | 项目源码 | 文件源码
def mean_clipped_loss(y, t):
    # Add an axis because F.huber_loss only accepts arrays with ndim >= 2
    y = F.expand_dims(y, axis=-1)
    t = F.expand_dims(t, axis=-1)
    return F.sum(F.huber_loss(y, t, 1.0)) / y.shape[0]
项目:DeepLearning    作者:Wanwannodao    | 项目源码 | 文件源码
def mean_clipped_loss(y, t):
    # Add an axis because F.huber_loss only accepts arrays with ndim >= 2
    y = F.expand_dims(y, axis=-1)
    t = F.expand_dims(t, axis=-1)
    return F.sum(F.huber_loss(y, t, 1.0)) / y.shape[0]
项目:chainer-deconv    作者:germanRos    | 项目源码 | 文件源码
def check_forward(self, x_data, t_data):
        x = chainer.Variable(x_data)
        t = chainer.Variable(t_data)
        loss = functions.huber_loss(x, t, delta=1)
        self.assertEqual(, numpy.float32)
        loss_value = cuda.to_cpu(

        diff_data = cuda.to_cpu(x_data) - cuda.to_cpu(t_data)
        expected_result = numpy.zeros(self.shape)
        mask = numpy.abs(diff_data) < 1
        expected_result[mask] = 0.5 * diff_data[mask]**2
        expected_result[~mask] = numpy.abs(diff_data[~mask]) - 0.5
        loss_expect = numpy.sum(expected_result, axis=1)
        gradient_check.assert_allclose(loss_value, loss_expect)
项目:chainer_frmqn    作者:okdshin    | 项目源码 | 文件源码
def calc_loss_recurrent(self, frames, actions, rewards, done_list, size_list):
        # TODO self.max_step -> max_step
        s = Variable(frames.astype(np.float32))

        self.model_target.reset_state() # Refresh model_target's state
        self.model_target.q_function(s[0]) # Update target model initial state

        target_q = self.xp.zeros((self.max_step, self.replay_batch_size), dtype=np.float32)
        selected_q_tuple = [None for _ in range(self.max_step)]

        for frame in range(0, self.max_step):
            q = self.model.q_function(s[frame])
            q_dash = self.model_target.q_function(s[frame+1])  # Q(s',*): shape is (batch_size, action_num)
            max_q_dash = # max_a Q(s',a): shape is (batch_size,)
            if self.clipping:
                rs = self.xp.sign(rewards[frame])
                rs = rewards[frame]
            target_q[frame] = rs + self.xp.logical_not(done_list[frame]).astype(*(self.gamma*max_q_dash)
            selected_q_tuple[frame] = F.select_item(q, actions[frame].astype(

        enable = self.xp.broadcast_to(self.xp.arange(self.max_step), (self.replay_batch_size, self.max_step))
        size_list = self.xp.expand_dims(cuda.to_gpu(size_list), -1)
        enable = (enable < size_list).T

        selected_q = F.concat(selected_q_tuple, axis=0)

        # element-wise huber loss
        huber_loss = F.huber_loss(
                F.expand_dims(F.flatten(target_q), axis=1),
                F.expand_dims(selected_q, axis=1), delta=1.0)
        huber_loss = F.reshape(huber_loss, enable.shape)

        zeros = self.xp.zeros(enable.shape, dtype=np.float32)
        loss = F.sum(F.where(enable, huber_loss, zeros)) #/ self.replay_batch_size

        return loss
项目:SeRanet    作者:corochann    | 项目源码 | 文件源码
def clear(self):
        self.loss = None
        # self.accuracy = None

#    def forward(self, x, t):
#        self.clear()
#        #x = chainer.Variable(x_data)  # x_data.astype(np.float32)
#        #t = chainer.Variable(t_data)  # [Note]: x_data, t_data must be np.float32 type
#        #self.loss = F.huber_loss(h, t, delta= 1 / 255.)
#        self.loss = F.mean_squared_error(self(x), t)
#        # self.accuracy = F.accuracy(h, t)  # type inconpatible
#        return self.loss