The following code examples, extracted from open-source Python projects, illustrate how to use chainer.functions.huber_loss().
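For reference, F.huber_loss computes, element-wise, 0.5·d² when |d| ≤ delta and delta·(|d| − 0.5·delta) otherwise, where d = x − t, and by default sums the result along the second axis. A minimal sketch of a direct call, with toy values invented to hit both branches:

import numpy as np
import chainer.functions as F

# Toy predictions and targets (values invented for illustration).
x = np.array([[0.5, 2.0]], dtype=np.float32)
t = np.array([[0.0, 0.0]], dtype=np.float32)

# |0.5| <= 1 -> quadratic branch: 0.5 * 0.5**2 = 0.125
# |2.0| >  1 -> linear branch:    2.0 - 0.5    = 1.5
loss = F.huber_loss(x, t, delta=1.0)  # sums along axis 1 by default
print(loss.data)  # [1.625]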
def calc_loss(self, state, state_dash, actions, rewards, done_list):
    assert state.shape == state_dash.shape
    # Flatten each observation into a single feature vector
    s = state.reshape(
        (state.shape[0], reduce(lambda x, y: x * y, state.shape[1:]))).astype(np.float32)
    s_dash = state_dash.reshape(
        (state.shape[0], reduce(lambda x, y: x * y, state.shape[1:]))).astype(np.float32)

    q = self.model.q_function(s)
    q_dash = self.model_target.q_function(s_dash)  # Q(s',*)
    max_q_dash = np.asarray(list(map(np.max, q_dash.data)), dtype=np.float32)  # max_a Q(s',a)

    target = q.data.copy()
    for i in range(self.replay_batch_size):
        assert self.replay_batch_size == len(done_list)
        # Optionally clip rewards to their sign, as in the DQN paper
        r = np.sign(rewards[i]) if self.clipping else rewards[i]
        if done_list[i]:
            discounted_sum = r
        else:
            discounted_sum = r + self.gamma * max_q_dash[i]
        assert self.replay_batch_size == len(actions)
        target[i, actions[i]] = discounted_sum

    loss = F.sum(F.huber_loss(Variable(target), q, delta=1.0))  # / self.replay_batch_size
    return loss, q
def compute_value_loss(y, t, clip_delta=True, batch_accumulator='mean'):
    """Compute a loss for value prediction problem.

    Args:
        y (Variable or ndarray): Predicted values.
        t (Variable or ndarray): Target values.
        clip_delta (bool): Use the Huber loss function if set True.
        batch_accumulator (str): 'mean' or 'sum'. 'mean' will use the mean of
            the loss values in a batch. 'sum' will use the sum.
    Returns:
        (Variable) scalar loss
    """
    assert batch_accumulator in ('mean', 'sum')
    y = F.reshape(y, (-1, 1))
    t = F.reshape(t, (-1, 1))
    if clip_delta:
        loss_sum = F.sum(F.huber_loss(y, t, delta=1.0))
        if batch_accumulator == 'mean':
            loss = loss_sum / y.shape[0]
        elif batch_accumulator == 'sum':
            loss = loss_sum
    else:
        loss_mean = F.mean_squared_error(y, t) / 2
        if batch_accumulator == 'mean':
            loss = loss_mean
        elif batch_accumulator == 'sum':
            loss = loss_mean * y.shape[0]
    return loss
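A hypothetical call to the function above (the array values are invented, and chainer.functions is assumed to be imported as F, as in the source project):

import numpy as np

y = np.array([0.2, 1.5, -0.3], dtype=np.float32)  # invented predicted values
t = np.zeros(3, dtype=np.float32)                 # invented targets

mean_loss = compute_value_loss(y, t, clip_delta=True, batch_accumulator='mean')
sum_loss = compute_value_loss(y, t, clip_delta=True, batch_accumulator='sum')
print(mean_loss.data, sum_loss.data)  # 'sum' is batch-size times 'mean'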
def compute_weighted_value_loss(y, t, weights,
                                clip_delta=True, batch_accumulator='mean'):
    """Compute a loss for value prediction problem.

    Args:
        y (Variable or ndarray): Predicted values.
        t (Variable or ndarray): Target values.
        weights (ndarray): Weights for y, t.
        clip_delta (bool): Use the Huber loss function if set True.
        batch_accumulator (str): 'mean' will divide loss by batchsize
    Returns:
        (Variable) scalar loss
    """
    assert batch_accumulator in ('mean', 'sum')
    y = F.reshape(y, (-1, 1))
    t = F.reshape(t, (-1, 1))
    if clip_delta:
        losses = F.huber_loss(y, t, delta=1.0)
    else:
        losses = F.square(y - t) / 2
    losses = F.reshape(losses, (-1,))
    loss_sum = F.sum(losses * weights)
    if batch_accumulator == 'mean':
        loss = loss_sum / y.shape[0]
    elif batch_accumulator == 'sum':
        loss = loss_sum
    return loss
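The weighted variant pairs naturally with prioritized experience replay, where each transition carries an importance-sampling weight. A hedged usage sketch with invented values:

import numpy as np

y = np.array([0.2, 1.5, -0.3], dtype=np.float32)  # invented predictions
t = np.zeros(3, dtype=np.float32)                 # invented targets
w = np.array([1.0, 0.5, 2.0], dtype=np.float32)   # invented importance weights

loss = compute_weighted_value_loss(y, t, w, clip_delta=True,
                                   batch_accumulator='mean')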
def mean_clipped_loss(y, t):
    # Add an axis because F.huber_loss only accepts arrays with ndim >= 2
    y = F.expand_dims(y, axis=-1)
    t = F.expand_dims(t, axis=-1)
    return F.sum(F.huber_loss(y, t, 1.0)) / y.shape[0]
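To illustrate the ndim comment: a 1-D array gains a trailing axis so F.huber_loss can sum along axis 1, and dividing by y.shape[0] then yields a per-sample mean. A small sketch with invented values (F is assumed to be chainer.functions):

import numpy as np
import chainer.functions as F

y = np.array([0.0, 2.0], dtype=np.float32)  # 1-D predictions
t = np.array([0.5, 0.0], dtype=np.float32)  # 1-D targets
print(mean_clipped_loss(y, t).data)  # (0.125 + 1.5) / 2 = 0.8125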
def check_forward(self, x_data, t_data):
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    loss = functions.huber_loss(x, t, delta=1)
    self.assertEqual(loss.data.dtype, numpy.float32)
    loss_value = cuda.to_cpu(loss.data)

    diff_data = cuda.to_cpu(x_data) - cuda.to_cpu(t_data)
    # Reference implementation: quadratic inside |diff| < 1, linear outside
    expected_result = numpy.zeros(self.shape)
    mask = numpy.abs(diff_data) < 1
    expected_result[mask] = 0.5 * diff_data[mask] ** 2
    expected_result[~mask] = numpy.abs(diff_data[~mask]) - 0.5
    loss_expect = numpy.sum(expected_result, axis=1)
    gradient_check.assert_allclose(loss_value, loss_expect)
def calc_loss_recurrent(self, frames, actions, rewards, done_list, size_list):
    # TODO: self.max_step -> max_step
    s = Variable(frames.astype(np.float32))

    self.model_target.reset_state()   # Refresh model_target's state
    self.model_target.q_function(s[0])  # Update target model initial state

    target_q = self.xp.zeros((self.max_step, self.replay_batch_size), dtype=np.float32)
    selected_q_tuple = [None for _ in range(self.max_step)]

    for frame in range(0, self.max_step):
        q = self.model.q_function(s[frame])
        q_dash = self.model_target.q_function(s[frame + 1])  # Q(s',*): shape is (batch_size, action_num)
        max_q_dash = q_dash.data.max(axis=1)  # max_a Q(s',a): shape is (batch_size,)
        if self.clipping:
            rs = self.xp.sign(rewards[frame])
        else:
            rs = rewards[frame]
        target_q[frame] = rs + self.xp.logical_not(done_list[frame]).astype(np.int) * (self.gamma * max_q_dash)
        selected_q_tuple[frame] = F.select_item(q, actions[frame].astype(np.int))

    enable = self.xp.broadcast_to(self.xp.arange(self.max_step),
                                  (self.replay_batch_size, self.max_step))
    size_list = self.xp.expand_dims(cuda.to_gpu(size_list), -1)
    enable = (enable < size_list).T
    selected_q = F.concat(selected_q_tuple, axis=0)

    # Element-wise Huber loss
    huber_loss = F.huber_loss(
        F.expand_dims(F.flatten(target_q), axis=1),
        F.expand_dims(selected_q, axis=1),
        delta=1.0)
    huber_loss = F.reshape(huber_loss, enable.shape)

    zeros = self.xp.zeros(enable.shape, dtype=np.float32)
    loss = F.sum(F.where(enable, huber_loss, zeros))  # / self.replay_batch_size
    # print("loss", loss.data)
    return loss
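The masking at the end is the transferable idea: F.where with a boolean "enable" mask zeroes out loss entries from padded time steps beyond each episode's true length, so they contribute nothing to the gradient. A standalone sketch of the same trick, with invented shapes and values:

import numpy as np
import chainer.functions as F

max_step, batch = 3, 2
losses = np.arange(6, dtype=np.float32).reshape(max_step, batch)  # per-step losses
lengths = np.array([[2], [3]])  # valid steps per episode, shape (batch, 1)

# enable[t, b] is True while step t lies inside episode b
enable = (np.broadcast_to(np.arange(max_step), (batch, max_step)) < lengths).T
zeros = np.zeros_like(losses)
loss = F.sum(F.where(enable, losses, zeros))  # padded steps contribute 0
print(loss.data)  # 0 + 1 + 2 + 3 + 5 = 11.0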
def clear(self):
    self.loss = None
    # self.accuracy = None

# def forward(self, x, t):
#     self.clear()
#     #x = chainer.Variable(x_data)  # x_data.astype(np.float32)
#     #t = chainer.Variable(t_data)  # [Note]: x_data, t_data must be np.float32 type
#
#     #self.loss = F.huber_loss(h, t, delta=1 / 255.)
#     self.loss = F.mean_squared_error(self(x), t)
#     # self.accuracy = F.accuracy(h, t)  # type incompatible
#     return self.loss