我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.nditer()。
def eval_numerical_gradient_array(f, x, df, h=1e-5): ''' Evaluate a numeric gradient for a function that accepts a numpy array and returns a numpy array. ''' grad = np.zeros_like(x) it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite']) while not it.finished: ix = it.multi_index oldval = x[ix] x[ix] = oldval + h pos = f(x).copy() x[ix] = oldval - h neg = f(x).copy() x[ix] = oldval grad[ix] = np.sum((pos - neg) * df) / (2 * h) it.iternext() return grad
def test_iter_no_inner_dim_coalescing(): # Check no_inner iterators whose dimensions may not coalesce completely # Skipping the last element in a dimension prevents coalescing # with the next-bigger dimension a = arange(24).reshape(2, 3, 4)[:,:, :-1] i = nditer(a, ['external_loop'], [['readonly']]) assert_equal(i.ndim, 2) assert_equal(i[0].shape, (3,)) a = arange(24).reshape(2, 3, 4)[:, :-1,:] i = nditer(a, ['external_loop'], [['readonly']]) assert_equal(i.ndim, 2) assert_equal(i[0].shape, (8,)) a = arange(24).reshape(2, 3, 4)[:-1,:,:] i = nditer(a, ['external_loop'], [['readonly']]) assert_equal(i.ndim, 1) assert_equal(i[0].shape, (12,)) # Even with lots of 1-sized dimensions, should still coalesce a = arange(24).reshape(1, 1, 2, 1, 1, 3, 1, 1, 4, 1, 1) i = nditer(a, ['external_loop'], [['readonly']]) assert_equal(i.ndim, 1) assert_equal(i[0].shape, (24,))
def test_iter_scalar_cast_errors(): # Check that invalid casts are caught # Need to allow copying/buffering for write casts of scalars to occur assert_raises(TypeError, nditer, np.float32(2), [], [['readwrite']], op_dtypes=[np.dtype('f8')]) assert_raises(TypeError, nditer, 2.5, [], [['readwrite']], op_dtypes=[np.dtype('f4')]) # 'f8' -> 'f4' isn't a safe cast if the value would overflow assert_raises(TypeError, nditer, np.float64(1e60), [], [['readonly']], casting='safe', op_dtypes=[np.dtype('f4')]) # 'f4' -> 'i4' is neither a safe nor a same-kind cast assert_raises(TypeError, nditer, np.float32(2), [], [['readonly']], casting='same_kind', op_dtypes=[np.dtype('i4')])
def test_iter_op_axes_errors(): # Check that custom axes throws errors for bad inputs # Wrong number of items in op_axes a = arange(6).reshape(2, 3) assert_raises(ValueError, nditer, [a, a], [], [['readonly']]*2, op_axes=[[0], [1], [0]]) # Out of bounds items in op_axes assert_raises(ValueError, nditer, [a, a], [], [['readonly']]*2, op_axes=[[2, 1], [0, 1]]) assert_raises(ValueError, nditer, [a, a], [], [['readonly']]*2, op_axes=[[0, 1], [2, -1]]) # Duplicate items in op_axes assert_raises(ValueError, nditer, [a, a], [], [['readonly']]*2, op_axes=[[0, 0], [0, 1]]) assert_raises(ValueError, nditer, [a, a], [], [['readonly']]*2, op_axes=[[0, 1], [1, 1]]) # Different sized arrays in op_axes assert_raises(ValueError, nditer, [a, a], [], [['readonly']]*2, op_axes=[[0, 1], [0, 1, 0]]) # Non-broadcastable dimensions in the result assert_raises(ValueError, nditer, [a, a], [], [['readonly']]*2, op_axes=[[0, 1], [1, 0]])
def test_iter_allocate_output_types_promotion(): # Check type promotion of automatic outputs i = nditer([array([3], dtype='f4'), array([0], dtype='f8'), None], [], [['readonly']]*2+[['writeonly', 'allocate']]) assert_equal(i.dtypes[2], np.dtype('f8')) i = nditer([array([3], dtype='i4'), array([0], dtype='f4'), None], [], [['readonly']]*2+[['writeonly', 'allocate']]) assert_equal(i.dtypes[2], np.dtype('f8')) i = nditer([array([3], dtype='f4'), array(0, dtype='f8'), None], [], [['readonly']]*2+[['writeonly', 'allocate']]) assert_equal(i.dtypes[2], np.dtype('f4')) i = nditer([array([3], dtype='u4'), array(0, dtype='i4'), None], [], [['readonly']]*2+[['writeonly', 'allocate']]) assert_equal(i.dtypes[2], np.dtype('u4')) i = nditer([array([3], dtype='u4'), array(-12, dtype='i4'), None], [], [['readonly']]*2+[['writeonly', 'allocate']]) assert_equal(i.dtypes[2], np.dtype('i8'))
def test_iter_write_buffering(): # Test that buffering of writes is working # F-order swapped array a = np.arange(24).reshape(2, 3, 4).T.newbyteorder().byteswap() i = nditer(a, ['buffered'], [['readwrite', 'nbo', 'aligned']], casting='equiv', order='C', buffersize=16) x = 0 while not i.finished: i[0] = x x += 1 i.iternext() assert_equal(a.ravel(order='C'), np.arange(24))
def test_iter_buffering_delayed_alloc(): # Test that delaying buffer allocation works a = np.arange(6) b = np.arange(1, dtype='f4') i = nditer([a, b], ['buffered', 'delay_bufalloc', 'multi_index', 'reduce_ok'], ['readwrite'], casting='unsafe', op_dtypes='f4') assert_(i.has_delayed_bufalloc) assert_raises(ValueError, lambda i:i.multi_index, i) assert_raises(ValueError, lambda i:i[0], i) assert_raises(ValueError, lambda i:i[0:2], i) def assign_iter(i): i[0] = 0 assert_raises(ValueError, assign_iter, i) i.reset() assert_(not i.has_delayed_bufalloc) assert_equal(i.multi_index, (0,)) assert_equal(i[0], 0) i[1] = 1 assert_equal(i[0:2], [0, 1]) assert_equal([[x[0][()], x[1][()]] for x in i], list(zip(range(6), [1]*6)))
def test_iter_buffering_string(): # Safe casting disallows shrinking strings a = np.array(['abc', 'a', 'abcd'], dtype=np.bytes_) assert_equal(a.dtype, np.dtype('S4')) assert_raises(TypeError, nditer, a, ['buffered'], ['readonly'], op_dtypes='S2') i = nditer(a, ['buffered'], ['readonly'], op_dtypes='S6') assert_equal(i[0], asbytes('abc')) assert_equal(i[0].dtype, np.dtype('S6')) a = np.array(['abc', 'a', 'abcd'], dtype=np.unicode) assert_equal(a.dtype, np.dtype('U4')) assert_raises(TypeError, nditer, a, ['buffered'], ['readonly'], op_dtypes='U2') i = nditer(a, ['buffered'], ['readonly'], op_dtypes='U6') assert_equal(i[0], sixu('abc')) assert_equal(i[0].dtype, np.dtype('U6'))
def test_iter_buffering_reduction_reuse_reduce_loops(): # There was a bug triggering reuse of the reduce loop inappropriately, # which caused processing to happen in unnecessarily small chunks # and overran the buffer. a = np.zeros((2, 7)) b = np.zeros((1, 7)) it = np.nditer([a, b], flags=['reduce_ok', 'external_loop', 'buffered'], op_flags=[['readonly'], ['readwrite']], buffersize=5) bufsizes = [] for x, y in it: bufsizes.append(x.shape[0]) assert_equal(bufsizes, [5, 2, 5, 2]) assert_equal(sum(bufsizes), a.size)
def test_iter_allocated_array_dtypes(): # If the dtype of an allocated output has a shape, the shape gets # tacked onto the end of the result. it = np.nditer(([1, 3, 20], None), op_dtypes=[None, ('i4', (2,))]) for a, b in it: b[0] = a - 1 b[1] = a + 1 assert_equal(it.operands[1], [[0, 2], [2, 4], [19, 21]]) # Make sure this works for scalars too it = np.nditer((10, 2, None), op_dtypes=[None, None, ('i4', (2, 2))]) for a, b, c in it: c[0, 0] = a - b c[0, 1] = a + b c[1, 0] = a * b c[1, 1] = a / b assert_equal(it.operands[2], [[8, 12], [20, 5]])
def _broadcast_to(array, shape, subok, readonly): shape = tuple(shape) if np.iterable(shape) else (shape,) array = np.array(array, copy=False, subok=subok) if not shape and array.shape: raise ValueError('cannot broadcast a non-scalar to a scalar array') if any(size < 0 for size in shape): raise ValueError('all elements of broadcast shape must be non-' 'negative') needs_writeable = not readonly and array.flags.writeable extras = ['reduce_ok'] if needs_writeable else [] op_flag = 'readwrite' if needs_writeable else 'readonly' broadcast = np.nditer( (array,), flags=['multi_index', 'refs_ok', 'zerosize_ok'] + extras, op_flags=[op_flag], itershape=shape, order='C').itviews[0] result = _maybe_view_as_subclass(array, broadcast) if needs_writeable and not result.flags.writeable: result.flags.writeable = True return result
def randomize(img, noise_level=.03): """ given an array, randomizes the values in that array noise_level [0,1] controls the overall likelihood of a bit being flipped. This overall level is then multiplied by the levels variable, which modifies the noise level for the various significant bit values (i.e. it makes it so that less significant bits are more likely to be flipped, which is accurate) """ levels = [.005, .01, .05, .10, .15, .25, .35, .45] # more or less randomly chosen modifiers for each bit significance level for val in np.nditer(img, op_flags=['readwrite']): xor_val = 0 for level in levels: if random.random() < level * noise_level: xor_val = (xor_val << 1) | 1 else: xor_val = (xor_val << 1) | 0 #print('{:08b}'.format(int(xor_val))) val[...] = val ^ xor_val return img
def _passes_gradient_check(self, parameter): iterator = np.nditer(parameter.value, flags=['multi_index'], op_flags=['readwrite']) while not iterator.finished: multi_index = iterator.multi_index numerical_gradient = self._compute_numerical_gradient(parameter=parameter, multi_index=multi_index) analytical_gradient = parameter.gradient[multi_index] relative_error = self._compute_relative_error( numerical_gradient=numerical_gradient, analytical_gradient=analytical_gradient ) if (relative_error > self.error_threshold) or np.isnan(relative_error): return False iterator.iternext() return True
def one_hot_comparison(hot_axes, axes, C): """ TODO. Arguments: hot_axes: TODO axes: TODO """ u = rng.random_integers(0, C.length - 1, axes, dtype=np.int8) u_p = ng.placeholder(axes, dtype=u.dtype) v = np.zeros(hot_axes.lengths, dtype=np.float32) udxiter = np.nditer(u, flags=['multi_index']) for uiter in udxiter: vindex = [int(uiter)] vindex.extend(udxiter.multi_index) v[tuple(vindex)] = 1 with executor(ng.one_hot(u_p, axis=C), u_p) as ex: v_t = ex(u) ng.testing.assert_allclose(v_t, v)
def numerical_gradient(f, x): h = 1e-4 # 0.0001 grad = np.zeros_like(x) it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite']) while not it.finished: idx = it.multi_index tmp_val = x[idx] x[idx] = float(tmp_val) + h fxh1 = f(x) # f(x+h) x[idx] = tmp_val - h fxh2 = f(x) # f(x-h) grad[idx] = (fxh1 - fxh2) / (2 * h) x[idx] = tmp_val # ?????? it.iternext() return grad
def finiteprecision(self, coeff=None, totalbits=None, shiftbits=None): if coeff is None: coeff = self.coefficients if totalbits is None: totalbits = self.totalbits if shiftbits is None: shiftbits = self.shiftbits res = coeff * 0 + coeff for x in np.nditer(res, op_flags=['readwrite']): xr = np.round(x * 2 ** shiftbits) xmax = 2 ** (totalbits - 1) if xr == 0 and xr != 0: logger.warning("One value was rounded off to zero: Increase " "shiftbits in fpga design if this is a " "problem!") elif xr > xmax - 1: xr = xmax - 1 logger.warning("One value saturates positively: Increase " "totalbits or decrease gain!") elif xr < -xmax: xr = -xmax logger.warning("One value saturates negatively: Increase " "totalbits or decrease gain!") x[...] = 2 ** (-shiftbits) * xr return res
def testComputation(self): with self.test_session() as sess: x = sess.run(snt.nets.identity_kernel_initializer([3, 3, 5, 5])) # Iterate over elements. Assert that only the middle pixel is on when in # and out channels are same. it = np.nditer(x, flags=["multi_index"]) while not it.finished: value, idx = it[0], it.multi_index (filter_height, filter_width, in_channel, out_channel) = idx if (filter_height == 1 and filter_width == 1 and in_channel == out_channel): self.assertEqual(value, 1) else: self.assertEqual(value, 0) it.iternext()
def testComputation(self): tf.set_random_seed(0) with self.test_session() as sess: initializer = snt.nets.noisy_identity_kernel_initializer(2, stddev=1e-20) x = initializer([3, 3, 4, 8]) x = tf.reduce_sum(x, axis=[3]) x_ = sess.run(x) # Iterate over elements. After summing over depth, assert that only the # middle pixel is on. it = np.nditer(x_, flags=["multi_index"]) while not it.finished: value, idx = it[0], it.multi_index (filter_height, filter_width, _) = idx if filter_height == 1 and filter_width == 1: self.assertAllClose(value, 1) else: self.assertAllClose(value, 0) it.iternext()
def lower_dist_cumsum(context, builder, sig, args): dtype = sig.args[0].dtype zero = dtype(0) def cumsum_impl(in_arr, out_arr): c = zero for v in np.nditer(in_arr): c += v.item() prefix_var = distributed_api.dist_exscan(c) for i in range(in_arr.size): prefix_var += in_arr[i] out_arr[i] = prefix_var return 0 res = context.compile_internal(builder, cumsum_impl, sig, args, locals=dict(c=dtype, prefix_var=dtype)) return res
def sort_breakend_order(svs): ''' per sv, ensure chrom1, chrom2 and pos1, pos2 are ordered ''' svs = svs.copy() for sv in np.nditer(svs, op_flags=['readwrite']): if sv['chr1'] == sv['chr2']: if sv['pos1'] > sv['pos2']: ts = sv.copy() sv['pos1'], sv['dir1'] = ts['pos2'], ts['dir2'] sv['pos2'], sv['dir2'] = ts['pos1'], ts['dir1'] else: chrs = [str(sv['chr1']), str(sv['chr2'])] if not np.all(np.array(chrs) == np.array(nice_sort(chrs))): ts = sv.copy() sv['chr1'], sv['pos1'], sv['dir1'] = ts['chr2'], ts['pos2'], ts['dir2'] sv['chr2'], sv['pos2'], sv['dir2'] = ts['chr1'], ts['pos1'], ts['dir1'] return svs
def numerical_gradient(f, x): h = 1e-4 # 0.0001 grad = np.zeros_like(x) it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite']) while not it.finished: idx = it.multi_index tmp_val = x[idx] x[idx] = float(tmp_val) + h fxh1 = f(x) # f(x+h) x[idx] = tmp_val - h fxh2 = f(x) # f(x-h) grad[idx] = (fxh1 - fxh2) / (2*h) x[idx] = tmp_val # ?????? it.iternext() return grad
def eval_numerical_gradient_array(f, x, df, h=1e-5): """ Evaluate a numeric gradient for a function that accepts a numpy array and returns a numpy array. """ grad = np.zeros_like(x) it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite']) while not it.finished: ix = it.multi_index oldval = x[ix] x[ix] = oldval + h pos = f(x).copy() x[ix] = oldval - h neg = f(x).copy() x[ix] = oldval grad[ix] = np.sum((pos - neg) * df) / (2 * h) it.iternext() return grad
def dust_temperature_and_mass_from_grey_body_fit(self, fluxtype='limited'): # get the Herschel 160, 250, 350, 500 wavelengths waves = np.array( [ Filter(fs).pivotwavelength() for fs in ("Pacs.red","SPIRE.PSW","SPIRE.PMW","SPIRE.PLW")] ) sigmas = np.array(( 3,1,1,3 )) # pacs is less sensitive; longer wavelength fluxes are harder to measure # get the Herschel 160, 250, 350, 500 datapoints fluxstring = '''[ self.instr_fluxdensity_pacs_red_{0}, self.instr_fluxdensity_spire_psw_{0}, self.instr_fluxdensity_spire_pmw_{0}, self.instr_fluxdensity_spire_plw_{0} ]'''.format(fluxtype) fluxes = eval(fluxstring) # setup an iterator over the galaxies, specifying two to-be-allocated output arrays for T and M it = np.nditer([None, None, self.setup_distance_instrument] + fluxes, op_flags = [['writeonly','allocate'],['writeonly','allocate'],['readonly'], ['readonly'], ['readonly'], ['readonly'], ['readonly']]) # do the fit, iterating over the galaxies for Ti,Mi,di,f160i,f250i,f350i,f500i in it: greybody = GreyBody(di, 2, kappa350_Cortese) #greybody = GreyBody(di, 2, kappa350_Zubko) it[0],it[1] = greybody.fit(waves, (f160i,f250i,f350i,f500i), sigmas) # return the two result arrays T and M allocated by the iterator return it.operands[0:2] ## This function returns dust temperature (in K) for best fit with Herschel 160, 250, 350, 500 data points
def eval_numerical_grad(f, feed_dict, wrt, h=1e-5): wrt_val = feed_dict[wrt] grad = np.zeros_like(wrt_val) it = np.nditer(wrt_val, flags=['multi_index'], op_flags=['readwrite']) while not it.finished: ix = it.multi_index old_val = wrt_val[ix] wrt_val[ix] = old_val + h executor = Executor([f]) feed_dict[wrt] = wrt_val result_plus, = executor.run(feed_shapes=feed_dict) wrt_val[ix] = old_val - h executor = Executor([f]) feed_dict[wrt] = wrt_val result_minus, = executor.run(feed_shapes=feed_dict) grad[ix] = np.sum((result_plus - result_minus) / (2.0 * h)) wrt_val[ix] = old_val feed_dict[wrt] = wrt_val it.iternext() return grad
def get_desc_len_from_data_uni(n, n_edges, k, edgelist, mb): ''' Description length difference to a randomized instance, via PRL 110, 148701 (2013). ''' assert type(edgelist) is list, "[ERROR] the type of the input parameter (edgelist) should be a list" assert type(mb) is list, "[ERROR] the type of the input parameter (mb) should be a list" # First, let's compute the m_e_rs from the edgelist and mb m_e_rs = np.zeros((max(mb) + 1, max(mb) + 1)) for i in edgelist: # Please do check the index convention of the edgelist source_group = int(mb[int(i[0])]) target_group = int(mb[int(i[1])]) m_e_rs[source_group][target_group] += 1 m_e_rs[target_group][source_group] += 1 # then, we compute the profile likelihood from the m_e_rs italic_i = 0. m_e_r = np.sum(m_e_rs, axis=1) num_edges = m_e_r.sum() / 2. for ind, e_val in enumerate(np.nditer(m_e_rs)): ind_i = int(math.floor(ind / (m_e_rs.shape[0]))) ind_j = ind % (m_e_rs.shape[0]) if e_val != 0.0: italic_i += e_val / 2. / num_edges * math.log( e_val / m_e_r[ind_i] / m_e_r[ind_j] * 2 * num_edges ) assert m_e_rs.shape[0] == k, "[ERROR] m_e_rs dimension (={}) is not equal to k (={})!".format( m_e_rs.shape[0], k ) # finally, we compute the description length desc_len_b = (n * math.log(k) - n_edges * italic_i) / n_edges x = float(k * (k + 1)) / 2. / n_edges desc_len_b += (1 + x) * math.log(1 + x) - x * math.log(x) desc_len_b -= (1 + 1 / n_edges) * math.log(1 + 1 / n_edges) - (1 / n_edges) * math.log(1 / n_edges) return desc_len_b
def get_italic_i_from_m_e_rs(m_e_rs): assert type(m_e_rs) is np.ndarray, "[ERROR] input parameter (m_e_rs) should be of type numpy.ndarray" italic_i = 0. m_e_r = np.sum(m_e_rs, axis=1) num_edges = m_e_r.sum() / 2. for ind, e_val in enumerate(np.nditer(m_e_rs)): ind_i = int(math.floor(ind / (m_e_rs.shape[0]))) ind_j = ind % (m_e_rs.shape[0]) if e_val != 0.0: italic_i += e_val / 2. / num_edges * math.log( e_val / m_e_r[ind_i] / m_e_r[ind_j] * 2 * num_edges ) return italic_i
def test_euclideandistance(self): def mapTransform(layoutDefinition, spatialKey): ex = layoutDefinition.extent x_range = ex.xmax - ex.xmin xinc = x_range/layoutDefinition.tileLayout.layoutCols yrange = ex.ymax - ex.ymin yinc = yrange/layoutDefinition.tileLayout.layoutRows return {'xmin': ex.xmin + xinc * spatialKey['col'], 'xmax': ex.xmin + xinc * (spatialKey['col'] + 1), 'ymin': ex.ymax - yinc * (spatialKey['row'] + 1), 'ymax': ex.ymax - yinc * spatialKey['row']} def gridToMap(layoutDefinition, spatialKey, px, py): ex = mapTransform(layoutDefinition, spatialKey) x_range = ex['xmax'] - ex['xmin'] xinc = x_range/layoutDefinition.tileLayout.tileCols yrange = ex['ymax'] - ex['ymin'] yinc = yrange/layoutDefinition.tileLayout.tileRows return (ex['xmin'] + xinc * (px + 0.5), ex['ymax'] - yinc * (py + 0.5)) def distanceToGeom(layoutDefinition, spatialKey, geom, px, py): x, y = gridToMap(layoutDefinition, spatialKey, px, py) return geom.distance(Point(x, y)) tiled = euclidean_distance(self.pts_wm, 3857, 7) result = tiled.stitch().cells[0] arr = np.zeros((256,256), dtype=float) it = np.nditer(arr, flags=['multi_index']) while not it.finished: py, px = it.multi_index arr[py][px] = distanceToGeom(tiled.layer_metadata.layout_definition, {'col': 64, 'row':63}, self.pts_wm, px, py) it.iternext() self.assertTrue(np.all(abs(result - arr) < 1e-8))
def test_2d_direction(eval_buffer): """ All directions of 2D objects must have zero z coordinate """ for v in numpy.nditer(eval_buffer.array): assert v["z"] == 0 # The length must be _exactly_ zero
def test_direction_unit_length(eval_buffer): """ All directions must be unit length """ for v in numpy.nditer(eval_buffer.array): assert v["x"]**2 + v["y"]**2 + v["z"]**2 == pytest.approx(1)
def eval_numerical_gradient(f, x, verbose=True, h=0.00001): ''' a naive implementation of numerical gradient of f at x - f should be a function that takes a single argument - x is the point (numpy array) to evaluate the gradient at ''' grad = np.zeros_like(x) # iterate over all indexes in x it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite']) while not it.finished: # evaluate function at x+h ix = it.multi_index oldval = x[ix] x[ix] = oldval + h # increment by h fxph = f(x) # evalute f(x + h) x[ix] = oldval - h fxmh = f(x) # evaluate f(x - h) x[ix] = oldval # restore # compute the partial derivative with centered formula grad[ix] = (fxph - fxmh) / (2 * h) # the slope if verbose: print(x), grad[ix] it.iternext() # step to next dimension return grad
def eval_numerical_gradient_blobs(f, inputs, output, h=1e-5): ''' Compute numeric gradients for a function that operates on input and output blobs. We assume that f accepts several input blobs as arguments, followed by a blob into which outputs will be written. For example, f might be called like this: f(x, w, out) where x and w are input Blobs, and the result of f will be written to out. Inputs: - f: function - inputs: tuple of input blobs - output: output blob - h: step size ''' numeric_diffs = [] for input_blob in inputs: diff = np.zeros_like(input_blob.diffs) it = np.nditer(input_blob.vals, flags=['multi_index'], op_flags=['readwrite']) while not it.finished: idx = it.multi_index orig = input_blob.vals[idx] input_blob.vals[idx] = orig + h f(*(inputs + (output,))) pos = np.copy(output.vals) input_blob.vals[idx] = orig - h f(*(inputs + (output,))) neg = np.copy(output.vals) input_blob.vals[idx] = orig diff[idx] = np.sum((pos - neg) * output.diffs) / (2.0 * h) it.iternext() numeric_diffs.append(diff) return numeric_diffs
def test_iter_refcount(): # Make sure the iterator doesn't leak # Basic a = arange(6) dt = np.dtype('f4').newbyteorder() rc_a = sys.getrefcount(a) rc_dt = sys.getrefcount(dt) it = nditer(a, [], [['readwrite', 'updateifcopy']], casting='unsafe', op_dtypes=[dt]) assert_(not it.iterationneedsapi) assert_(sys.getrefcount(a) > rc_a) assert_(sys.getrefcount(dt) > rc_dt) it = None assert_equal(sys.getrefcount(a), rc_a) assert_equal(sys.getrefcount(dt), rc_dt) # With a copy a = arange(6, dtype='f4') dt = np.dtype('f4') rc_a = sys.getrefcount(a) rc_dt = sys.getrefcount(dt) it = nditer(a, [], [['readwrite']], op_dtypes=[dt]) rc2_a = sys.getrefcount(a) rc2_dt = sys.getrefcount(dt) it2 = it.copy() assert_(sys.getrefcount(a) > rc2_a) assert_(sys.getrefcount(dt) > rc2_dt) it = None assert_equal(sys.getrefcount(a), rc2_a) assert_equal(sys.getrefcount(dt), rc2_dt) it2 = None assert_equal(sys.getrefcount(a), rc_a) assert_equal(sys.getrefcount(dt), rc_dt) del it2 # avoid pyflakes unused variable warning
def test_iter_best_order(): # The iterator should always find the iteration order # with increasing memory addresses # Test the ordering for 1-D to 5-D shapes for shape in [(5,), (3, 4), (2, 3, 4), (2, 3, 4, 3), (2, 3, 2, 2, 3)]: a = arange(np.prod(shape)) # Test each combination of positive and negative strides for dirs in range(2**len(shape)): dirs_index = [slice(None)]*len(shape) for bit in range(len(shape)): if ((2**bit) & dirs): dirs_index[bit] = slice(None, None, -1) dirs_index = tuple(dirs_index) aview = a.reshape(shape)[dirs_index] # C-order i = nditer(aview, [], [['readonly']]) assert_equal([x for x in i], a) # Fortran-order i = nditer(aview.T, [], [['readonly']]) assert_equal([x for x in i], a) # Other order if len(shape) > 2: i = nditer(aview.swapaxes(0, 1), [], [['readonly']]) assert_equal([x for x in i], a)
def test_iter_c_order(): # Test forcing C order # Test the ordering for 1-D to 5-D shapes for shape in [(5,), (3, 4), (2, 3, 4), (2, 3, 4, 3), (2, 3, 2, 2, 3)]: a = arange(np.prod(shape)) # Test each combination of positive and negative strides for dirs in range(2**len(shape)): dirs_index = [slice(None)]*len(shape) for bit in range(len(shape)): if ((2**bit) & dirs): dirs_index[bit] = slice(None, None, -1) dirs_index = tuple(dirs_index) aview = a.reshape(shape)[dirs_index] # C-order i = nditer(aview, order='C') assert_equal([x for x in i], aview.ravel(order='C')) # Fortran-order i = nditer(aview.T, order='C') assert_equal([x for x in i], aview.T.ravel(order='C')) # Other order if len(shape) > 2: i = nditer(aview.swapaxes(0, 1), order='C') assert_equal([x for x in i], aview.swapaxes(0, 1).ravel(order='C'))
def test_iter_f_order(): # Test forcing F order # Test the ordering for 1-D to 5-D shapes for shape in [(5,), (3, 4), (2, 3, 4), (2, 3, 4, 3), (2, 3, 2, 2, 3)]: a = arange(np.prod(shape)) # Test each combination of positive and negative strides for dirs in range(2**len(shape)): dirs_index = [slice(None)]*len(shape) for bit in range(len(shape)): if ((2**bit) & dirs): dirs_index[bit] = slice(None, None, -1) dirs_index = tuple(dirs_index) aview = a.reshape(shape)[dirs_index] # C-order i = nditer(aview, order='F') assert_equal([x for x in i], aview.ravel(order='F')) # Fortran-order i = nditer(aview.T, order='F') assert_equal([x for x in i], aview.T.ravel(order='F')) # Other order if len(shape) > 2: i = nditer(aview.swapaxes(0, 1), order='F') assert_equal([x for x in i], aview.swapaxes(0, 1).ravel(order='F'))