The following code examples, extracted from open-source Python projects, illustrate how to use chainer.functions.select_item().
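Before the project code, here is a minimal, self-contained sketch (not taken from any of the projects below; the array contents are made up for illustration) of what select_item computes: for an input x of shape (N, K) and integer indices t of shape (N,), it returns y with y[i] = x[i, t[i]].

import numpy as np
import chainer.functions as F

# A toy batch of Q-values: 3 states, 4 actions each.
q_values = np.array([[0.1, 0.5, 0.2, 0.9],
                     [1.0, 0.0, 0.3, 0.4],
                     [0.2, 0.8, 0.6, 0.1]], dtype=np.float32)

# One chosen action index per row; indices must be integer typed.
actions = np.array([3, 0, 1], dtype=np.int32)

# y[i] = q_values[i, actions[i]]  ->  [0.9, 1.0, 0.8]
y = F.select_item(q_values, actions)
print(y.data)  # equivalent to q_values[np.arange(3), actions]

The reinforcement-learning examples below use this same pattern, with the network output Q(s) as x and the replayed action indices as t.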
def meanQvalue(Q, samples):
    xp = Q.xp
    s = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    a = np.asarray([sample[1] for sample in samples], dtype=np.int32)
    for i in xrange(minibatch_size):
        s[i] = samples[i][0]
    # to gpu if available
    s = xp.asarray(s)
    a = xp.asarray(a)
    # Prediction: Q(s,a)
    y = F.select_item(Q(s), a)
    mean_Q = (F.sum(y)/minibatch_size).data
    return mean_Q
def max(self):
    with chainer.force_backprop_mode():
        return F.select_item(self.q_values, self.greedy_actions)
def evaluate_actions(self, actions):
    return F.select_item(self.q_values, actions)
def prob(self, x):
    return F.select_item(self.all_prob, x)
def log_prob(self, x):
    return F.select_item(self.all_log_prob, x)
def sampled_actions_log_probs(self):
    return F.select_item(
        self.log_probs,
        chainer.Variable(np.asarray(self.action_indices, dtype=np.int32)))
def update(Q, target_Q, opt, samples, gamma=0.99, target_type='double_dqn'):
    xp = Q.xp
    s = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    a = np.asarray([sample[1] for sample in samples], dtype=np.int32)
    r = np.asarray([sample[2] for sample in samples], dtype=np.float32)
    done = np.asarray([sample[3] for sample in samples], dtype=np.float32)
    s_next = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    for i in xrange(minibatch_size):
        s[i] = samples[i][0]
        s_next[i] = samples[i][4]
    # to gpu if available
    s = xp.asarray(s)
    a = xp.asarray(a)
    r = xp.asarray(r)
    done = xp.asarray(done)
    s_next = xp.asarray(s_next)
    # Prediction: Q(s,a)
    y = F.select_item(Q(s), a)
    # Target: r + gamma * max Q_b (s',b)
    with chainer.no_backprop_mode():
        if target_type == 'dqn':
            t = r + gamma * (1 - done) * F.max(target_Q(s_next), axis=1)
        elif target_type == 'double_dqn':
            t = r + gamma * (1 - done) * F.select_item(
                target_Q(s_next), F.argmax(Q(s_next), axis=1))
        else:
            raise ValueError('Unsupported target_type: {}'.format(target_type))
    loss = mean_clipped_loss(y, t)
    Q.cleargrads()
    loss.backward()
    opt.update()
def update(Q, target_Q, opt, samples, gamma=0.99, target_type='double_dqn'):
    xp = Q.xp
    s = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    a = np.asarray([sample[1] for sample in samples], dtype=np.int32)
    r = np.asarray([sample[2] for sample in samples], dtype=np.float32)
    done = np.asarray([sample[3] for sample in samples], dtype=np.float32)
    s_next = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    for i in xrange(minibatch_size):
        s[i] = samples[i][0]
        s_next[i] = samples[i][4]
    # to gpu if available
    s = xp.asarray(s)
    a = xp.asarray(a)
    r = xp.asarray(r)
    done = xp.asarray(done)
    s_next = xp.asarray(s_next)
    # Prediction: Q(s,a)
    y = F.select_item(Q(s), a)
    f0 = Q.conv1.data
    print f0.shape
    # Target: r + gamma * max Q_b (s',b)
    with chainer.no_backprop_mode():
        if target_type == 'dqn':
            t = r + gamma * (1 - done) * F.max(target_Q(s_next), axis=1)
        elif target_type == 'double_dqn':
            t = r + gamma * (1 - done) * F.select_item(
                target_Q(s_next), F.argmax(Q(s_next), axis=1))
        else:
            raise ValueError('Unsupported target_type: {}'.format(target_type))
    loss = mean_clipped_loss(y, t)
    Q.cleargrads()
    loss.backward()
    opt.update()
def check_forward(self, x_data, t_data):
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    y = functions.select_item(x, t)
    y_exp = cuda.to_cpu(x_data)[range(t_data.size), cuda.to_cpu(t_data)]
    self.assertEqual(y.data.dtype, self.dtype)
    numpy.testing.assert_equal(cuda.to_cpu(y.data), y_exp)
def check_value_check(self, x_data, t_data):
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    if self.valid:
        # Check if it throws nothing
        functions.select_item(x, t)
    else:
        with self.assertRaises(ValueError):
            functions.select_item(x, t)
def calc_loss_recurrent(self, frames, actions, rewards, done_list, size_list):
    # TODO self.max_step -> max_step
    s = Variable(frames.astype(np.float32))
    self.model_target.reset_state()  # Refresh model_target's state
    self.model_target.q_function(s[0])  # Update target model initial state
    target_q = self.xp.zeros((self.max_step, self.replay_batch_size), dtype=np.float32)
    selected_q_tuple = [None for _ in range(self.max_step)]
    for frame in range(0, self.max_step):
        q = self.model.q_function(s[frame])
        q_dash = self.model_target.q_function(s[frame+1])  # Q(s',*): shape is (batch_size, action_num)
        max_q_dash = q_dash.data.max(axis=1)  # max_a Q(s',a): shape is (batch_size,)
        if self.clipping:
            rs = self.xp.sign(rewards[frame])
        else:
            rs = rewards[frame]
        target_q[frame] = rs + self.xp.logical_not(done_list[frame]).astype(np.int)*(self.gamma*max_q_dash)
        selected_q_tuple[frame] = F.select_item(q, actions[frame].astype(np.int))
    enable = self.xp.broadcast_to(self.xp.arange(self.max_step), (self.replay_batch_size, self.max_step))
    size_list = self.xp.expand_dims(cuda.to_gpu(size_list), -1)
    enable = (enable < size_list).T
    selected_q = F.concat(selected_q_tuple, axis=0)
    # element-wise huber loss
    huber_loss = F.huber_loss(
        F.expand_dims(F.flatten(target_q), axis=1),
        F.expand_dims(selected_q, axis=1),
        delta=1.0)
    huber_loss = F.reshape(huber_loss, enable.shape)
    zeros = self.xp.zeros(enable.shape, dtype=np.float32)
    loss = F.sum(F.where(enable, huber_loss, zeros))  # / self.replay_batch_size
    # print("loss", loss.data)
    return loss