我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 numpy.frombuffer()。
def extract_images(filename):
    """Read a gzipped MNIST image file into a 4D uint8 array.

    The header is consumed with ``_read32``: a magic number (must be 2051),
    then the image count, the row count and the column count. The remaining
    bytes are interpreted as uint8 pixels.

    Returns:
        numpy array of shape [index, y, x, depth] with depth == 1.

    Raises:
        ValueError: if the magic number is not 2051.
    """
    print('Extracting', filename)
    with gzip.open(filename) as stream:
        magic = _read32(stream)
        if magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (magic, filename))
        # Header fields follow the magic number in this fixed order.
        count = _read32(stream)
        height = _read32(stream)
        width = _read32(stream)
        pixels = stream.read(height * width * count)
        images = numpy.frombuffer(pixels, dtype=numpy.uint8)
        return images.reshape(count, height, width, 1)
def test_tile_symmetry(self):
    '''
    Make sure that tiles are symmetric

    Uploads the cooler fixture as tileset ``aa``, requests tile ``aa.0.0.0``
    and decodes its base64 ``dense`` payload into a 256x256 float16 matrix.
    '''
    import base64

    # Use a context manager so the fixture handle is closed once the upload
    # object has been built (the original left it open).
    with open('data/Dixon2012-J1-NcoI-R1-filtered.100kb.multires.cool', 'rb') as upload_file:
        tm.Tileset.objects.create(
            datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()),
            filetype='cooler',
            datatype='matrix',
            owner=self.user1,
            uuid='aa')

    ret = self.client.get('/api/v1/tiles/?d=aa.0.0.0')
    contents = json.loads(ret.content.decode('utf-8'))

    # base64.decodestring was removed in Python 3.9; b64decode performs the
    # same decoding and is available on every Python version.
    r = base64.b64decode(contents['aa.0.0.0']['dense'].encode('utf-8'))
    q = np.frombuffer(r, dtype=np.float16)
    # reshape doubles as a size check: it fails unless exactly 256*256
    # values were returned.
    q = q.reshape((256, 256))
    # NOTE(review): despite the test name, nothing here compares q with its
    # transpose -- confirm whether a symmetry assertion was intended.
def unpack_packet(stream,vertex_type,size):
    """Decode ``size`` bytes from ``stream`` into a list of ``Primitive`` objects.

    Each non-zero opcode byte is followed by a vertex count (read with
    ``uint16`` at the next byte) and then ``vertex_count`` records of
    ``vertex_type``, which are exposed as a numpy view onto the packet.
    """
    # One bulk read; everything below indexes into this buffer.
    packet = stream.read(size)
    result = []
    position = 0
    while position < size:
        opcode = packet[position]
        if opcode != 0x00:
            kind = gx.PrimitiveType(opcode)
            count = uint16.unpack_from(packet,position + 1)
            body = numpy.frombuffer(packet,vertex_type,count,position + 3)
            result.append(Primitive(kind,body))
            # 1 opcode byte + 2 count bytes + the vertex records themselves
            position += 3 + count*vertex_type.itemsize
        else:
            # A zero opcode carries no primitive; step over that one byte.
            position += 1
    return result
def load_bytes(self, data_blocks, dtype='<i1', start=None, end=None, expected_size=None):
    """
    Return the bytes contained in the specified set of blocks as one array.

    Blocks whose ``size`` is not greater than zero are skipped. For every
    other block, ``size`` bytes are read from ``self.file`` at the block's
    ``start`` offset, truncated to ``expected_size`` bytes when given, and
    viewed with ``dtype``. The per-block arrays are concatenated and the
    slice ``[start:end]`` of the result is returned.

    When no block contributes data, an empty array of the requested
    ``dtype`` is returned (the original returned an empty float64 array
    regardless of ``dtype``).

    NB : load all data as files cannot exceed 4Gb
         find later other solutions to spare memory.
    """
    chunks = []
    for data_block in data_blocks:
        # keep only data blocks having a size greater than zero
        if not data_block.size > 0:
            continue
        self.file.seek(data_block.start)
        raw = self.file.read(data_block.size)[:expected_size]
        chunks.append(np.frombuffer(raw, dtype=dtype))

    if not chunks:
        return np.array([], dtype=dtype)

    # concatenate all chunks and return the specified slice
    return np.concatenate(chunks)[start:end]
