Python chainer module: testing() code examples
We collected the following 50 code examples from open-source Python projects to illustrate how to use the chainer.testing module.
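Most of the examples below boil down to a handful of array assertions: chainer.testing.assert_allclose for approximate equality (it accepts NumPy and CuPy arrays alike), plus the numpy.testing / cupy.testing counterparts for exact checks. A minimal, self-contained sketch of the pattern (the arrays and tolerances here are illustrative only, not required by the API):

import numpy as np
from chainer import testing

# Illustrative data only: pretend `actual` came from a GPU kernel with
# a little rounding noise relative to the reference value.
expected = np.array([0.3, 0.7], dtype=np.float32)
actual = expected + 1e-6

# Passes if |actual - expected| <= atol + rtol * |expected| element-wise;
# raises AssertionError with a diff summary otherwise.
testing.assert_allclose(actual, expected, atol=1e-5, rtol=1e-4)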
def _test(self, gpu):
    if gpu >= 0:
        chainer.cuda.get_device(gpu).use()
        xp = chainer.cuda.cupy
    else:
        xp = np
    batch_probs = xp.asarray([[0.3, 0.7],
                              [0.8, 0.2],
                              [0.5, 0.5],
                              [0.0, 1.0],
                              [1.0, 0.0]], dtype=np.float32)
    counter = np.zeros(batch_probs.shape, dtype=batch_probs.dtype)
    for _ in range(1000):
        batch_indices = chainer.cuda.to_cpu(
            distribution.sample_discrete_actions(batch_probs))
        for i in range(batch_probs.shape[0]):
            counter[i][batch_indices[i]] += 1
    np.testing.assert_allclose(
        counter / 1000, chainer.cuda.to_cpu(batch_probs), atol=0.05)
def test_copyparams(self):
    self.link.x.grad.fill(0)
    self.link.y.grad.fill(1)
    gx = self.link.x.grad.copy()
    gy = self.link.y.grad.copy()

    l = chainer.Link(x=(2, 3), y=2)
    l.x.data.fill(2)
    l.x.grad.fill(3)
    l.y.data.fill(4)
    l.y.grad.fill(5)

    self.link.copyparams(l)
    numpy.testing.assert_array_equal(self.link.x.data, l.x.data)
    numpy.testing.assert_array_equal(self.link.x.grad, gx)
    numpy.testing.assert_array_equal(self.link.y.data, l.y.data)
    numpy.testing.assert_array_equal(self.link.y.grad, gy)
def test_addgrads(self):
    l1 = chainer.Link(x=(2, 3))
    l2 = chainer.Link(x=2)
    l3 = chainer.Link(x=3)
    c1 = chainer.Chain(l1=l1, l2=l2)
    c2 = chainer.Chain(c1=c1, l3=l3)
    l1.x.grad.fill(1)
    l2.x.grad.fill(2)
    l3.x.grad.fill(3)
    self.l1.x.grad.fill(-1)
    self.l2.x.grad.fill(-2)
    self.l3.x.grad.fill(-3)

    self.c2.addgrads(c2)
    # Each gradient pair (-k) + (+k) accumulates to zero.
    numpy.testing.assert_array_equal(self.l1.x.grad, numpy.zeros((2, 3)))
    numpy.testing.assert_array_equal(self.l2.x.grad, numpy.zeros(2))
    numpy.testing.assert_array_equal(self.l3.x.grad, numpy.zeros(3))
def test_addgrads(self):
    l1 = chainer.Link(x=(2, 3))
    l2 = chainer.Link(x=2)
    l3 = chainer.Link(x=3)
    c1 = chainer.ChainList(l1, l2)
    c2 = chainer.ChainList(c1, l3)
    l1.x.grad.fill(1)
    l2.x.grad.fill(2)
    l3.x.grad.fill(3)
    self.l1.x.grad.fill(-1)
    self.l2.x.grad.fill(-2)
    self.l3.x.grad.fill(-3)

    self.c2.addgrads(c2)
    numpy.testing.assert_array_equal(self.l1.x.grad, numpy.zeros((2, 3)))
    numpy.testing.assert_array_equal(self.l2.x.grad, numpy.zeros(2))
    numpy.testing.assert_array_equal(self.l3.x.grad, numpy.zeros(3))
def check_forward(self, f_data, f_p_data, l2_reg):
    xp = cuda.get_array_module(f_data)
    num_examples = len(f_data)

    f = chainer.Variable(f_data)
    f_p = chainer.Variable(f_p_data)
    loss = n_pair_mc_loss(f, f_p, l2_reg)

    loss_for_each = []
    for i in range(num_examples):
        exps = []
        for j in set(range(num_examples)) - {i}:
            exp_ij = xp.exp(f_data[i].dot(f_p_data[j]) -
                            f_data[i].dot(f_p_data[i]))
            exps.append(exp_ij)
        loss_i = xp.log(1.0 + sum(exps))
        loss_for_each.append(loss_i)
    loss_expected = xp.asarray(loss_for_each, dtype=np.float32).mean()

    testing.assert_allclose(loss.data, loss_expected, atol=1e-2)
def check_forward(self, x_data, c_data, gamma, T, y_star, y_pam):
    num_examples = len(x_data)
    x = chainer.Variable(x_data)
    c = chainer.Variable(c_data)
    loss = clustering_loss(x, c, gamma, T)

    sq_distances_ij = []
    for i, j in zip(range(num_examples), y_pam):
        sqd_ij = np.sum((x_data[i] - x_data[j]) ** 2)
        sq_distances_ij.append(sqd_ij)
    f = -sum(sq_distances_ij)

    sq_distances_ij = []
    for i, j in zip(range(num_examples), y_star):
        sqd_ij = np.sum((x_data[i] - x_data[j]) ** 2)
        sq_distances_ij.append(sqd_ij)
    f_tilde = -sum(sq_distances_ij)

    delta = 1.0 - normalized_mutual_info_score(cuda.to_cpu(c_data), y_pam)
    loss_expected = f + gamma * delta - f_tilde

    testing.assert_allclose(loss.data, loss_expected)
def check_call(self, x, expects):
    outs = self.link(x)
    if isinstance(self.pick, tuple):
        pick = self.pick
    else:
        if self.pick is None:
            pick = ('l2',)
        else:
            pick = (self.pick,)
        outs = (outs,)

    self.assertEqual(len(outs), len(pick))
    for out, layer_name in zip(outs, pick):
        self.assertIsInstance(out, chainer.Variable)
        self.assertIsInstance(out.array, self.link.xp.ndarray)
        out = to_cpu(out.array)
        np.testing.assert_equal(out, to_cpu(expects[layer_name].array))
def check_forward(self, x_data):
    x = chainer.Variable(x_data)

    # Make the batch normalization the identity function.
    self.l.bn.avg_var[:] = 1
    self.l.bn.avg_mean[:] = 0
    with chainer.using_config('train', False):
        y = self.l(x)

    self.assertIsInstance(y, chainer.Variable)
    self.assertIsInstance(y.array, self.l.xp.ndarray)

    if self.activ == 'relu':
        np.testing.assert_almost_equal(
            cuda.to_cpu(y.array), np.maximum(cuda.to_cpu(x_data), 0),
            decimal=4)
    elif self.activ == 'add_one':
        np.testing.assert_almost_equal(
            cuda.to_cpu(y.array), cuda.to_cpu(x_data) + 1,
            decimal=4)
def _check(self, xp):
    self.assertIsInstance(self.link, chainer.Link)
    self.assertEqual(self.link.xp, xp)

    outputs = self.link('ignored', -1, 'inputs', 1.0)
    if isinstance(self.outputs, tuple):
        originals = self.outputs
        outputs = outputs
    else:
        originals = self.outputs,
        outputs = outputs,

    self.assertEqual(len(originals), len(outputs))
    for orig, out in zip(originals, outputs):
        self.assertIsInstance(out, chainer.Variable)
        self.assertEqual(out.shape, orig.shape)
        self.assertEqual(out.dtype, orig.dtype)
        self.assertEqual(
            chainer.cuda.get_array_module(out.array), xp)
        out.to_cpu()
        np.testing.assert_equal(out.array, orig)
def _test(self, comm, model):
    rank = comm.rank

    model.bn1.avg_mean.fill(rank * 1)
    model.bn2.avg_mean.fill(rank * 2)
    model.bn1.avg_var.fill(rank * 3)
    model.bn2.avg_var.fill(rank * 4)

    allreduce_persistent = \
        chainermn.extensions.AllreducePersistent(model, comm)
    allreduce_persistent()

    avg_rank = (comm.size - 1) / 2.0
    chainer.testing.assert_allclose(model.bn1.avg_mean, avg_rank * 1)
    chainer.testing.assert_allclose(model.bn2.avg_mean, avg_rank * 2)
    chainer.testing.assert_allclose(model.bn1.avg_var, avg_rank * 3)
    chainer.testing.assert_allclose(model.bn2.avg_var, avg_rank * 4)
def check_forward(self, delegate_data, x_data):
    delegate_variable = chainer.Variable(delegate_data)
    x = tuple([chainer.Variable(data) for data in x_data])
    y = chainermn.functions.pseudo_connect(delegate_variable, *x)

    if isinstance(y, tuple):
        for _y in y:
            self.assertEqual(_y.data.dtype, self.dtype)
        for _x, _y in zip(self.x, y):
            y_expect = _x.copy()
            testing.assert_allclose(y_expect, _y.data)
    else:
        self.assertEqual(y.data.dtype, self.dtype)
        y_expect = self.x[0].copy()
        testing.assert_allclose(y_expect, y.data)
def check_allreduce_grad(communicator, model):
    # Repeat twice to catch regressions in the lazy initialization of
    # sub-communicators.
    for _ in range(2):
        model.a.W.grad[:] = communicator.rank
        model.b.W.grad[:] = communicator.rank + 1
        model.c.b.grad[:] = communicator.rank + 2

        communicator.allreduce_grad(model)
        base = (communicator.size - 1.0) / 2

        chainer.testing.assert_allclose(model.a.W.grad,
                                        (base + 0) * np.ones((3, 2)))
        chainer.testing.assert_allclose(model.b.W.grad,
                                        (base + 1) * np.ones((4, 3)))
        chainer.testing.assert_allclose(model.c.b.grad,
                                        (base + 2) * np.ones((5, )))
def test_forward_consistency(self, nobias=False):
    x_cpu = chainer.Variable(self.x)
    W_cpu = chainer.Variable(self.W)
    b_cpu = None if nobias else chainer.Variable(self.b)
    func_cpu = graph_convolution.GraphConvolutionFunction(self.L, self.K)
    func_cpu.to_cpu()
    args_cpu = (x_cpu, W_cpu)
    if b_cpu is not None:
        args_cpu += (b_cpu, )
    y_cpu = func_cpu(*args_cpu)

    x_gpu = chainer.Variable(cuda.to_gpu(self.x))
    W_gpu = chainer.Variable(cuda.to_gpu(self.W))
    b_gpu = None if nobias else chainer.Variable(cuda.to_gpu(self.b))
    func_gpu = graph_convolution.GraphConvolutionFunction(self.L, self.K)
    func_gpu.to_gpu()
    args_gpu = (x_gpu, W_gpu)
    if b_gpu is not None:
        args_gpu += (b_gpu, )
    y_gpu = func_gpu(*args_gpu)

    testing.assert_allclose(
        y_cpu.data, y_gpu.data.get(), **self.check_forward_options)
def check_pickling(self, x_data):
    x = chainer.Variable(x_data)
    y = self.link(x)
    y_data1 = y.data

    del x, y

    pickled = pickle.dumps(self.link, -1)
    del self.link
    self.link = pickle.loads(pickled)

    x = chainer.Variable(x_data)
    y = self.link(x)
    y_data2 = y.data

    testing.assert_allclose(y_data1, y_data2, atol=0, rtol=0)
def test_prob(self):
    batch_ps = []
    for a in range(self.n):
        indices = np.asarray([a] * self.batch_size, dtype=np.int32)
        batch_p = self.distrib.prob(indices)
        self.assertTrue(isinstance(batch_p, chainer.Variable))
        for b in range(self.batch_size):
            p = batch_p.data[b]
            self.assertGreaterEqual(p, self.min_prob)
            self.assertLessEqual(p, 1)
        batch_ps.append(batch_p.data)
    np.testing.assert_almost_equal(sum(batch_ps), np.ones(self.batch_size))
def test_log_prob(self):
    for a in range(self.n):
        indices = np.asarray([a] * self.batch_size, dtype=np.int32)
        batch_p = self.distrib.prob(indices)
        batch_log_p = self.distrib.log_prob(indices)
        np.testing.assert_almost_equal(np.log(batch_p.data),
                                       batch_log_p.data)
def test_self_kl(self):
    kl = self.distrib.kl(self.distrib)
    for b in range(self.batch_size):
        np.testing.assert_allclose(
            kl.data[b], np.zeros_like(kl.data[b]), rtol=1e-5)
def test_prob(self):
    batch_ps = []
    for a in range(self.n):
        indices = np.asarray([a] * self.batch_size, dtype=np.int32)
        batch_p = self.distrib.prob(indices)
        self.assertTrue(isinstance(batch_p, chainer.Variable))
        for b in range(self.batch_size):
            p = batch_p.data[b]
            self.assertGreaterEqual(p, 0)
            self.assertLessEqual(p, 1)
        batch_ps.append(batch_p.data)
    np.testing.assert_almost_equal(sum(batch_ps), np.ones(self.batch_size))
def test_self_kl(self):
    kl = self.distrib.kl(self.distrib)
    for b in range(self.batch_size):
        np.testing.assert_allclose(
            kl.data[b], np.zeros_like(kl.data[b]), rtol=1e-5)
def test_most_probable(self):
    most_probable = self.distrib.most_probable
    self.assertTrue(isinstance(most_probable, chainer.Variable))
    self.assertEqual(most_probable.shape, (self.batch_size, self.ndim))
    np.testing.assert_allclose(most_probable.data, self.mean, rtol=1e-5)
def test_prob(self):
    sample = self.distrib.sample()
    sample_prob = self.distrib.prob(sample)
    for b in range(self.batch_size):
        cov = (np.identity(self.ndim, dtype=np.float32) *
               self.var[b])
        desired_pdf = scipy.stats.multivariate_normal(
            self.mean[b], cov).pdf(sample.data[b])
        np.testing.assert_allclose(
            sample_prob.data[b], desired_pdf, rtol=1e-5)
def test_log_prob(self):
    sample = self.distrib.sample()
    sample_log_prob = self.distrib.log_prob(sample)
    for b in range(self.batch_size):
        cov = (np.identity(self.ndim, dtype=np.float32) *
               self.var[b])
        desired_pdf = scipy.stats.multivariate_normal(
            self.mean[b], cov).pdf(sample.data[b])
        np.testing.assert_allclose(
            sample_log_prob.data[b], np.log(desired_pdf), rtol=1e-4)
def test_entropy(self):
    entropy = self.distrib.entropy
    for b in range(self.batch_size):
        cov = (np.identity(self.ndim, dtype=np.float32) *
               self.var[b])
        desired_entropy = scipy.stats.multivariate_normal(
            self.mean[b], cov).entropy()
        np.testing.assert_allclose(
            entropy.data[b], desired_entropy, rtol=1e-5)
def test_kl(self):
    # Compare it to chainer.functions.gaussian_kl_divergence
    standard = distribution.GaussianDistribution(
        mean=np.zeros((self.batch_size, self.ndim), dtype=np.float32),
        var=np.ones((self.batch_size, self.ndim), dtype=np.float32))
    kl = self.distrib.kl(standard)
    chainer_kl = chainer.functions.gaussian_kl_divergence(
        self.distrib.mean, self.distrib.ln_var)
    np.testing.assert_allclose(kl.data.sum(),
                               chainer_kl.data,
                               rtol=1e-5)
def test_sample(self):
    sample = self.distrib.sample()
    self.assertTrue(isinstance(sample, chainer.Variable))
    self.assertEqual(sample.shape, (self.batch_size, self.ndim))
    np.testing.assert_allclose(sample.data, self.x, rtol=1e-5)
def test_most_probable(self):
    most_probable = self.distrib.most_probable
    self.assertTrue(isinstance(most_probable, chainer.Variable))
    self.assertEqual(most_probable.shape, (self.batch_size, self.ndim))
    np.testing.assert_allclose(most_probable.data, self.x, rtol=1e-5)
def check_forward(self, x_data, t_data, class_weight, use_cudnn=True):
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    loss = softmax_cross_entropy.softmax_cross_entropy(
        x, t, use_cudnn=use_cudnn, normalize=self.normalize,
        cache_score=self.cache_score, class_weight=class_weight)
    self.assertEqual(loss.data.shape, ())
    self.assertEqual(loss.data.dtype, self.dtype)
    self.assertEqual(hasattr(loss.creator, 'y'), self.cache_score)
    loss_value = float(cuda.to_cpu(loss.data))

    # Compute expected value
    loss_expect = 0.0
    count = 0
    x = numpy.rollaxis(self.x, 1, self.x.ndim).reshape(
        (self.t.size, self.x.shape[1]))
    t = self.t.ravel()
    for xi, ti in six.moves.zip(x, t):
        if ti == -1:
            continue
        log_z = numpy.ufunc.reduce(numpy.logaddexp, xi)
        if class_weight is None:
            loss_expect -= (xi - log_z)[ti]
        else:
            loss_expect -= (xi - log_z)[ti] * class_weight[ti]
        count += 1

    if self.normalize:
        if count == 0:
            loss_expect = 0.0
        else:
            loss_expect /= count
    else:
        loss_expect /= len(t_data)

    testing.assert_allclose(
        loss_expect, loss_value, **self.check_forward_options)
def check_forward(self, x, use_cudnn='always'):
    with chainer.using_config('use_cudnn', use_cudnn):
        y = normalize_layer(x, eps=self.eps)
    self.assertEqual(y.data.dtype, self.dtype)
    y_expect = _normalize_layer(self.x, self.eps).data
    testing.assert_allclose(y_expect, y.data, **self.check_forward_options)
def test_cupy_array(self):
    x = cuda.to_gpu(self.x)
    if not self.c_contiguous:
        x = cuda.cupy.asfortranarray(x)
    y = cuda.to_cpu(x)
    self.assertIsInstance(y, numpy.ndarray)
    numpy.testing.assert_array_equal(self.x, y)
def test_cupy_array2(self):
    with cuda.Device(0):
        x = cuda.to_gpu(self.x)
        if not self.c_contiguous:
            x = cuda.cupy.asfortranarray(x)
    with cuda.Device(1):
        y = cuda.to_cpu(x)
    self.assertIsInstance(y, numpy.ndarray)
    numpy.testing.assert_array_equal(self.x, y)
def check_call(self, x_data):
    x = chainer.Variable(x_data)
    actual = self.mlp(x)
    act = functions.sigmoid
    expect = self.mlp[2](act(self.mlp[1](act(self.mlp[0](x)))))
    numpy.testing.assert_array_equal(
        cuda.to_cpu(expect.data), cuda.to_cpu(actual.data))
def _check_setter(self, fs, gpu, attribute):
    expect = getattr(fs, attribute)
    setattr(fs, attribute, expect)
    actual = getattr(fs, attribute)

    if gpu:
        expect = tuple(p.get() for p in expect)
        actual = tuple(p.get() for p in actual)

    self.assertEqual(len(expect), len(actual))
    for e, a in zip(expect, actual):
        np.testing.assert_array_equal(e, a)
def check_forward(self, src_id, dst_id):
    x_data = _to_gpu(self.x_data, src_id)
    x = chainer.Variable(x_data)
    y = functions.copy(x, dst_id)
    self.assertEqual(self.x_data.dtype, self.dtype)
    numpy.testing.assert_array_equal(self.x_data, cuda.to_cpu(y.data))
def check_forward(self, x_data, t_data):
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    y = functions.select_item(x, t)
    y_exp = cuda.to_cpu(x_data)[range(t_data.size), cuda.to_cpu(t_data)]
    self.assertEqual(y.data.dtype, self.dtype)
    numpy.testing.assert_equal(cuda.to_cpu(y.data), y_exp)
def check_forward(self, x_data):
    x = chainer.Variable(x_data)
    y = functions.get_item(x, self.slices)
    # numpy.float (an alias of the builtin float, i.e. float64) was removed
    # in NumPy 1.24; numpy.float64 is the equivalent spelling.
    self.assertEqual(y.data.dtype, numpy.float64)
    numpy.testing.assert_equal(cuda.to_cpu(x_data)[self.slices],
                               cuda.to_cpu(y.data))
def check_forward(self, x_data):
    x = chainer.Variable(x_data)
    y = functions.expand_dims(x, self.axis)
    self.assertEqual(y.data.shape, self.out_shape)
    y_expect = numpy.expand_dims(cuda.to_cpu(x_data), self.axis)
    self.assertEqual(y.data.dtype, self.dtype)
    numpy.testing.assert_array_equal(cuda.to_cpu(y.data), y_expect)
def check_get_item(self, gpu):
    x_data = self.x
    if gpu:
        x_data = cuda.to_gpu(x_data)
    x = chainer.Variable(x_data)

    slices = slice(2, 5)
    np.testing.assert_equal(cuda.to_cpu(x[slices].data),
                            cuda.to_cpu(x_data[slices]))

    # The trailing comma makes this a one-element tuple of slices.
    slices = slice(2, 5),
    np.testing.assert_equal(cuda.to_cpu(x[slices].data),
                            cuda.to_cpu(x_data[slices]))
def test_to_cpu_from_cpu(self):
    a = chainer.Variable(np.zeros(3, dtype=np.float32))
    a.grad = np.ones_like(a.data)
    b = a.data
    gb = a.grad
    c = b.copy()
    gc = gb.copy()

    a.to_cpu()
    self.assertIs(a.data, b)
    self.assertIs(a.grad, gb)
    np.testing.assert_array_equal(a.data, c)
    np.testing.assert_array_equal(a.grad, gc)
def test_to_gpu_from_gpu(self):
    cp = cuda.cupy
    a = chainer.Variable(cp.zeros(3, dtype=np.float32))
    a.grad = cuda.cupy.ones_like(a.data)
    b = a.data
    gb = a.grad
    c = b.copy()
    gc = gb.copy()

    a.to_gpu()
    self.assertIs(a.data, b)
    self.assertIs(a.grad, gb)
    cp.testing.assert_array_equal(a.data, c)
    cp.testing.assert_array_equal(a.grad, gc)
def test_to_gpu(self):
    cp = cuda.cupy
    a = chainer.Variable(np.zeros(3, dtype=np.float32))
    a.grad = np.ones(3, dtype=np.float32)
    a.to_gpu()
    cp.testing.assert_array_equal(a.data, cp.zeros(3, dtype=np.float32))
    cp.testing.assert_array_equal(a.grad, cp.ones(3, dtype=np.float32))
def test_to_gpu_from_another_gpu(self):
    cp = cuda.cupy
    a = chainer.Variable(cp.zeros(3, dtype=np.float32))
    a.grad = cuda.cupy.ones_like(a.data)
    b = a.data.copy()
    gb = a.grad.copy()

    a.to_gpu(1)
    self.assertEqual(int(cuda.get_device(a.data)), 1)
    self.assertEqual(int(cuda.get_device(a.grad)), 1)
    cp.testing.assert_array_equal(a.data, b)
    cp.testing.assert_array_equal(a.grad, gb)
def check_zerograd(self, a_data, fill=False):
    xp = cuda.get_array_module(a_data)
    a = chainer.Variable(a_data)
    if fill:
        a.grad = xp.full_like(a_data, np.nan)

    a.zerograd()
    self.assertIsNot(a.grad, None)
    g_expect = xp.zeros_like(a.data)
    xp.testing.assert_array_equal(a.grad, g_expect)
def test_zerograds_multi_gpu(self):
    cupy = cuda.cupy
    with cuda.get_device(1):
        a = chainer.Variable(cupy.empty(3, dtype=np.float32))
    a.zerograd()
    self.assertIsNot(a.grad, None)
    self.assertEqual(int(a.grad.device), 1)
    with cuda.get_device(1):
        g_expect = cupy.zeros_like(a.data)
        cupy.testing.assert_array_equal(a.grad, g_expect)
def check_copydata(self, data1, data2, expect):
    xp = cuda.get_array_module(data1)
    v = chainer.Variable(data1)
    w = chainer.Variable(data2)
    v.copydata(w)
    xp.testing.assert_array_equal(v.data, expect)
def check_addgrad(self, src, dst, expect):
    xp = cuda.get_array_module(dst)
    a = chainer.Variable(src)
    a.grad = src
    b = chainer.Variable(dst)
    b.grad = dst
    b.addgrad(a)
    xp.testing.assert_array_equal(b.grad, expect)
def test_zerograds(self):
    gx_expect = numpy.zeros_like(self.link.x.data)
    gy_expect = numpy.zeros_like(self.link.y.data)
    self.link.zerograds()
    numpy.testing.assert_array_equal(self.link.x.grad, gx_expect)
    numpy.testing.assert_array_equal(self.link.y.grad, gy_expect)
def test_addgrads(self):
    l = chainer.Link(x=(2, 3), y=2)
    l.x.grad.fill(1)
    l.y.grad.fill(2)
    self.link.x.grad.fill(-1)
    self.link.y.grad.fill(-2)

    self.link.addgrads(l)

    gx_expect = numpy.zeros_like(l.x.grad)
    gy_expect = numpy.zeros_like(l.y.grad)
    numpy.testing.assert_array_equal(self.link.x.grad, gx_expect)
    numpy.testing.assert_array_equal(self.link.y.grad, gy_expect)
def test_zerograds(self):
    self.c2.zerograds()
    numpy.testing.assert_array_equal(self.l1.x.grad, numpy.zeros((2, 3)))
    numpy.testing.assert_array_equal(self.l2.x.grad, numpy.zeros(2))
    numpy.testing.assert_array_equal(self.l3.x.grad, numpy.zeros(3))
def test_copyparams(self):
    l1 = chainer.Link(x=(2, 3))
    l2 = chainer.Link(x=2)
    l3 = chainer.Link(x=3)
    c1 = chainer.ChainList(l1, l2)
    c2 = chainer.ChainList(c1, l3)
    l1.x.data.fill(0)
    l2.x.data.fill(1)
    l3.x.data.fill(2)

    self.c2.copyparams(c2)

    numpy.testing.assert_array_equal(self.l1.x.data, l1.x.data)
    numpy.testing.assert_array_equal(self.l2.x.data, l2.x.data)
    numpy.testing.assert_array_equal(self.l3.x.data, l3.x.data)