The following 50 code examples, extracted from open-source Python projects, illustrate how to use numpy.memmap().
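
Before the project-specific examples, here is a minimal, self-contained sketch of the typical create, write, reopen cycle of numpy.memmap(). The file name 'example.dat' and the (1000, 4) float32 shape are illustrative values chosen for this sketch, not taken from any of the projects below.

import numpy as np

# Create a writable memory-mapped array backed by a binary file on disk.
arr = np.memmap('example.dat', dtype=np.float32, mode='w+', shape=(1000, 4))
arr[:] = np.random.rand(1000, 4)   # assignments are written into the mapped file
arr.flush()                        # push pending changes to disk
del arr                            # close the map

# Reopen the same file read-only; the dtype and shape must be supplied again,
# because a raw memmap file stores no metadata of its own.
arr = np.memmap('example.dat', dtype=np.float32, mode='r', shape=(1000, 4))
print(arr.mean())
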
def _read_from_header(self):
    a, b, c = self._get_header()
    header = a
    header['data_offset'] = b
    header['nb_channels'] = c
    #header['dtype_offset'] = int(header['ADC zero'])
    header['gain'] = float(re.findall("\d+\.\d+", header['El'])[0])
    header['data_dtype'] = self.params['data_dtype']

    self.data = numpy.memmap(self.file_name, offset=header['data_offset'], dtype=header['data_dtype'], mode='r')
    self.size = len(self.data)
    self._shape = (self.size//header['nb_channels'], header['nb_channels'])
    del self.data

    return header

def test_validating(self):
    #mpi_launch('fitting', self.file_name, 2, 0, 'False')
    a, b = os.path.splitext(os.path.basename(self.file_name))
    file_name, ext = os.path.splitext(self.file_name)
    file_out = os.path.join(os.path.abspath(file_name), a)
    result_name = os.path.join(file_name, 'injected')

    spikes = {}
    result = h5py.File(os.path.join(result_name, '%s.result.hdf5' %a))
    for key in result.get('spiketimes').keys():
        spikes[key] = result.get('spiketimes/%s' %key)[:]

    juxta_file = file_out + '.juxta.dat'

    f = numpy.memmap(juxta_file, shape=(self.length,1), dtype=self.parser.get('validating', 'juxta_dtype'), mode='w+')
    f[spikes['temp_9']] = 100
    del f

    mpi_launch('validating', self.file_name, 2, 0, 'False')

def _readData1(self, fd, meta, mmap=False, **kwds):
    ## Read array data from the file descriptor for MetaArray v1 files
    ## read in axis values for any axis that specifies a length
    frameSize = 1
    for ax in meta['info']:
        if 'values_len' in ax:
            ax['values'] = np.fromstring(fd.read(ax['values_len']), dtype=ax['values_type'])
            frameSize *= ax['values_len']
            del ax['values_len']
            del ax['values_type']
    self._info = meta['info']
    if not kwds.get("readAllData", True):
        return
    ## the remaining data is the actual array
    if mmap:
        subarr = np.memmap(fd, dtype=meta['type'], mode='r', shape=meta['shape'])
    else:
        subarr = np.fromstring(fd.read(), dtype=meta['type'])
        subarr.shape = meta['shape']
    self._data = subarr

def __mmap_ncs_packet_headers(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers. Reading standard dtype improves speed, but
    timestamps need to be reconstructed.
    """
    filesize = getsize(self.sessiondir + sep + filename)  # in byte
    if filesize > 16384:
        data = np.memmap(self.sessiondir + sep + filename,
                         dtype='<u4',
                         shape=((filesize - 16384) / 4 / 261, 261),
                         mode='r', offset=16384)

        ts = data[:, 0:2]
        multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0)
        timestamps = np.sum(ts * multi, axis=1)
        # timestamps = data[:,0] + (data[:,1] *2**32)
        header_u4 = data[:, 2:5]
        return timestamps, header_u4
    else:
        return None

def __mmap_nev_file(self, filename):
    """ Memory map the Neuralynx .nev file """
    nev_dtype = np.dtype([
        ('reserved', '<i2'),
        ('system_id', '<i2'),
        ('data_size', '<i2'),
        ('timestamp', '<u8'),
        ('event_id', '<i2'),
        ('ttl_input', '<i2'),
        ('crc_check', '<i2'),
        ('dummy1', '<i2'),
        ('dummy2', '<i2'),
        ('extra', '<i4', (8,)),
        ('event_string', 'a128'),
    ])
    if getsize(self.sessiondir + sep + filename) > 16384:
        return np.memmap(self.sessiondir + sep + filename,
                         dtype=nev_dtype, mode='r', offset=16384)
    else:
        return None

def __read_nsx_data_variant_b(self, nsx_nb):
    """
    Extract nsx data (blocks) from a 2.2 or 2.3 .nsx file. Blocks can
    arise if the recording was paused by the user.
    """
    filename = '.'.join([self._filenames['nsx'], 'ns%i' % nsx_nb])

    data = {}
    for data_bl in self.__nsx_data_header[nsx_nb].keys():
        # get shape and offset of data
        shape = (
            self.__nsx_data_header[nsx_nb][data_bl]['nb_data_points'],
            self.__nsx_basic_header[nsx_nb]['channel_count'])
        offset = \
            self.__nsx_data_header[nsx_nb][data_bl]['offset_to_data_block']

        # read data
        data[data_bl] = np.memmap(
            filename, dtype='int16', shape=shape, offset=offset)

    return data

def __read_nev_data(self, nev_data_masks, nev_data_types):
    """
    Extract nev data from a 2.1 or 2.2 .nev file
    """
    filename = '.'.join([self._filenames['nev'], 'nev'])
    data_size = self.__nev_basic_header['bytes_in_data_packets']
    header_size = self.__nev_basic_header['bytes_in_headers']

    # read all raw data packets and markers
    dt0 = [
        ('timestamp', 'uint32'),
        ('packet_id', 'uint16'),
        ('value', 'S{0}'.format(data_size - 6))]

    raw_data = np.memmap(filename, offset=header_size, dtype=dt0)

    masks = self.__nev_data_masks(raw_data['packet_id'])
    types = self.__nev_data_types(data_size)

    data = {}
    for k, v in nev_data_masks.items():
        data[k] = raw_data.view(types[k][nev_data_types[k]])[masks[k][v]]

    return data

def __get_nev_rec_times(self):
    """
    Extracts minimum and maximum time points from a nev file.
    """
    filename = '.'.join([self._filenames['nev'], 'nev'])

    dt = [('timestamp', 'uint32')]
    offset = \
        self.__get_file_size(filename) - \
        self.__nev_params('bytes_in_data_packets')
    last_data_packet = np.memmap(filename, offset=offset, dtype=dt)[0]

    n_starts = [0 * self.__nev_params('event_unit')]
    n_stops = [
        last_data_packet['timestamp'] * self.__nev_params('event_unit')]

    return n_starts, n_stops

def __init__(self, path, x_width = 0, x_type = np.float, y_width = 0, y_type = types.int_):
    if os.path.exists(path + "/dataset.json"):
        print("Using existing dataset in " + path)
        self.load(path)
    else:
        if x_width == 0:
            raise "X width must be specified for new dataset"
        self.X = np.memmap(path + "/X.npy", x_type, "w+", 0, (1, x_width))
        self.X.flush()
        if y_width > 0:
            self.Y = np.memmap(path + "/Y.npy", y_type, "w+", 0, (1, y_width))
            self.Y.flush()
        else:
            self.Y = None
        self.index = None
        self.nrows = 0
        self.running_mean = np.zeros((1, x_width), x_type)
        self.running_dev = np.zeros((1, x_width), x_type)
        self.running_max = np.zeros((1, x_width), x_type)
        self.running_min = np.zeros((1, x_width), x_type)
        self.path = path

def load(self, path):
    metadata = json.loads(open(path + "/dataset.json").read())
    self.index = np.array(metadata["index"])
    x_shape = tuple(metadata["x_shape"])
    x_type = metadata["x_type"]
    if "y_shape" in metadata:
        y_shape = tuple(metadata["y_shape"])
        y_type = metadata["y_type"]
        self.Y = np.memmap(path + "/Y.npy", y_type, shape = y_shape)
    else:
        self.Y = None
    self.nrows = x_shape[0]
    self.running_mean = np.asarray(metadata["running_mean"])
    self.running_dev = np.asarray(metadata["running_dev"])
    self.running_max = np.asarray(metadata["running_min"])
    self.running_min = np.asarray(metadata["running_max"])
    self.X = np.memmap(path + "/X.npy", x_type, shape = x_shape)
    self.path = path

def add(self, x, y = None):
    self.X = np.memmap(
        self.path + "/X.npy", self.X.dtype,
        shape = (self.nrows + x.shape[0], x.shape[1])
    )
    self.X[self.nrows:self.nrows + x.shape[0], :] = x

    if y is not None:
        if x.shape != y.shape:
            raise "x and y should have the same shape"
        self.Y = np.memmap(
            self.path + "/Y.npy", self.Y.dtype,
            shape = (self.nrows + y.shape[0], y.shape[1])
        )
        self.Y[self.nrows:self.nrows + y.shape[0], :] = y

    delta = x - self.running_mean
    n = self.X.shape[0] + np.arange(x.shape[0]) + 1
    self.running_dev += np.sum(delta * (x - self.running_mean), 0)
    self.running_mean += np.sum(delta / n[:, np.newaxis], 0)
    self.running_max = np.amax(np.vstack((self.running_max, x)), 0)
    self.running_min = np.amin(np.vstack((self.running_min, x)), 0)
    self.nrows += x.shape[0]

def asarray(self, memmap=False, *args, **kwargs):
    """Read image data from all files and return as single numpy array.

    If memmap is True, return an array stored in a binary file on disk.
    The args and kwargs parameters are passed to the imread function.

    Raise IndexError or ValueError if image shapes don't match.
    """
    im = self.imread(self.files[0], *args, **kwargs)
    shape = self.shape + im.shape
    if memmap:
        with tempfile.NamedTemporaryFile() as fh:
            result = numpy.memmap(fh, dtype=im.dtype, shape=shape)
    else:
        result = numpy.zeros(shape, dtype=im.dtype)
    result = result.reshape(-1, *im.shape)
    for index, fname in zip(self._indices, self.files):
        index = [i-j for i, j in zip(index, self._start_index)]
        index = numpy.ravel_multi_index(index, self.shape)
        im = self.imread(fname, *args, **kwargs)
        result[index] = im
    result.shape = shape
    return result

def stack_pages(pages, memmap=False, *args, **kwargs):
    """Read data from sequence of TiffPage and stack them vertically.

    If memmap is True, return an array stored in a binary file on disk.
    Additional parameters are passed to the page asarray function.
    """
    if len(pages) == 0:
        raise ValueError("no pages")

    if len(pages) == 1:
        return pages[0].asarray(memmap=memmap, *args, **kwargs)

    result = pages[0].asarray(*args, **kwargs)
    shape = (len(pages),) + result.shape
    if memmap:
        with tempfile.NamedTemporaryFile() as fh:
            result = numpy.memmap(fh, dtype=result.dtype, shape=shape)
    else:
        result = numpy.empty(shape, dtype=result.dtype)

    for i, page in enumerate(pages):
        result[i] = page.asarray(*args, **kwargs)

    return result

def flush(self):
    """
    Write any changes in the array to the file on disk.

    For further information, see `memmap`.

    Parameters
    ----------
    None

    See Also
    --------
    memmap
    """
    if self.base is not None and hasattr(self.base, 'flush'):
        self.base.flush()

def load_memory_map_dir(directory: str) -> Embeddings:
    """
    Loads embeddings from a memory map directory to allow lazy loading (and reduce the memory usage).

    Args:
        directory: a file prefix. This function loads two files in the directory: a meta json file
            with shape information and the vocabulary, and the actual memory map file.

    Returns:
        Embeddings object with a lookup matrix that is backed by a memory map.
    """
    meta_file = os.path.join(directory, "meta.json")
    mem_map_file = os.path.join(directory, "memory_map")
    with open(meta_file, "r") as f:
        meta = json.load(f)
    shape = tuple(meta['shape'])
    vocab = meta['vocab']
    mem_map = np.memmap(mem_map_file, dtype='float32', mode='r+', shape=shape)
    result = Embeddings(vocab, mem_map, filename=directory, emb_format="memory_map_dir")
    return result

def save_as_memory_map_dir(directory: str, emb: Embeddings):
    """
    Saves the given embeddings as memory map file and corresponding meta data in a directory.

    Args:
        directory: the directory to store the memory map file in (called `memory_map`) and the
            meta file (called `meta.json`) that stores the shape of the memory map and the actual vocabulary.
        emb: the embeddings to store.
    """
    if not os.path.exists(directory):
        os.makedirs(directory)
    meta_file = os.path.join(directory, "meta.json")
    mem_map_file = os.path.join(directory, "memory_map")
    with open(meta_file, "w") as f:
        json.dump({
            "vocab": emb.vocabulary,
            "shape": emb.shape
        }, f)
    mem_map = np.memmap(mem_map_file, dtype='float32', mode='w+', shape=emb.shape)
    mem_map[:] = emb.lookup[:]
    mem_map.flush()
    del mem_map

def write_sampled(datfile, data, sampling_rate, **params):
    """Writes a sampled dataset to disk as a raw binary file, plus a meta file.

    Args:
        datfile (str): path to file to write to. If the file exists, it is
            overwritten.
        data (sequence): time series data of at most 2 dimensions
        sampling_rate (int or float): sampling rate of `data`
        **params: all other keyword arguments are treated as dataset
            attributes, and added to the meta file

    Returns:
        SampledData: sampled dataset containing `data`
    """
    if 'columns' not in params:
        params['columns'] = sampled_columns(data)
    params["dtype"] = data.dtype.str
    shape = data.shape
    mdata = np.memmap(datfile, dtype=params["dtype"], mode="w+", shape=shape)
    mdata[:] = data[:]
    write_metadata(datfile, sampling_rate=sampling_rate, **params)
    params['sampling_rate'] = sampling_rate
    return SampledData(mdata, datfile, params)

def read_sampled(datfile, mode="r"):
    """Loads raw binary file and associated metadata into a sampled dataset.

    Args:
        datfile (str): path to raw binary file to read from
        mode: may be "r" or "r+"; use "r+" for modifying the data
            (not recommended)

    Returns:
        SampledData: sampled dataset containing `datfile`'s data
    """
    path = os.path.abspath(datfile)
    params = read_metadata(datfile)
    try:
        data = np.memmap(datfile, dtype=params["dtype"], mode=mode)
    except ValueError:
        data = np.array([])
    data = data.reshape(-1, len(params['columns']))
    return SampledData(data, path, params)

def load_data(fname):
    n = 4543
    size = int(fname.split('_')[0])
    X_fname = 'cache/X_%s.npy' % fname
    y_fname = 'cache/pts_%s.npy' % fname
    X_shape = (n, 3, size, size)
    y_shape = (n, 4)
    X = np.memmap(X_fname, dtype=np.float32, mode='r', shape=X_shape)
    y = np.memmap(y_fname, dtype=np.int32, mode='r', shape=y_shape)
    y = y.astype(np.float32)
    y = y / size
    return X, y

def load_data(fname, data_grey=False):
    n = 6925
    size = int(fname.split('_')[0])
    if data_grey:
        X_fname = 'cache/X_test_grey_%s.npy' % fname
    else:
        X_fname = 'cache/X_test_%s.npy' % fname
    num_channels = 1 if data_grey else 3
    X_shape = (n, num_channels, size, size)
    print 'Load test data from %s' % X_fname
    X = np.memmap(X_fname, dtype=np.float32, mode='r', shape=X_shape)
    return X

def load_data(fname):
    n = 4543
    size = int(fname.split('_')[0])
    X_fname = 'cache/X_%s.npy' % fname
    y_fname = 'cache/bbox_%s.npy' % fname
    X_shape = (n, 3, size, size)
    y_shape = (n, 4)
    X = np.memmap(X_fname, dtype=np.float32, mode='r', shape=X_shape)
    y = np.memmap(y_fname, dtype=np.int32, mode='r', shape=y_shape)
    y = y.astype(np.float32)
    y = y / size
    return X, y

def walk(self, size):
    if self.eof:
        return None
    end_point = self.offset + 4 * size
    assert end_point <= self.size, \
        'Over-read {}'.format(self.path)
    float32_1D_array = np.memmap(
        self.path, shape = (), mode = 'r',
        offset = self.offset,
        dtype='({})float32,'.format(size)
    )
    self.offset = end_point
    if end_point == self.size:
        self.eof = True
    return float32_1D_array

def load_vectors_mmaped(VECTORS_FILE, NOFTYPES, D, UPDATE_VECTORS):
    if (not os.path.exists('vectors.mymemmap')) or UPDATE_VECTORS == 1:
        v = np.memmap('vectors.mymemmap', dtype='float', mode='w+', shape=(NOFTYPES, D))

        i = 0
        # Showing percentage to user
        limit = 100000

        with open(VECTORS_FILE, 'r') as f:
            for line in f:
                if i >= limit:
                    print(limit / float(NOFTYPES) * 100)
                    limit += 100000
                line = line.strip()
                if len(line) > 0:
                    v[i][:] = map(float, line.split(' '))
                    i += 1
    else:
        v = np.memmap('vectors.mymemmap', dtype='float', mode='r', shape=(NOFTYPES, D))

    return v

def __init__(self, hash_name='md5', coerce_mmap=False):
    """
    Parameters
    ----------
    hash_name: string
        The hash algorithm to be used
    coerce_mmap: boolean
        Make no difference between np.memmap and np.ndarray objects.
    """
    self.coerce_mmap = coerce_mmap
    Hasher.__init__(self, hash_name=hash_name)
    # delayed import of numpy, to avoid tight coupling
    import numpy as np
    self.np = np
    if hasattr(np, 'getbuffer'):
        self._getbuffer = np.getbuffer
    else:
        self._getbuffer = memoryview

def hash(obj, hash_name='md5', coerce_mmap=False):
    """ Quick calculation of a hash to identify uniquely Python objects
        containing numpy arrays.

        Parameters
        -----------
        hash_name: 'md5' or 'sha1'
            Hashing algorithm used. sha1 is supposedly safer, but md5 is
            faster.
        coerce_mmap: boolean
            Make no difference between np.memmap and np.ndarray
    """
    if 'numpy' in sys.modules:
        hasher = NumpyHasher(hash_name=hash_name, coerce_mmap=coerce_mmap)
    else:
        hasher = Hasher(hash_name=hash_name)
    return hasher.hash(obj)

def get_mmap(X):
    """ converts a numpy array to a numpy memory mapped array """
    #TODO: use tempfile.NamedTemporaryFile
    if type(X) is np.core.memmap:
        return X
    fid = 0
    filename = mmap_base + "data" + str(fid) + ".dat"
    for i in range(max_mmap_files):
        if os.path.isfile(filename):
            fid += 1
            filename = mmap_base + "data" + str(fid) + ".dat"
        else:
            break
    _X = np.memmap(filename, dtype='float64', mode='w+', shape=X.shape)
    _X[:] = X[:]
    del X
    import gc
    gc.collect()
    return _X

def append(self, array):
    """Append data from `array` to self."""
    if self.closed:
        raise ValueError('Array is not opened.')

    if not self.initialized:
        self.init_from_array(array)

    if array.shape[1:] != self.shape[1:]:
        raise ValueError("Appended array is of different shape.")
    elif array.dtype != self.dtype:
        raise ValueError("Appended array is of different dtype.")

    # Append new data
    pos = self.header_length + self.size * self.itemsize
    self.fs.seek(pos)
    self.fs.write(array.tobytes('C'))
    self.shape = (self.shape[0] + len(array), ) + self.shape[1:]

    # Only prepare the header bytes, need to be flushed to take effect
    self._prepare_header_data()

    # Invalidate the memmap
    self._memmap = None

def shmem_client_send_env_id(self):
    """
    Multiplayer Scene can support multiple kinds of environments (robots, actors).
    For example, Stadium supports Hopper and Ant. On server side, environment
    of the same type should be created. To do that, we send env_id over pipe.

    Observations, actions must have size matching that on server. So we open
    shared memory files at this point, after server created those files based
    on knowledge it now has, and sent "accepted" back here.
    """
    os.write(self.sh_pipe_actready, (self.spec.id + "\n").encode("ascii"))
    check = self.sh_pipe_obsready.readline()[:-1]
    assert(check=="accepted")
    self.sh_obs = np.memmap(self.prefix + "_obs", mode="r+", shape=self.observation_space.shape, dtype=np.float32)
    self.sh_act = np.memmap(self.prefix + "_act", mode="r+", shape=self.action_space.shape, dtype=np.float32)
    self.sh_rew = np.memmap(self.prefix + "_rew", mode="r+", shape=(1,), dtype=np.float32)
    self.sh_rgb = np.memmap(self.prefix + "_rgb", mode="r+", shape=(self.VIDEO_H,self.VIDEO_W,3), dtype=np.uint8)

def read_env_id_and_create_env(self):
    self.sh_pipe_actready = open(self.sh_pipe_actready_filename, "rt")
    self.sh_pipe_obsready = os.open(self.sh_pipe_obsready_filename, os.O_WRONLY)
    env_id = self.sh_pipe_actready.readline()[:-1]
    if env_id.find("-v")==-1:
        raise ValueError("multiplayer client %s sent here invalid environment id '%s'" % (self.prefix, env_id))
    #
    # And at this point we know env_id.
    #
    print("Player %i connected, wants to operate %s in this scene" % (self.player_n, env_id))
    self.env = gym.make(env_id)  # gym.make() creates at least timeout wrapper, we need it.

    self.env.unwrapped.scene = self.scene
    self.env.unwrapped.player_n = self.player_n

    assert isinstance(self.env.observation_space, gym.spaces.Box)
    assert isinstance(self.env.action_space, gym.spaces.Box)

    self.sh_obs = np.memmap(self.prefix + "_obs", mode="w+", shape=self.env.observation_space.shape, dtype=np.float32)
    self.sh_act = np.memmap(self.prefix + "_act", mode="w+", shape=self.env.action_space.shape, dtype=np.float32)
    self.sh_rew = np.memmap(self.prefix + "_rew", mode="w+", shape=(1,), dtype=np.float32)
    self.sh_rgb = np.memmap(self.prefix + "_rgb", mode="w+", shape=(self.env.unwrapped.VIDEO_H,self.env.unwrapped.VIDEO_W,3), dtype=np.uint8)
    os.write(self.sh_pipe_obsready, b'accepted\n')

def setSavePathFile(self, save=False, path_result=None):
    a = np.zeros((max(1, self.zones), 1, 2), dtype=np.int32)

    if save:
        if path_result is None:
            warnings.warn("Path file not set properly. Need to specify output file too")
        else:
            if path_result[-3:].lower() != 'aep':
                dictio_name = path_result + '.aed'
                path_result += '.aep'
            else:
                dictio_name = path_result[:-3] + 'aed'

            if self.nodes > 0 and self.zones > 0:
                a = np.memmap(path_result, dtype=np.int32, mode='w+', shape=(self.zones, self.nodes, 2))
                saveDataFileDictionary(self.__graph_id__, 'path file', [int(x) for x in a.shape[:]], dictio_name)

    self.path_file = {'save': save,
                      'results': a}

def save_portion(pars):
    big_mov, d, tot_frames, fnames, idx_start, idx_end = pars
    big_mov = np.memmap(big_mov, mode='r+', dtype=np.float32, shape=(d, tot_frames), order='C')
    Ttot = 0
    Yr_tot = np.zeros((idx_end - idx_start, tot_frames))
    print Yr_tot.shape
    for f in fnames:
        print f
        Yr, dims, T = load_memmap(f)
        print idx_start, idx_end
        Yr_tot[:, Ttot:Ttot + T] = np.array(Yr[idx_start:idx_end])
        Ttot = Ttot + T
        del Yr

    big_mov[idx_start:idx_end, :] = Yr_tot
    del Yr_tot
    print 'done'
    del big_mov
    return Ttot

#%%
#%%

def __init__(self, maxlen, input_shape, action_size):
    self.maxlen = maxlen
    dirname = tempfile.mkdtemp()
    # use memory maps so we won't have to worry about eating up lots of RAM
    get_path = lambda name: os.path.join(dirname, name)
    self.screens = np.memmap(get_path('screens'), dtype=np.float32, mode='w+',
                             shape=tuple([self.maxlen] + input_shape))
    self.actions = np.memmap(get_path('actions'), dtype=np.float32, mode='w+',
                             shape=(self.maxlen, action_size))
    self.rewards = np.memmap(get_path('rewards'), dtype=np.float32, mode='w+',
                             shape=(self.maxlen,))
    self.is_terminal = np.memmap(get_path('terminals'), dtype=np.bool, mode='w+',
                                 shape=(self.maxlen,))
    self.position = 0
    self.full = False

# def _get_states(batch):
#     s = list()
#     for i in xrange(-3, 2):
#         s.append(self.screens[batch+i])
#     return np.vstack(s[:-1]), np.vstack(s[1:])

def _strided_from_memmap(filename, dtype, mode, offset, order, shape, strides,
                         total_buffer_len):
    """Reconstruct an array view on a memory mapped file."""
    if mode == 'w+':
        # Do not zero the original data when unpickling
        mode = 'r+'
    if strides is None:
        # Simple, contiguous memmap
        return make_memmap(filename, dtype=dtype, shape=shape, mode=mode,
                           offset=offset, order=order)
    else:
        # For non-contiguous data, memmap the total enclosing buffer and then
        # extract the non-contiguous view with the stride-tricks API
        base = make_memmap(filename, dtype=dtype, shape=total_buffer_len,
                           mode=mode, offset=offset, order=order)
        return as_strided(base, shape=shape, strides=strides)

def _gen_prediction_array(self, task, job, threading):
    """Generate prediction array either in-memory or persist to disk."""
    shape = task.shape(job)
    if threading:
        self.job.predict_out = np.empty(shape, dtype=_dtype(task))
    else:
        f = os.path.join(self.job.dir, '%s_out_array.mmap' % task.name)
        try:
            self.job.predict_out = np.memmap(
                filename=f, dtype=_dtype(task), mode='w+', shape=shape)
        except Exception as exc:
            raise OSError(
                "Cannot create prediction matrix of shape ("
                "%i, %i), size %i MBs, for %s.\n Details:\n%r" %
                (shape[0], shape[1], 8 * shape[0] * shape[1] / (1024 ** 2),
                 task.name, exc))

def load_raw(path, prompt_info=None):
    try:
        info = load_info(path)
    except FileNotFoundError as error:
        if prompt_info is None:
            raise error
        else:
            result = prompt_info()
            if result is None:
                return
            else:
                info, save = result
                info = [info]
                if save:
                    base, ext = _ospath.splitext(path)
                    info_path = base + '.yaml'
                    save_info(info_path, info)
    dtype = _np.dtype(info[0]['Data Type'])
    shape = (info[0]['Frames'], info[0]['Height'], info[0]['Width'])
    movie = _np.memmap(path, dtype, 'r', shape=shape)
    if info[0]['Byte Order'] != '<':
        movie = movie.byteswap()
        info[0]['Byte Order'] = '<'
    return movie, info

def _init_in_memory_chunks(self, size):
    available_mem = psutil.virtual_memory().available
    required_mem = self._calculate_required_memory(size)
    if required_mem <= available_mem:
        self._in_memory_chunks = np.empty(shape=(size, self.data_producer.dimension()),
                                          order='C', dtype=np.float32)
    else:
        if self.oom_strategy == 'raise':
            self.logger.warning('K-means failed to load all the data (%s required, %s available) into memory. '
                                'Consider using a larger stride or set the oom_strategy to \'memmap\' which works '
                                'with a memmapped temporary file.'
                                % (bytes_to_string(required_mem), bytes_to_string(available_mem)))
            raise MemoryError()
        else:
            self.logger.warning('K-means failed to load all the data (%s required, %s available) into memory '
                                'and now uses a memmapped temporary file which is comparably slow. '
                                'Consider using a larger stride.'
                                % (bytes_to_string(required_mem), bytes_to_string(available_mem)))
            self._in_memory_chunks = np.memmap(tempfile.mkstemp()[1], mode="w+",
                                               shape=(size, self.data_producer.dimension()), order='C',
                                               dtype=np.float32)

def __init__(self, n_clusters, max_iter=5, metric='euclidean', tolerance=1e-5, init_strategy='kmeans++',
             batch_size=0.2, oom_strategy='memmap', fixed_seed=False, stride=None, n_jobs=None, skip=0):

    if stride is not None:
        raise ValueError("stride is a dummy value in MiniBatch Kmeans")
    if batch_size > 1:
        raise ValueError("batch_size should be less or equal to 1, but was %s" % batch_size)

    self._cluster_centers_iter = None
    self._centers_iter_list = []

    super(MiniBatchKmeansClustering, self).__init__(n_clusters, max_iter, metric,
                                                    tolerance, init_strategy, False,
                                                    oom_strategy, stride=stride,
                                                    n_jobs=n_jobs, skip=skip)

    self.set_params(batch_size=batch_size)