def load_channel_data(self, ep, ch):
    """
    Return a numpy array containing the samples of the
    specified episode and channel.

    All bytes of the episode are loaded, filtered through the channel's
    bit mask, grouped into rows of ``sample_size`` bytes and reinterpreted
    as the little-endian dtype mapped from the channel's sample symbol.
    """
    # memorise the sample size and symbol
    sample_size = self.sample_size(ep, ch)
    sample_symbol = self.sample_symbol(ep, ch)

    # create a bit mask to define which sample to keep from the file
    bit_mask = self.create_bit_mask(ep, ch)

    # load all bytes contained in an episode
    data_blocks = self.get_data_blocks(ep)
    databytes = self.load_bytes(data_blocks)
    raw = self.filter_bytes(databytes, bit_mask)

    # newbyteorder() returns a new dtype instead of modifying in place, so
    # its result has to be kept (the original discarded it).
    dt = np.dtype(numpy_map[sample_symbol]).newbyteorder('<')

    # Floor division keeps the row count an integer on Python 3, where '/'
    # would hand reshape a float.
    return np.frombuffer(raw.reshape([len(raw) // sample_size, sample_size]), dt)
def load_encoded_spikes(self, episode, evt_channel, identifier):
    """
    Return the sorted event times, stored as 4-byte integers, of one
    spike channel.

    NB: it is meant for Blackrock-type, having an additional byte for each
    event time as spike sorting label. These additional bytes are appended
    trailing the times.
    """
    # Pick the block for this episode among those carrying the requested
    # identifier ('RSPK' or whatever); episodes are indexed from 1 here.
    candidates = [blk for blk in self.blocks if blk.identifier == identifier]
    block = candidates[episode-1]
    n_events = block.n_events
    channel_events = n_events[evt_channel-1]

    # RDATA(h?dI) REVT(NbVeV:I, NbEv:256I ... spike data are 4byte integers
    header_bytes = 4*( block.size - block.data_size-2 + len(n_events))
    preceding = np.sum(n_events[0:evt_channel-1], dtype=int, axis=0)

    # The data starts after the header, the preceding events (4 bytes each)
    # and the preceding labels (1 byte each).
    first = header_bytes + (4*preceding) + preceding
    last = first + 4*channel_events

    raw = self.load_bytes(
        [block],
        dtype='<i1',
        start=first,
        end=last,
        expected_size=block.size
    )

    # Re-encode the byte-wise data as 4-byte integers.
    times = np.frombuffer(raw[0:(4*channel_events)], dtype='<i4')
    times.sort()  # sometimes timings are not sorted
    return times
def extract_images(filename):
    """Load a gzipped MNIST image file as a uint8 array [index, y, x, depth].

    Progress is reported through the logger obtained from ``logger.get()``.
    The header (magic 2051, image count, rows, columns) is read with
    ``_read32``; the pixel bytes that follow are reshaped so that depth is 1.

    Raises:
        ValueError: if the magic number is not 2051.
    """
    logger.get().info('Extracting {}'.format(filename))
    with gzip.open(filename) as gz:
        magic = _read32(gz)
        if magic != 2051:
            message = 'Invalid magic number %d in MNIST image file: %s' % (
                magic, filename)
            raise ValueError(message)
        n_images = _read32(gz)
        n_rows = _read32(gz)
        n_cols = _read32(gz)
        raw = gz.read(n_rows * n_cols * n_images)
        flat = numpy.frombuffer(raw, dtype=numpy.uint8)
        return flat.reshape(n_images, n_rows, n_cols, 1)
def extract_labels(filename, one_hot=False):
    """Load a gzipped MNIST label file as a 1D uint8 array [index].

    The header (magic 2049, item count) is read with ``_read32``. When
    ``one_hot`` is true the labels are passed through ``dense_to_one_hot``
    before being returned.

    Raises:
        ValueError: if the magic number is not 2049.
    """
    logger.get().info('Extracting {}'.format(filename))
    with gzip.open(filename) as gz:
        magic = _read32(gz)
        if magic != 2049:
            message = 'Invalid magic number %d in MNIST label file: %s' % (
                magic, filename)
            raise ValueError(message)
        item_count = _read32(gz)
        raw = gz.read(item_count)
        labels = numpy.frombuffer(raw, dtype=numpy.uint8)
        return dense_to_one_hot(labels) if one_hot else labels
def read_pgm(filename, byteorder='>'):
    """Return image data from a raw PGM file as numpy array.

    Format specification: http://netpbm.sourceforge.net/doc/pgm.html

    Args:
        filename: path of the PGM file to read.
        byteorder: numpy byte-order character used for 16-bit samples
            (only applied when the file's maxval is 256 or more).

    Returns:
        Array of shape (height, width); dtype ``u1`` when maxval < 256,
        otherwise ``byteorder + 'u2'``.

    Raises:
        ValueError: if the file does not start with a raw PGM (P5) header.
    """
    with open(filename, 'rb') as f:
        data = f.read()

    # Raw bytes literals: in a normal literal "\s" and "\d" are invalid
    # escape sequences that newer Python versions warn about.
    match = re.search(
        br"(^P5\s(?:\s*#.*[\r\n])*"
        br"(\d+)\s(?:\s*#.*[\r\n])*"
        br"(\d+)\s(?:\s*#.*[\r\n])*"
        br"(\d+)\s(?:\s*#.*[\r\n]\s)*)", data)
    if match is None:
        raise ValueError("Not a raw PGM file: '%s'" % filename)
    header, width, height, maxval = match.groups()

    width = int(width)
    height = int(height)
    dtype = 'u1' if int(maxval) < 256 else byteorder + 'u2'
    # The pixel data begins immediately after the matched header.
    pixels = np.frombuffer(data, dtype=dtype, count=width * height,
                           offset=len(header))
    return pixels.reshape((height, width))
def extract_images(filename, verbose=True):
    """Read a gzipped MNIST image file into a uint8 array [index, y, x, depth].

    Args:
        filename: path of the gzip file.
        verbose: when true, print the file name before reading.

    Raises:
        ValueError: if the magic number read by ``_read32`` is not 2051.
    """
    if verbose:
        print('Extracting', filename)
    with gzip.open(filename) as source:
        magic = _read32(source)
        if magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (magic, filename))
        # Remaining header fields, in file order.
        total = _read32(source)
        n_rows = _read32(source)
        n_cols = _read32(source)
        payload = source.read(n_rows * n_cols * total)
        pixels = np.frombuffer(payload, dtype=np.uint8)
        return pixels.reshape(total, n_rows, n_cols, 1)
def test_buffer_numpy(self):
    """test non-copying numpy array messages

    Wraps random arrays of 1 to 5 dimensions in a ``zmq.Frame`` and checks
    that the frame's buffer matches the array and rebuilds to an equal array.
    """
    try:
        import numpy
    except ImportError:
        raise SkipTest("numpy required")
    randint = numpy.random.randint
    dims = [ randint(2,16) for _ in range(5) ]
    for ndim in range(1, len(dims)+1):
        shape = dims[:ndim]
        A = numpy.random.random(shape)
        m = zmq.Frame(A)
        if view.__name__ != 'buffer':
            self.assertEqual(memoryview(A), m.buffer)
            B = numpy.array(m.buffer,dtype=A.dtype).reshape(A.shape)
        else:
            self.assertEqual(A.data, m.buffer)
            B = numpy.frombuffer(m.buffer,dtype=A.dtype).reshape(A.shape)
        self.assertEqual((A==B).all(), True)
def extract_images(filename):
    """Read a gzipped MNIST image file into a uint8 array [index, y, x, depth].

    The file is opened through ``tf.gfile.Open`` and decompressed with
    ``gzip.GzipFile`` on top of that handle.

    Raises:
        ValueError: if the magic number read by ``_read32`` is not 2051.
    """
    print('Extracting', filename)
    with tf.gfile.Open(filename, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        header_magic = _read32(bytestream)
        if header_magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (header_magic, filename))
        image_total = _read32(bytestream)
        row_total = _read32(bytestream)
        col_total = _read32(bytestream)
        blob = bytestream.read(row_total * col_total * image_total)
        as_bytes = numpy.frombuffer(blob, dtype=numpy.uint8)
        return as_bytes.reshape(image_total, row_total, col_total, 1)
def pickle_transitions_matrix_data():
    """Flatten the pickled transitions mapping into three arrays and re-pickle them.

    Loads ``transitions`` (a mapping of source -> {target: value}) and
    ``vocab`` (a mapping used to turn sources and targets into indices)
    from fixed paths, builds parallel row-index, column-index and value
    arrays, and dumps ``[i_indices, j_indices, values]`` to
    ``transition_matrix``.

    The dict iteration uses ``iteritems``, so this function targets
    Python 2.
    """
    # NOTE(review): pickle.load can execute arbitrary code from its input;
    # these files are assumed to be trusted local artifacts.
    # Context managers make sure every handle is closed (and the output is
    # flushed) even if loading or dumping fails.
    with open("/ssd/ddimitrov/pickle/transitions", "rb") as handle:
        transitions = pickle.load(handle)
    with open("/ssd/ddimitrov/pickle/vocab", "rb") as handle:
        vocab = pickle.load(handle)

    i_indices = array.array(str("l"))
    j_indices = array.array(str("l"))
    values = array.array(str("d"))

    for s, targets in transitions.iteritems():
        for t, v in targets.iteritems():
            i_indices.append(vocab[s])
            j_indices.append(vocab[t])
            values.append(v)

    # View the array.array buffers as numpy arrays without copying.
    i_indices = np.frombuffer(i_indices, dtype=np.int_)
    j_indices = np.frombuffer(j_indices, dtype=np.int_)
    values = np.frombuffer(values, dtype=np.float64)

    transition_matrix = [i_indices, j_indices, values]
    with open("/ssd/ddimitrov/pickle/transition_matrix", "wb") as handle:
        pickle.dump(transition_matrix, handle, protocol=pickle.HIGHEST_PROTOCOL)
    # Parenthesised form prints the same text on Python 2 and parses on 3.
    print("transition_matrix")
def _read_datafile(self, path, expected_dims):
    """Read a gzipped IDX file and return its contents as a uint8 array.

    Args:
        path: path of the gzip file.
        expected_dims: number of dimensions the file must declare; the
            magic number has to equal ``2048 + expected_dims``.

    Returns:
        Array shaped according to the dimension sizes stored in the header.

    Raises:
        ValueError: if the magic number does not match.
    """
    base_magic_num = 2048
    expected_magic_num = base_magic_num + expected_dims
    with gzip.GzipFile(path) as f:
        (magic_num,) = struct.unpack('>I', f.read(4))
        if magic_num != expected_magic_num:
            raise ValueError('Incorrect MNIST magic number (expected '
                             '{}, got {})'
                             .format(expected_magic_num, magic_num))
        # One big-endian uint32 per dimension follows the magic number.
        dims_format = '>' + 'I' * expected_dims
        dims = struct.unpack(dims_format, f.read(4 * expected_dims))
        payload = f.read(reduce(operator.mul, dims))
        return np.frombuffer(payload, dtype=np.uint8).reshape(*dims)
def _ungzip(save_path, extract_path, database_name, _):
    """
    Decode a gzipped image file and write every image out as a JPEG.

    The header (magic 2051, image count, rows, columns) is read with
    ``_read32``; each image is saved as ``image_<index>.jpg`` in
    ``extract_path`` while a tqdm progress bar is shown.

    :param save_path: The path of the gzip files
    :param extract_path: The location to extract the data to
    :param database_name: Name of database
    :param _: HACK - Used to have to same interface as _unzip
    """
    # Get data from save_path
    with open(save_path, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        magic = _read32(bytestream)
        if magic != 2051:
            raise ValueError('Invalid magic number {} in file: {}'.format(magic, f.name))
        image_count = _read32(bytestream)
        height = _read32(bytestream)
        width = _read32(bytestream)
        raw = bytestream.read(height * width * image_count)
        data = np.frombuffer(raw, dtype=np.uint8).reshape(image_count, height, width)

    # Save data to extract_path
    progress = tqdm(data, unit='File', unit_scale=True, miniters=1,
                    desc='Extracting {}'.format(database_name))
    for image_i, image in enumerate(progress):
        target = os.path.join(extract_path, 'image_{}.jpg'.format(image_i))
        Image.fromarray(image, 'L').save(target)
def receive_data(self, channel, oc):
    """Read one length-prefixed message from a channel's socket and push it to ``oc``.

    The wire format is ``[size, buffer...]``: a size prefix unpacked with
    the native ``'n'`` struct format, followed by ``size`` payload bytes.
    The payload is viewed as an array of ``channel.dtype`` and handed to
    ``oc.push`` via ``asyncio.ensure_future``.

    If the prefix or the payload arrives short, the error is logged, the
    socket is removed from the event loop's readers and nothing is pushed.
    """
    # push data from a socket into an OutputConnector (oc)
    self.last_timestamp = datetime.datetime.now()
    # wire format is just: [size, buffer...]
    sock = self._chan_to_rsocket[channel]

    # The prefix is unpacked with the native 'n' format, so ask struct for
    # its width on this platform instead of hard-coding 8 bytes.
    header_size = struct.calcsize('n')
    msg = sock.recv(header_size)
    if len(msg) != header_size:
        # A short prefix would otherwise surface as a struct.error below.
        logger.error("Channel %s socket size header shorter than expected", channel.channel)
        logger.error("Expected %s bytes, received %s bytes", header_size, len(msg))
        asyncio.get_event_loop().remove_reader(sock)
        return

    # reinterpret as int (size_t)
    msg_size = struct.unpack('n', msg)[0]
    buf = sock.recv(msg_size, socket.MSG_WAITALL)
    if len(buf) != msg_size:
        logger.error("Channel %s socket msg shorter than expected", channel.channel)
        logger.error("Expected %s bytes, received %s bytes", msg_size, len(buf))
        # assume that we cannot recover, so stop listening.
        asyncio.get_event_loop().remove_reader(sock)
        return

    data = np.frombuffer(buf, dtype=channel.dtype)
    asyncio.ensure_future(oc.push(data))
def receive_data(self, channel, oc):
    """Read one length-prefixed float32 message from a socket and push it to ``oc``.

    Updates ``self.last_timestamp`` and increments ``self.fetch_count`` on
    every call. The wire format is ``[size, buffer...]``: a size prefix
    unpacked with the native ``'n'`` struct format, followed by ``size``
    payload bytes, which are viewed as ``np.float32`` and handed to
    ``oc.push`` via ``asyncio.ensure_future``.

    If the prefix or the payload arrives short, the error is logged, the
    socket is removed from the event loop's readers and nothing is pushed.
    """
    # push data from a socket into an OutputConnector (oc)
    self.last_timestamp = datetime.datetime.now()
    self.fetch_count += 1
    # wire format is just: [size, buffer...]
    sock = self._chan_to_rsocket[channel]

    # Width of the native 'n' prefix on this platform (replaces the
    # hard-coded 8, which only matches platforms where 'n' is 8 bytes).
    prefix_len = struct.calcsize('n')
    msg = sock.recv(prefix_len)
    if len(msg) != prefix_len:
        # Without this check a short prefix raises struct.error below.
        logger.error("Channel %s socket size header shorter than expected", channel.channel)
        logger.error("Expected %s bytes, received %s bytes", prefix_len, len(msg))
        asyncio.get_event_loop().remove_reader(sock)
        return

    # reinterpret as int (size_t)
    (msg_size,) = struct.unpack('n', msg)
    buf = sock.recv(msg_size, socket.MSG_WAITALL)
    if len(buf) != msg_size:
        logger.error("Channel %s socket msg shorter than expected", channel.channel)
        logger.error("Expected %s bytes, received %s bytes", msg_size, len(buf))
        # assume that we cannot recover, so stop listening.
        asyncio.get_event_loop().remove_reader(sock)
        return

    data = np.frombuffer(buf, dtype=np.float32)
    asyncio.ensure_future(oc.push(data))
def loop(self):
    """Poll the socket until ``self.running`` is cleared, then close it.

    Each multipart message starts with a type frame and a name frame.
    A ``"done"`` message emits ``self.finished``; a ``"data"`` message is
    followed by (JSON metadata, raw bytes) frame pairs, each decoded into
    an array using the metadata's ``dtype`` and emitted together with the
    name through ``self.message``.
    """
    while self.running:
        events = dict(self.poller.poll(50))
        if self.socket not in events or events[self.socket] != zmq.POLLIN:
            continue
        frames = self.socket.recv_multipart()
        kind = frames[0].decode()
        name = frames[1].decode()
        if kind == "done":
            self.finished.emit(True)
        elif kind == "data":
            result = [name]
            # How many pairs of metadata and data are there?
            pair_count = int((len(frames) - 2)/2)
            for idx in range(pair_count):
                md, data = frames[2+2*idx:4+2*idx]
                md = json.loads(md.decode())
                result.append(np.frombuffer(data, dtype=md['dtype']))
            self.message.emit(tuple(result))
    self.socket.close()
def _reads(self, addr, length):
    """Request ``length`` 32-bit words starting at ``addr`` and return them.

    An 8-byte request (``b'r'``, a zero byte, the length as two
    little-endian bytes, the address as four little-endian bytes) is sent;
    the reply is expected to echo those 8 bytes followed by
    ``length * 4`` data bytes. ``length`` is capped at 65535.

    Returns:
        uint32 numpy array of the data, or None (after logging and calling
        ``self.emptybuffer()``) when the echoed header does not match.
    """
    if length > 65535:
        length = 65535
        self.logger.warning("Maximum read-length is %d", length)
    request = b'r' + bytes(bytearray([0,
                                      length & 0xFF, (length >> 8) & 0xFF,
                                      addr & 0xFF, (addr >> 8) & 0xFF,
                                      (addr >> 16) & 0xFF, (addr >> 24) & 0xFF]))
    self.socket.send(request)
    expected = length * 4 + 8
    reply = self.socket.recv(expected)
    # Keep reading until the whole reply has arrived.
    while len(reply) < expected:
        reply += self.socket.recv(expected - len(reply))
    if reply[:8] != request:  # transmission out of sync
        self.logger.error("Wrong control sequence from server: %s", reply[:8])
        self.emptybuffer()
        return None
    return np.frombuffer(reply[8:], dtype=np.uint32)
def read(self):
    """Return audio file as array of integer.

    Reads every frame of ``self.file_path`` and records the frame count,
    sampling rate, channel count and sample width on the instance.

    Returns:
        audio_data: np.ndarray, shape of (frame_num,)

    Raises:
        ValueError: if the file has a channel count other than 1 or 2
            (previously this surfaced as an UnboundLocalError).
    """
    # Read wav file
    with wave.open(self.file_path, "r") as wav:
        # Move to head of the audio file
        wav.rewind()

        self.frame_num = wav.getnframes()
        self.sampling_rate = wav.getframerate()  # 16,000 Hz
        self.channels = wav.getnchannels()
        self.sample_size = wav.getsampwidth()  # 2

        # Read to buffer as binary format
        buf = wav.readframes(self.frame_num)

    # WAV sample data is little-endian, so spell the byte order out rather
    # than relying on the host's native order.
    # NOTE(review): both branches assume 2-byte samples; self.sample_size
    # is recorded but not checked -- confirm inputs are always 16-bit.
    if self.channels == 1:
        audio_data = np.frombuffer(buf, dtype="<i2")
    elif self.channels == 2:
        # NOTE(review): this packs each left/right pair of 16-bit samples
        # into one 32-bit value so the result keeps shape (frame_num,);
        # confirm that callers expect this rather than separated channels.
        audio_data = np.frombuffer(buf, dtype="<i4")
    else:
        raise ValueError(
            "Unsupported number of channels: %d" % self.channels)

    return audio_data
def make_array(shape=(1,), dtype=np.float32, shared=False, fill_val=None):
    """Allocate a numpy array, optionally backed by a shared ``RawArray``.

    Args:
        shape: shape of the array to create.
        dtype: element type. For shared arrays it must resolve to one of
            float32, float64, bool, uint8 or uint64.
        shared: when true, the storage is a
            ``multiprocessing.sharedctypes.RawArray``; otherwise the array
            comes from ``np.empty``.
        fill_val: if not None, every element is set to this value.

    Returns:
        The new array.

    Raises:
        KeyError: if ``shared`` is true and ``dtype`` has no ctypes mapping.
    """
    # Keyed by numpy scalar type. np.bool_ replaces the np.bool alias,
    # which was removed in NumPy 1.24.
    np_type_to_ctype = {np.float32: ctypes.c_float,
                        np.float64: ctypes.c_double,
                        np.bool_: ctypes.c_bool,
                        np.uint8: ctypes.c_ubyte,
                        np.uint64: ctypes.c_ulonglong}

    if not shared:
        np_arr = np.empty(shape, dtype=dtype)
    else:
        # RawArray only treats a genuine Python int as a size; a numpy
        # integer would be mistaken for an initializer sequence.
        numel = int(np.prod(shape))
        # Normalising through np.dtype lets bool, np.bool_, dtype objects
        # and dtype strings all reach the same table entry.
        ctype = np_type_to_ctype[np.dtype(dtype).type]
        arr_ctypes = sharedctypes.RawArray(ctype, numel)
        np_arr = np.frombuffer(arr_ctypes, dtype=dtype, count=numel)
        np_arr.shape = shape

    if fill_val is not None:
        np_arr[...] = fill_val

    return np_arr
def __call__(self, bytez):
    """Accumulate the bin counts of ``bytez`` into a flattened 16x16 histogram.

    The input is viewed as uint8. If it is shorter than ``self.window``
    it is binned as a single block; otherwise overlapping windows of
    ``self.window`` bytes, taken every ``self.step`` positions, are binned
    one by one. Each block's ``self._entropy_bin_counts`` result
    ``(Hbin, c)`` is added to row ``Hbin`` of the output.

    Returns:
        The 256 accumulated counts, flattened and cast to ``self.dtype``.
    """
    # np.int was only an alias for the builtin int and was removed in
    # NumPy 1.24; the builtin selects the same dtype.
    output = np.zeros((16, 16), dtype=int)
    a = np.frombuffer(bytez, dtype=np.uint8)
    if a.shape[0] < self.window:
        Hbin, c = self._entropy_bin_counts(a)
        output[Hbin, :] += c
    else:
        # strided trick from here: http://www.rigtorp.se/2011/01/01/rolling-statistics-numpy.html
        shape = a.shape[:-1] + (a.shape[-1] - self.window + 1, self.window)
        strides = a.strides + (a.strides[-1],)
        blocks = np.lib.stride_tricks.as_strided(
            a, shape=shape, strides=strides)[::self.step, :]

        # from the blocks, compute histogram
        for block in blocks:
            Hbin, c = self._entropy_bin_counts(block)
            output[Hbin, :] += c

    return output.flatten().astype(self.dtype)
def _unpack_ndarray(cls, buf, offset):
    """Decode an ndarray stored in ``buf`` starting at ``offset``.

    The layout read here is: a dtype string (via ``cls._unpack_string``),
    the number of dimensions, one entry per dimension, the payload length
    in bytes, and finally the payload itself. All counts are read with the
    ``"I"`` struct format, 4 bytes apart.

    Returns:
        ``(array, new_offset)`` where ``new_offset`` points just past the
        payload.
    """
    dtype, offset = cls._unpack_string(buf, offset)
    (ndim,) = unpack_from("I", buf, offset)
    offset += 4
    shape = []
    for _ in range(0, ndim):
        shape.append(unpack_from("I", buf, offset)[0])
        offset += 4
    (nbytes,) = unpack_from("I", buf, offset)
    offset += 4
    end = offset + nbytes
    res = np.frombuffer(buf[offset:end], dtype=np.dtype(dtype)).reshape(shape)
    return res, end
def _extract_images(filename, num_images):
    """Extract the images into a numpy array.

    The first 16 bytes of the decompressed file are skipped, then
    ``num_images`` images of ``_IMAGE_SIZE`` x ``_IMAGE_SIZE`` x
    ``_NUM_CHANNELS`` bytes are read.

    Args:
        filename: The path to an MNIST images file.
        num_images: The number of images in the file.

    Returns:
        A numpy array of shape [number_of_images, height, width, channels].
    """
    print('Extracting images from: ', filename)
    with gzip.open(filename) as bytestream:
        bytestream.read(16)  # header bytes are not inspected
        byte_count = _IMAGE_SIZE * _IMAGE_SIZE * num_images * _NUM_CHANNELS
        raw = bytestream.read(byte_count)
        pixels = np.frombuffer(raw, dtype=np.uint8)
        pixels = pixels.reshape(
            num_images, _IMAGE_SIZE, _IMAGE_SIZE, _NUM_CHANNELS)
    return pixels
def _extract_labels(filename, num_labels):
    """Extract the labels into a vector of int64 label IDs.

    The first 8 bytes of the decompressed file are skipped; each following
    byte is one label.

    Args:
        filename: The path to an MNIST labels file.
        num_labels: The number of labels in the file.

    Returns:
        A numpy array of shape [number_of_labels]
    """
    print('Extracting labels from: ', filename)
    with gzip.open(filename) as bytestream:
        bytestream.read(8)  # header bytes are not inspected
        raw = bytestream.read(1 * num_labels)
        as_bytes = np.frombuffer(raw, dtype=np.uint8)
        labels = as_bytes.astype(np.int64)
    return labels
def read_pgm(filename, byteorder='>'):
    """Return image data from a raw PGM file as numpy array.

    Format specification: http://netpbm.sourceforge.net/doc/pgm.html

    The header (``P5``, width, height, maxval, with optional ``#`` comment
    lines) is located with a regular expression; the ``width * height``
    samples that follow it are returned as a (height, width) array.
    Samples are ``u1`` when maxval < 256 and ``byteorder + 'u2'`` otherwise.

    Raises:
        ValueError: if no raw PGM header is found in the file.
    """
    # Raw bytes literals keep "\s" and "\d" from being parsed as (invalid)
    # string escapes, which newer Python versions warn about.
    header_pattern = (
        br"(^P5\s(?:\s*#.*[\r\n])*"
        br"(\d+)\s(?:\s*#.*[\r\n])*"
        br"(\d+)\s(?:\s*#.*[\r\n])*"
        br"(\d+)\s(?:\s*#.*[\r\n]\s)*)")

    with open(filename, 'rb') as f:
        contents = f.read()

    found = re.search(header_pattern, contents)
    # Test the match explicitly instead of catching the AttributeError
    # raised by calling .groups() on None.
    if found is None:
        raise ValueError("Not a raw PGM file: '%s'" % filename)
    header, width, height, maxval = found.groups()

    rows, cols = int(height), int(width)
    sample_type = 'u1' if int(maxval) < 256 else byteorder+'u2'
    return np.frombuffer(contents,
                         dtype=sample_type,
                         count=cols*rows,
                         offset=len(header)
                         ).reshape((rows, cols))
def mpraw_as_np(shape, dtype):
    """Construct a numpy array of the specified shape and dtype for which the
    underlying storage is a multiprocessing RawArray in shared memory.

    Parameters
    ----------
    shape : tuple
      Shape of numpy array
    dtype : data-type
      Data type of array

    Returns
    -------
    arr : ndarray
      Numpy array
    """
    # np.product was removed in NumPy 2.0; np.prod is the same function
    # under its supported name.
    sz = int(np.prod(shape))
    # The RawArray is allocated in bytes ('c'), so scale by the item size.
    csz = sz * np.dtype(dtype).itemsize
    raw = mp.RawArray('c', csz)
    return np.frombuffer(raw, dtype=dtype, count=sz).reshape(shape)
def native_to_builtin(value, native_type, data_count):
    '''Convert from a native EPICS DBR type to a builtin Python type

    With numpy enabled the value is returned as an ndarray whose dtype is
    looked up in ``_numpy_map``. STRING values shorter than
    ``MAX_STRING_SIZE`` are first right-padded with NUL bytes to that size.
    ``data_count`` is not used by this implementation.

    Notes:
     - A waveform of characters is just a bytestring.
     - A waveform of strings is an array whose elements are fixed-length (40-
       character) strings.
     - Enums are just integers that happen to have special significance.
     - Everything else is, straightforwardly, an array of numbers.
    '''
    if not USE_NUMPY:
        # TODO
        raise NotImplementedError("the non-numpy version has not been "
                                  "written yet")

    # Return an ndarray
    dt = _numpy_map[native_type]
    needs_padding = (native_type == ChannelType.STRING
                     and len(value) < MAX_STRING_SIZE)
    if needs_padding:
        # caput behaves this way
        padded = bytes(value).ljust(MAX_STRING_SIZE, b'\x00')
        return numpy.frombuffer(padded, dtype=dt)
    return numpy.frombuffer(value, dtype=dt)
def extract_images(filename):
    """Read a gzipped MNIST image file into a uint8 array [index, y, x, depth].

    After the magic number (2051), ``_read32`` yields the image count, the
    row count and the column count; the pixel bytes follow.

    Raises:
        ValueError: if the magic number is not 2051.
    """
    print('Extracting %s' % filename)
    with gzip.open(filename) as archive:
        magic = _read32(archive)
        if magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (magic, filename))
        n_img = _read32(archive)
        n_row = _read32(archive)
        n_col = _read32(archive)
        raw_pixels = archive.read(n_row * n_col * n_img)
        result = numpy.frombuffer(raw_pixels, dtype=numpy.uint8)
        result = result.reshape(n_img, n_row, n_col, 1)
    return result
def test_history_recording_simple_model(self):
    """Test that history in memory matches with that recorded for test one-dimensional model.

    Four points are recorded through ``record_history`` into a shared
    array, which is then read back and compared with the expected history.
    The ``.npy`` files named for this model are removed afterwards.
    """
    self.param, self.like = onedmodel()
    sampler = Dream(model=Model(self.like, self.param),
                    model_name='test_history_recording')

    # Shared state the sampler writes its history into.
    shared_history = mp.Array('d', [0]*4*sampler.total_var_dimension)
    pydream.Dream_shared_vars.history = shared_history
    pydream.Dream_shared_vars.count = mp.Value('i', 0)
    pydream.Dream_shared_vars.nchains = mp.Value('i', 3)

    expected = np.array([[1], [3], [5], [7]])
    for row in expected:
        for value in row:
            sampler.record_history(nseedchains=0,
                                   ndimensions=sampler.total_var_dimension,
                                   q_new=value,
                                   len_history=len(shared_history))

    recorded = np.frombuffer(pydream.Dream_shared_vars.history.get_obj())
    recorded = recorded.reshape(np.shape(expected))
    self.assertIs(np.array_equal(recorded, expected), True)

    remove('test_history_recording_DREAM_chain_history.npy')
    remove('test_history_recording_DREAM_chain_adapted_crossoverprob.npy')
    remove('test_history_recording_DREAM_chain_adapted_gammalevelprob.npy')
def test_history_recording_multidim_model(self):
    """Test that history in memory matches with that recorded for test multi-dimensional model.

    Twelve 4-element points (4 groups of 3) are recorded through
    ``record_history`` into a shared array, which is then read back and
    compared with the expected history. The ``.npy`` files named for this
    model are removed afterwards.
    """
    self.param, self.like = multidmodel()
    dream = Dream(model=Model(self.like, self.param),
                  model_name='test_history_recording')

    # Shared state the sampler writes its history into.
    shared_history = mp.Array('d', [0]*4*dream.total_var_dimension*3)
    pydream.Dream_shared_vars.history = shared_history
    pydream.Dream_shared_vars.count = mp.Value('i', 0)
    pydream.Dream_shared_vars.nchains = mp.Value('i', 3)

    expected = np.array([
        [[1, 2, 3, 4], [3, 4, 5, 6], [5, 6, 7, 8]],
        [[7, 8, 9, 10], [9, 12, 18, 20], [11, 14, 18, 8]],
        [[13, 14, 18, 4], [15, 17, 11, 8], [17, 28, 50, 4]],
        [[19, 21, 1, 18], [21, 19, 19, 11], [23, 4, 3, 2]],
    ])
    for group in expected:
        for point in group:
            dream.record_history(nseedchains=0,
                                 ndimensions=dream.total_var_dimension,
                                 q_new=point,
                                 len_history=len(shared_history))

    recorded = np.frombuffer(pydream.Dream_shared_vars.history.get_obj())
    recorded = recorded.reshape(np.shape(expected))
    self.assertIs(np.array_equal(recorded, expected), True)

    remove('test_history_recording_DREAM_chain_history.npy')
    remove('test_history_recording_DREAM_chain_adapted_crossoverprob.npy')
    remove('test_history_recording_DREAM_chain_adapted_gammalevelprob.npy')
def _read32(bytestream):
    """Read 4 bytes from ``bytestream`` and return them as one big-endian uint32.

    The result is a numpy scalar (element 0 of the decoded array).
    """
    big_endian_u32 = numpy.dtype(numpy.uint32).newbyteorder('>')
    word = bytestream.read(4)
    decoded = numpy.frombuffer(word, dtype=big_endian_u32)
    return decoded[0]
def extract_labels(filename, one_hot=False):
    """Read a gzipped MNIST label file into a 1D uint8 array [index].

    The header (magic 2049, item count) is read with ``_read32``. When
    ``one_hot`` is true the labels are converted with ``dense_to_one_hot``
    before being returned.

    Raises:
        ValueError: if the magic number is not 2049.
    """
    print('Extracting', filename)
    with gzip.open(filename) as stream:
        magic = _read32(stream)
        if magic != 2049:
            raise ValueError(
                'Invalid magic number %d in MNIST label file: %s' %
                (magic, filename))
        item_count = _read32(stream)
        raw = stream.read(item_count)
        labels = numpy.frombuffer(raw, dtype=numpy.uint8)
        if not one_hot:
            return labels
        return dense_to_one_hot(labels)
def _read32(bytestream):
    """Read 4 bytes from ``bytestream`` and return them as one big-endian uint32.

    Returns:
        A numpy uint32 scalar. The original returned the 1-element array
        itself, which current NumPy refuses to use as a read size or index
        ("only integer scalar arrays can be converted to a scalar index").
    """
    dt = numpy.dtype(numpy.uint32).newbyteorder('>')
    # Index [0] to hand back the scalar rather than a shape-(1,) array.
    return numpy.frombuffer(bytestream.read(4), dtype=dt)[0]
def decode_raw(bytestring, shape=(64,64,64), dtype=np.uint32):
    """View ``bytestring`` as an array of ``dtype`` with the given ``shape``.

    The flat data is reshaped to the reversed shape and then transposed,
    so the first axis of the result varies fastest in the input bytes.
    """
    flat = np.frombuffer(bytestring, dtype=dtype)
    reversed_shape = shape[::-1]
    volume = flat.reshape(reversed_shape)
    return volume.T
def capture(self, initial_sleep=0.01, poll=0.01, buffer_=None, filename=None):
    """Capture a still image. Type :class:`numpy.ndarray`.

    Starts an exposure, waits (sleeping ``initial_sleep`` once and then
    ``poll`` between status checks) until it is no longer reported as
    working, and converts the returned data according to the current ROI
    format. If `filename` is not ``None`` the image is also saved with PIL.

    Raises :class:`ZWO_CaptureError` if the exposure does not end in
    success, and :class:`ValueError` for an unsupported image type.
    """
    self.start_exposure()
    if initial_sleep:
        time.sleep(initial_sleep)
    while self.get_exposure_status() == ASI_EXP_WORKING:
        if poll:
            time.sleep(poll)

    status = self.get_exposure_status()
    if status != ASI_EXP_SUCCESS:
        raise ZWO_CaptureError('Could not capture image', status)

    data = self.get_data_after_exposure(buffer_)
    whbi = self.get_roi_format()
    image_type = whbi[3]
    # Array shape is built from entries 1 and 0 of the ROI format, in
    # that order.
    shape = [whbi[1], whbi[0]]
    if image_type == ASI_IMG_RAW8 or image_type == ASI_IMG_Y8:
        img = np.frombuffer(data, dtype=np.uint8)
    elif image_type == ASI_IMG_RAW16:
        img = np.frombuffer(data, dtype=np.uint16)
    elif image_type == ASI_IMG_RGB24:
        img = np.frombuffer(data, dtype=np.uint8)
        shape.append(3)
    else:
        raise ValueError('Unsupported image type')
    img = img.reshape(shape)

    if filename is None:
        return img

    from PIL import Image
    if len(img.shape) == 3:
        img = img[:, :, ::-1]  # Convert BGR to RGB
    mode = 'I;16' if whbi[3] == ASI_IMG_RAW16 else None
    image = Image.fromarray(img, mode=mode)
    image.save(filename)
    logger.debug('wrote %s', filename)
    return img
def capture_video_frame(self, buffer_=None, filename=None, timeout=None):
    """Capture a single frame from video. Type :class:`numpy.ndarray`.

    Video mode must have been started previously otherwise a :class:`ZWO_Error` will be raised. A new buffer
    will be used to store the image unless one has been supplied with the `buffer` keyword argument.
    If `filename` is not ``None`` the image is saved using :py:meth:`PIL.Image.Image.save()`.
    :func:`capture_video_frame()` will wait indefinitely unless a `timeout` has been given.
    The SDK suggests that the `timeout` value, in milliseconds, should be twice the exposure plus 500 ms."""
    data = self.get_video_data(buffer_=buffer_, timeout=timeout)
    whbi = self.get_roi_format()
    fmt = whbi[3]

    # Array shape is built from entries 1 and 0 of the ROI format, in
    # that order; RGB24 frames get a third axis of length 3.
    if fmt == ASI_IMG_RAW8 or fmt == ASI_IMG_Y8:
        sample_type, shape = np.uint8, [whbi[1], whbi[0]]
    elif fmt == ASI_IMG_RAW16:
        sample_type, shape = np.uint16, [whbi[1], whbi[0]]
    elif fmt == ASI_IMG_RGB24:
        sample_type, shape = np.uint8, [whbi[1], whbi[0], 3]
    else:
        raise ValueError('Unsupported image type')
    img = np.frombuffer(data, dtype=sample_type).reshape(shape)

    if filename is not None:
        from PIL import Image
        mode = None
        if len(img.shape) == 3:
            img = img[:, :, ::-1]  # Convert BGR to RGB
        if whbi[3] == ASI_IMG_RAW16:
            mode = 'I;16'
        Image.fromarray(img, mode=mode).save(filename)
        logger.debug('wrote %s', filename)
    return img
def __init__(self, buf, offset = 0, idmap = None, idmap_size = 1024):
    """Map an existing buffer, validating its header, version and schema.

    :param buf: Buffer-like object holding the serialized data.
    :param offset: Position in ``buf`` where the data starts; when non-zero
        the mapping works on ``buffer(buf, offset)``.
    :param idmap: Object stored as ``self.idmap``; a ``Cache(idmap_size)``
        is created when None.
    :param idmap_size: Size passed to ``Cache`` when ``idmap`` is None.

    :raises ValueError: if the buffer requires a newer reader version, if
        the schema location is out of bounds, if the stored schema is not
        a ``Schema``, if no schema is available at all, or if the buffer
        has index entries but no new-style header.

    NOTE(review): relies on ``buffer`` and ``cPickle``, i.e. Python 2.
    """
    if idmap is None:
        idmap = Cache(idmap_size)
    self.offset = offset
    if offset != 0:
        self.buf = buf = buffer(buf, offset)
    else:
        self.buf = buf
    # Base header, read from the very start of the (possibly offset) buffer.
    self.total_size, self.index_offset, self.index_elements = self._Header.unpack_from(buf, 0)
    # The index is exposed as a uint64 view onto the buffer.
    self.index = numpy.frombuffer(buf,
        offset = self.index_offset,
        dtype = numpy.uint64,
        count = self.index_elements)
    self.idmap = idmap
    # If the first index entry is at least as large as both headers
    # together, a second header is assumed to follow the first.
    if self.index_elements > 0 and self.index[0] >= (self._Header.size + self._NewHeader.size):
        # New version, most likely
        self.version, min_reader_version, self.schema_offset, self.schema_size = self._NewHeader.unpack_from(
            buf, self._Header.size)
        if self._CURRENT_VERSION < min_reader_version:
            raise ValueError((
                "Incompatible buffer, this buffer needs a reader with support for version %d at least, "
                "this reader supports up to version %d") % (
                    min_reader_version,
                    self._CURRENT_VERSION
                ))
        if self.schema_offset and self.schema_size:
            # Bounds-check the embedded schema before touching it.
            if self.schema_offset > len(buf) or (self.schema_size + self.schema_offset) > len(buf):
                raise ValueError("Corrupted input - bad schema location")
            # NOTE(review): this unpickles bytes taken from the buffer;
            # unpickling can run arbitrary code, so the buffer must come
            # from a trusted source.
            stored_schema = cPickle.loads(bytes(buffer(buf, self.schema_offset, self.schema_size)))
            if not isinstance(stored_schema, Schema):
                raise ValueError("Corrupted input - unrecognizable schema")
            # The stored schema replaces self.schema when there is none or
            # when the existing one reports it as incompatible.
            if self.schema is None or not self.schema.compatible(stored_schema):
                self.schema = stored_schema
        elif self.schema is None:
            raise ValueError("Cannot map schema-less buffer without specifying schema")
    elif self.index_elements > 0:
        raise ValueError("Cannot reliably map version-0 buffers")
def imageToArray(img, copy=False, transpose=True):
    """
    Convert a QImage into numpy array. The image must have format RGB32, ARGB32, or ARGB32_Premultiplied.
    By default, the image is not copied; changes made to the array will appear in the QImage as well (beware: if
    the QImage is collected before the array, there may be trouble).
    The array will have shape (width, height, (b,g,r,a)).

    With ``transpose=False`` the array is returned as (height, width, 4)
    instead. For ``Format_RGB32`` images the fourth channel is set to 255.
    """
    fmt = img.format()
    ptr = img.bits()
    if not USE_PYSIDE:
        ptr.setsize(img.byteCount())
        arr = np.asarray(ptr)
        if img.byteCount() != arr.size * arr.itemsize:
            # Required for Python 2.6, PyQt 4.10
            # If this works on all platforms, then there is no need to use np.asarray..
            arr = np.frombuffer(ptr, np.ubyte, img.byteCount())
    else:
        arr = np.frombuffer(ptr, dtype=np.ubyte)

    arr = arr.reshape(img.height(), img.width(), 4)
    if fmt == img.Format_RGB32:
        arr[...,3] = 255

    if copy:
        arr = arr.copy()

    return arr.transpose((1,0,2)) if transpose else arr