The following code examples, extracted from open-source Python projects, illustrate how to use io.BytesIO().
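As a quick orientation before the extracted examples, here is a minimal sketch of the core API: io.BytesIO is an in-memory binary stream that supports the usual file operations (write, seek, read, getvalue).

import io

buf = io.BytesIO()
buf.write(b'hello ')
buf.write(b'world')
buf.seek(0)                               # rewind before reading
assert buf.read() == b'hello world'
assert buf.getvalue() == b'hello world'   # getvalue() works regardless of position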
def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(archive.infolist()[0])
    decoded_data = data.decode('utf-8')
    sets_data = json.loads(decoded_data)
    return sets_data
def zip_dir(directory):
    """zip a directory tree into a BytesIO object"""
    result = io.BytesIO()
    dlen = len(directory)
    with ZipFile(result, "w") as zf:
        for root, dirs, files in os.walk(directory):
            for name in files:
                full = os.path.join(root, name)
                rel = root[dlen:]
                dest = os.path.join(rel, name)
                zf.write(full, dest)
    return result

#
# Simple progress bar
#
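A brief usage sketch for zip_dir above (the directory path is hypothetical): closing the ZipFile leaves the underlying BytesIO open, so the returned buffer only needs a rewind before it can be handed back to ZipFile for reading.

from zipfile import ZipFile

buf = zip_dir('some_directory')   # hypothetical directory
buf.seek(0)                       # rewind before re-reading the archive
with ZipFile(buf) as zf:
    print(zf.namelist())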
def encode_jpeg(arr):
    assert arr.dtype == np.uint8
    # simulate multi-channel array for single channel arrays
    if len(arr.shape) == 3:
        arr = np.expand_dims(arr, 3)  # add channels to end of x,y,z
    arr = arr.transpose((3, 2, 1, 0))  # channels, z, y, x
    reshaped = arr.reshape(arr.shape[3] * arr.shape[2], arr.shape[1] * arr.shape[0])
    if arr.shape[0] == 1:
        img = Image.fromarray(reshaped, mode='L')
    elif arr.shape[0] == 3:
        img = Image.fromarray(reshaped, mode='RGB')
    else:
        # after the transpose, the channel count lives in arr.shape[0]
        raise ValueError("Number of image channels should be 1 or 3. Got: {}".format(arr.shape[0]))
    f = io.BytesIO()
    img.save(f, "JPEG")
    return f.getvalue()
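For the reverse direction, a minimal sketch (assuming Pillow, numpy, and the encode_jpeg above; the array shape is a made-up example): Image.open accepts any file-like object, so the JPEG bytes can be decoded straight from a BytesIO.

import io
import numpy as np
from PIL import Image

arr = np.zeros((64, 64, 8), dtype=np.uint8)   # hypothetical x, y, z volume
jpeg_bytes = encode_jpeg(arr)                 # encode_jpeg from the example above
img = Image.open(io.BytesIO(jpeg_bytes))      # Pillow reads from any file-like object
pixels = np.asarray(img)                      # back to a numpy array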
def fromqimage(im):
    buffer = QBuffer()
    buffer.open(QIODevice.ReadWrite)
    # preserve alpha channel with png
    # otherwise ppm is more friendly with Image.open
    if im.hasAlphaChannel():
        im.save(buffer, 'png')
    else:
        im.save(buffer, 'ppm')
    b = BytesIO()
    try:
        b.write(buffer.data())
    except TypeError:
        # workaround for Python 2
        b.write(str(buffer.data()))
    buffer.close()
    b.seek(0)
    return Image.open(b)
def tell(self):
    """
    Allows reference to our object from within a Codec()
    """
    if not self.filepath:
        # If there is no filepath, then we're probably dealing with a
        # stream in memory like a StringIO or BytesIO stream.
        if self.stream:
            # Report the current position within the in-memory stream
            return self.stream.tell()
    else:
        if self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False
        if not self.stream:
            if not self.open(mode=NNTPFileMode.BINARY_RO):
                return None
        return self.stream.tell()
def openStream(self, source):
    """Produces a file object from source.

    source can be either a file object, local filename or a string.
    """
    # Already a file object
    if hasattr(source, 'read'):
        stream = source
    else:
        stream = BytesIO(source)
    try:
        stream.seek(stream.tell())
    except:  # pylint:disable=bare-except
        stream = BufferedStream(stream)
    return stream
def rollover(self):
    if self._rolled:
        return
    file = self._file
    newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
    del self._TemporaryFileArgs
    newfile.write(file.getvalue())
    newfile.seek(file.tell(), 0)
    self._rolled = True

# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# BytesIO/StringIO instance to a real file. So we list
# all the methods directly.

# Context management protocol
def test_load_config(self):
    """
    Test loading of the config attribute
    """
    usrmgr = self.__get_dummy_object()
    with patch("os.path.exists", return_value=True),\
            patch("ownbot.usermanager.open") as open_mock:
        open_mock.return_value = io.BytesIO(b"""
foogroup:
    unverified:
        - '@foouser'""")
        expected_config = {"foogroup": {"unverified": ["@foouser"]}}
        self.assertEqual(usrmgr.config, expected_config)
        self.assertTrue(open_mock.called)
def close(self, *args, **kwargs):
    """
    Engine closed, copy file to DB
    """
    super(DatabaseWrapper, self).close(*args, **kwargs)
    signature_version = self.settings_dict.get("SIGNATURE_VERSION", "s3v4")
    s3 = boto3.resource(
        's3',
        config=botocore.client.Config(signature_version=signature_version)
    )
    try:
        with open(self.settings_dict['NAME'], 'rb') as f:
            fb = f.read()
            bytesIO = BytesIO()
            bytesIO.write(fb)
            bytesIO.seek(0)
            s3_object = s3.Object(self.settings_dict['BUCKET'], self.settings_dict['REMOTE_NAME'])
            result = s3_object.put(Body=bytesIO)  # put() takes keyword arguments only
    except Exception as e:
        print(e)
    logging.debug("Saved to remote DB!")
def getWebPage(url, headers, cookies, postData=None):
    try:
        if postData:
            params = urllib.parse.urlencode(postData)
            params = params.encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching ' + url)
            request = urllib.request.Request(url, None, headers)
        request.add_header('Cookie', cookies)
        if postData:
            response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request)
        else:
            response = urllib.request.urlopen(request)
        if response.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(response.read())
            f = gzip.GzipFile(fileobj=buf)
            r = f.read()
        else:
            r = response.read()
        return r
    except Exception as e:
        print("Error processing webpage: " + str(e))
        return None

## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.
    """
    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed. Can't display graph.")
    if format == 'svg':
        display_cls = display.SVG
    elif format in ("jpeg", "png"):
        display_cls = partial(display.Image, format=format, embed=True)
    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
def _body(self):
    try:
        read_func = self.environ['wsgi.input'].read
    except KeyError:
        self.environ['wsgi.input'] = BytesIO()
        return self.environ['wsgi.input']
    body_iter = self._iter_chunked if self.chunked else self._iter_body
    body, body_size, is_temp_file = BytesIO(), 0, False
    for part in body_iter(read_func, self.MEMFILE_MAX):
        body.write(part)
        body_size += len(part)
        if not is_temp_file and body_size > self.MEMFILE_MAX:
            body, tmp = TemporaryFile(mode='w+b'), body
            body.write(tmp.getvalue())
            del tmp
            is_temp_file = True
    self.environ['wsgi.input'] = body
    body.seek(0)
    return body
def open_device(vendor_id, product_id, interface_number):
    """Opens and returns the HID device (file-like object).

    Raise IOError if the device or interface is not available.

    Arguments:
    vendor_id -- the mouse vendor id (e.g. 0x1038)
    product_id -- the mouse product id (e.g. 0x1710)
    interface_number -- the interface number (e.g. 0x00)
    """
    # Dry run
    if debug.DEBUG and debug.DRY and is_device_plugged(vendor_id, product_id):
        device = BytesIO()  # Mock the device
        device.send_feature_report = device.write
        return device
    # Real device
    for interface in hid.enumerate(vendor_id, product_id):
        if interface["interface_number"] != interface_number:
            continue
        device = hid.device()
        device.open_path(interface["path"])
        return device
    raise IOError("Unable to find the requested device: %04X:%04X:%02X" % (
        vendor_id, product_id, interface_number))
def print_chapters(self, show=False):
    '''
    Display info about the comic's chapters.
    '''
    headers = {
        'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",
        'Referer': 'http://manhua.dmzj.com/tags/s.shtml'
    }
    text = 'There are {n} chapters in comic {c}:\n{chs}'.format(
        n=self.chapter_num,
        c=self.comic_title,
        chs='\n'.join([info[0] for info in self.chapter_urls]))
    print(text)
    if show:
        try:
            res = requests.get(self.cover, headers=headers)
            if b'403' in res.content:
                raise ValueError('Got cover img failed')
            out = BytesIO(res.content)
            out.seek(0)
            Image.open(out).show()
        except (ConnectionError, ValueError):
            traceback.print_exc()
    return text
def file(self):
    """
    Returns a file pointer to this binary

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # TODO: I don't like reaching through to the session...
    with closing(self._cb.session.get("/api/v1/binary/{0:s}".format(self.md5sum), stream=True)) as r:
        z = BytesIO(r.content)  # r.content is bytes; BytesIO is needed here (StringIO breaks on Python 3)
        zf = ZipFile(z)
        fp = zf.open('filedata')
        return fp
def sendto(self, bytes, *args, **kwargs):
    if self.type != socket.SOCK_DGRAM:
        return super(socksocket, self).sendto(bytes, *args, **kwargs)
    if not self._proxyconn:
        self.bind(("", 0))
    address = args[-1]
    flags = args[:-1]

    header = BytesIO()
    RSV = b"\x00\x00"
    header.write(RSV)
    STANDALONE = b"\x00"
    header.write(STANDALONE)
    self._write_SOCKS5_address(address, header)

    sent = super(socksocket, self).send(header.getvalue() + bytes, *flags, **kwargs)
    return sent - header.tell()
def recvfrom(self, bufsize, flags=0):
    if self.type != socket.SOCK_DGRAM:
        return super(socksocket, self).recvfrom(bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    buf = BytesIO(super(socksocket, self).recv(bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    return (buf.read(bufsize), (fromhost, fromport))
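The two socket methods above use BytesIO as a cursor over a binary datagram. The same pattern works for any fixed-layout binary header; here is a self-contained sketch over a made-up layout (not the real SOCKS5 wire format).

import io
import struct

# Hypothetical layout: RSV(2 bytes) FRAG(1) ATYP(1) PORT(2, big-endian) DATA(...)
packet = b'\x00\x00\x00\x01\x00\x50payload'
buf = io.BytesIO(packet)
buf.seek(2, io.SEEK_CUR)                         # skip the two reserved bytes
frag = buf.read(1)                               # fragment counter
atyp, port = struct.unpack('>BH', buf.read(3))   # one byte + unsigned 16-bit int
payload = buf.read()                             # the rest of the datagram
assert (frag, atyp, port, payload) == (b'\x00', 1, 80, b'payload')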
def test_write_svg():
    # Test with default options
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg')
    xml_str = out.getvalue()
    assert xml_str.startswith(b'<?xml')
    root = _parse_xml(out)
    assert 'viewBox' not in root.attrib
    assert 'height' in root.attrib
    assert 'width' in root.attrib
    css_class = root.attrib.get('class')
    assert css_class
    assert 'segno' == css_class
    path_el = _get_path(root)
    assert path_el is not None
    path_class = path_el.get('class')
    assert 'qrline' == path_class
    stroke = path_el.get('stroke')
    assert stroke == '#000'
    title_el = _get_title(root)
    assert title_el is None
    desc_el = _get_desc(root)
    assert desc_el is None
def test_write_svg_black():
    # Test with the color given as a (case-insensitive) name
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color='bLacK')
    xml_str = out.getvalue()
    assert xml_str.startswith(b'<?xml')
    root = _parse_xml(out)
    assert 'viewBox' not in root.attrib
    assert 'height' in root.attrib
    assert 'width' in root.attrib
    css_class = root.attrib.get('class')
    assert css_class
    assert 'segno' == css_class
    path_el = _get_path(root)
    assert path_el is not None
    path_class = path_el.get('class')
    assert 'qrline' == path_class
    stroke = path_el.get('stroke')
    assert stroke == '#000'
    title_el = _get_title(root)
    assert title_el is None
    desc_el = _get_desc(root)
    assert desc_el is None
def test_write_svg_black3():
    # Test with the color given as an RGB tuple
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color=(0, 0, 0))
    xml_str = out.getvalue()
    assert xml_str.startswith(b'<?xml')
    root = _parse_xml(out)
    assert 'viewBox' not in root.attrib
    assert 'height' in root.attrib
    assert 'width' in root.attrib
    css_class = root.attrib.get('class')
    assert css_class
    assert 'segno' == css_class
    path_el = _get_path(root)
    assert path_el is not None
    path_class = path_el.get('class')
    assert 'qrline' == path_class
    stroke = path_el.get('stroke')
    assert stroke == '#000'
    title_el = _get_title(root)
    assert title_el is None
    desc_el = _get_desc(root)
    assert desc_el is None
def pam_bw_as_matrix(buff, border):
    """\
    Returns the QR code as list of [0, 1] lists.

    :param io.BytesIO buff: Buffer to read the matrix from.
    """
    res = []
    data, size = _image_data(buff)
    for i, offset in enumerate(range(0, len(data), size)):
        if i < border:
            continue
        if i >= size - border:
            break
        row_data = bytearray(data[offset + border:offset + size - border])
        # Invert bytes since PAM uses 0x0 = black, 0x1 = white
        res.append([b ^ 0x1 for b in row_data])
    return res
def pdf_as_matrix(buff, border):
    """\
    Reads the path in the PDF and returns it as list of 0, 1 lists.

    :param io.BytesIO buff: Buffer to read the matrix from.
    """
    pdf = buff.getvalue()
    h, w = re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf,
                     flags=re.MULTILINE).groups()
    if h != w:
        raise ValueError('Expected equal height/width, got height="{}" width="{}"'.format(h, w))
    size = int(w) - 2 * border
    graphic = _find_graphic(buff)
    res = [[0] * size for i in range(size)]
    for x1, y1, x2, y2 in re.findall(r'\s*(\-?\d+)\s+(\-?\d+)\s+m\s+'
                                     r'(\-?\d+)\s+(\-?\d+)\s+l', graphic):
        x1, y1, x2, y2 = [int(i) for i in (x1, y1, x2, y2)]
        y = abs(y1)
        res[y][x1:x2] = [1] * (x2 - x1)
    return res
def test_flow():
    from io import BytesIO

    # info + flush
    inbound = b'\x01\x02\x1a\x00\x01\x02\x12\x00'
    data = BytesIO(inbound)
    req_type, _ = read_message(data, types.Request)
    assert 'info' == req_type.WhichOneof("value")
    req_type2, _ = read_message(data, types.Request)
    assert 'flush' == req_type2.WhichOneof("value")
    assert data.read() == b''
    data.close()

    data2 = BytesIO(b'')
    req_type, fail = read_message(data2, types.Request)
    assert fail == 0
    assert req_type == None

    data3 = BytesIO(b'\x01')
    req_type, fail = read_message(data3, types.Request)
    assert fail == 0
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES
    """
    if hasattr(bytes, "read"):
        buff = TemporaryFile()
        archive = gzip.GzipFile(fileobj=buff, mode='w')
        for b in bytes:
            archive.write(b)
        archive.close()
        buff.seek(0)
        from pyLibrary.env.big_data import FileString, safe_size
        return FileString(buff)

    buff = BytesIO()
    archive = gzip.GzipFile(fileobj=buff, mode='w')
    archive.write(bytes)
    archive.close()
    return buff.getvalue()
def openstream(self, filename):
    """
    Open a stream as a read-only file object (BytesIO).
    Note: filename is case-insensitive.

    :param filename: path of stream in storage tree (except root entry), either:

        - a string using Unix path syntax, for example:
          'storage_1/storage_1.2/stream'
        - or a list of storage filenames, path to the desired stream/storage.
          Example: ['storage_1', 'storage_1.2', 'stream']

    :returns: file object (read-only)
    :exception IOError: if filename not found, or if this is not a stream.
    """
    sid = self._find(filename)
    entry = self.direntries[sid]
    if entry.entry_type != STGTY_STREAM:
        raise IOError("this file is not a stream")
    return self._open(entry.isectStart, entry.size)
def __init__(self, image=None, **kw):
    # Tk compatibility: file or data
    if image is None:
        if "file" in kw:
            image = Image.open(kw["file"])
            del kw["file"]
        elif "data" in kw:
            from io import BytesIO
            image = Image.open(BytesIO(kw["data"]))
            del kw["data"]

    self.__mode = image.mode
    self.__size = image.size

    if _pilbitmap_check():
        # fast way (requires the pilbitmap booster patch)
        image.load()
        kw["data"] = "PIL:%d" % image.im.id
        self.__im = image  # must keep a reference
    else:
        # slow but safe way
        kw["data"] = image.tobitmap()
    self.__photo = tkinter.BitmapImage(**kw)
def get_data_hash(self, data_bytes):
    """Calculate Merkle's root hash of the given data bytes"""
    # Calculate tree parameters
    data_len = len(data_bytes)
    tree_populated_width = math.ceil(data_len / self._chunk_len)
    tree_height = math.ceil(math.log2(tree_populated_width))
    tree_width = int(math.pow(2, tree_height))
    tree_bottom_layer = ['\x00'] * tree_width

    with io.BytesIO(data_bytes) as b_data:
        self._initial_hasher(
            b_data,
            tree_populated_width,
            tree_bottom_layer
        )

    # Get Merkle's root hash
    mrh = self._calculate_root_hash(tree_bottom_layer)
    return mrh
def test_simple(self):
    pipe = test_helper.get_mock_pipeline([])
    mock_mod = test_helper.MockSubscriber()
    pipe.register_magic(b'\xFF\xEE\xDD', ('mock', mock_mod.consume))
    pipe.register_magic(b'\x00\x00\x00', ('mock', mock_mod.consume))
    _magic = magic.Subscriber(pipe)
    _magic.setup(None)
    doc = document.get_document('mock')
    content = b'\xFF\xEE\xDDMOCKMOCKMOCK'
    _magic.consume(doc, BytesIO(content))
    self.assertEquals(True, doc.magic_hit)
    self.assertEquals(1, len(mock_mod.produced))
    expected = content
    actual = mock_mod.produced[0][1].read()
    self.assertEquals(expected, actual)
def test_simple(self):
    _file_meta = file_meta.Subscriber(test_helper.get_mock_pipeline([]))
    response = json.dumps({'Content-Type': 'image/jpeg'}).encode('utf-8')
    _file_meta.setup({
        'data_root': 'local_data',
        'code_root': '.',
        'worker_id': 1,
        'host': 'mock',
        helper.INJECTOR: test_helper.MockInjector(response)
    })
    doc = document.get_document('mock.txt')
    _file_meta.consume(doc, BytesIO(b'mock'))
    expected = 'picture'
    actual = doc.doctype
    self.assertEqual(expected, actual)
def test_simple(self):
    mock_pipeline = test_helper.get_mock_pipeline([])
    data_root = os.path.join('local_data', 'unittests')
    if os.path.exists(data_root):
        shutil.rmtree(data_root)
    _copy = copy_file.Subscriber(mock_pipeline)
    _copy.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    })
    _copy.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    _copy.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))
    expected = ['39bbf948-mock.xyz']
    actual = os.listdir(os.path.join(data_root, 'files', 'xyz'))
    self.assertEqual(expected, actual)
def reseed(self, netdb):
    """Compress netdb entries and set content"""
    zip_file = io.BytesIO()
    dat_files = []

    for root, dirs, files in os.walk(netdb):
        for f in files:
            if f.endswith(".dat"):
                # TODO check modified time
                # may be not older than 10h
                dat_files.append(os.path.join(root, f))

    if len(dat_files) == 0:
        raise PyseederException("Can't get enough netDb entries")
    elif len(dat_files) > 75:
        dat_files = random.sample(dat_files, 75)

    with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf:
        for f in dat_files:
            zf.write(f, arcname=os.path.split(f)[1])

    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = zip_file.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
def _restart_data(self, format_: str='json') -> None:
    assert format_ == 'json'

    with open(join(CURDIR, 'data', 'helloworld.py')) as f:
        testcode = f.read()

    self.data = Request({
        'filepath': 'test.py',
        'action': 'ParseAST',
        'content': testcode,
        'language': 'python',
    })

    bufferclass = io.StringIO if format_ == 'json' else io.BytesIO
    # This will mock the python_driver stdin
    self.sendbuffer = bufferclass()
    # This will mock the python_driver stdout
    self.recvbuffer = bufferclass()
def __init__(self, codestr: str, astdict: AstDict) -> None:
    self._astdict = astdict
    # Tokenize and create the noop extractor and the position fixer
    self._tokens: List[Token] = [Token(*i) for i in
                                 tokenize.tokenize(BytesIO(codestr.encode('utf-8')).readline)]
    token_lines = _create_tokenized_lines(codestr, self._tokens)
    self.noops_sync = NoopExtractor(codestr, token_lines)
    self.pos_sync = LocationFixer(codestr, token_lines)
    self.codestr = codestr
    # This will store a dict of nodes to end positions, it will be filled
    # on parse()
    self._node2endpos = None
    self.visit_Global = self.visit_Nonlocal = self._promote_names
def __len__(self):
    """
    Returns the length of the content
    """
    if not self.filepath:
        # If there is no filepath, then we're probably dealing with a
        # stream in memory like a StringIO or BytesIO stream.
        if self.stream:
            # Remember our current position
            ptr = self.stream.tell()
            # Advance to the end of the file and get our length
            length = self.stream.seek(0, SEEK_END)
            if length != ptr:
                # Restore our pointer
                self.stream.seek(ptr, SEEK_SET)
        else:
            # No Stream or Filepath; nothing has been initialized
            # yet at all so just return 0
            length = 0
    else:
        if self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False
        # Get the size
        length = getsize(self.filepath)
    return length
def get_stream(self, resource):
    return io.BytesIO(self.get_bytes(resource))
def get_resource_stream(self, manager, resource_name):
    return io.BytesIO(self.get_resource_string(manager, resource_name))
def prepare_response(self, request, cached):
    """Verify our vary headers match and construct a real urllib3
    HTTPResponse object.
    """
    # Special case the '*' Vary value as it means we cannot actually
    # determine if the cached response is suitable for this request.
    if "*" in cached.get("vary", {}):
        return

    # Ensure that the Vary headers for the cached response match our
    # request
    for header, value in cached.get("vary", {}).items():
        if request.headers.get(header, None) != value:
            return

    body_raw = cached["response"].pop("body")

    headers = CaseInsensitiveDict(data=cached['response']['headers'])
    if headers.get('transfer-encoding', '') == 'chunked':
        headers.pop('transfer-encoding')
    cached['response']['headers'] = headers

    try:
        body = io.BytesIO(body_raw)
    except TypeError:
        # This can happen if cachecontrol serialized to v1 format (pickle)
        # using Python 2. A Python 2 str(byte string) will be unpickled as
        # a Python 3 str (unicode string), which will cause the above to
        # fail with:
        #
        #     TypeError: 'str' does not support the buffer interface
        body = io.BytesIO(body_raw.encode('utf8'))

    return HTTPResponse(
        body=body,
        preload_content=False,
        **cached["response"]
    )
def __init__(self, fp, callback):
    self.__buf = BytesIO()
    self.__fp = fp
    self.__callback = callback
@contextmanager  # the yield below only works as a context manager with this decorator
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)
def test_verifying_zipfile():
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    sio = BytesIO()  # zipfile needs a binary buffer (StringIO would fail on Python 3)
    zf = zipfile.ZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.writestr("three", b"third file")
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    try:
        vzf.open("three").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    try:
        vzf.open("two").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def quopri_encode(input, errors='strict'):
    assert errors == 'strict'
    f = BytesIO(input)
    g = BytesIO()
    quopri.encode(f, g, quotetabs=True)
    return (g.getvalue(), len(input))
def quopri_decode(input, errors='strict'):
    assert errors == 'strict'
    f = BytesIO(input)
    g = BytesIO()
    quopri.decode(f, g)
    return (g.getvalue(), len(input))
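A round-trip sketch for the two codec helpers above (assuming both are in scope, along with quopri and BytesIO): the stdlib quopri module only operates on file objects, which is why each function wraps its byte-string input in a BytesIO before calling it.

raw = b'caf\xc3\xa9\tbar'
encoded, consumed = quopri_encode(raw)
# encoded is quoted-printable bytes; e.g. the tab becomes b'=09'
decoded, _ = quopri_decode(encoded)
assert decoded == raw           # the transform round-trips
assert consumed == len(raw)     # the length of the consumed input is returned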
def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
    assert errors == 'strict'
    infile = BytesIO(input)
    outfile = BytesIO()
    read = infile.read
    write = outfile.write

    # Encode
    write(('begin %o %s\n' % (mode & 0o777, filename)).encode('ascii'))
    chunk = read(45)
    while chunk:
        write(binascii.b2a_uu(chunk))
        chunk = read(45)
    write(b' \nend\n')

    return (outfile.getvalue(), len(input))
def uu_decode(input, errors='strict'):
    assert errors == 'strict'
    infile = BytesIO(input)
    outfile = BytesIO()
    readline = infile.readline
    write = outfile.write

    # Find start of encoded data
    while 1:
        s = readline()
        if not s:
            raise ValueError('Missing "begin" line in input data')
        if s[:5] == b'begin':
            break

    # Decode
    while True:
        s = readline()
        if not s or s == b'end\n':
            break
        try:
            data = binascii.a2b_uu(s)
        except binascii.Error as v:
            # Workaround for broken uuencoders by /Fredrik Lundh
            nbytes = (((s[0]-32) & 63) * 4 + 5) // 3
            data = binascii.a2b_uu(s[:nbytes])
            #sys.stderr.write("Warning: %s\n" % str(v))
        write(data)
    if not s:
        raise ValueError('Truncated input data')

    return (outfile.getvalue(), len(input))
def test_load_config_empty_file(self):
    """
    Test loading of the config attr if the config file is empty
    """
    usrmgr = self.__get_dummy_object()
    with patch("os.path.exists", return_value=True),\
            patch("ownbot.usermanager.open") as open_mock:
        open_mock.return_value = io.BytesIO(b"")
        self.assertEqual(usrmgr.config, {})
        self.assertTrue(open_mock.called)
def _create_map_image(self):
    images = self._get_images()
    self._map_image = Image.new('RGBA', self._map_size)
    for img in images:
        if not isinstance(img, Image.Image):
            img = Image.open(BytesIO(img.content)).convert('RGBA')
        self._map_image = Image.alpha_composite(self._map_image, img)
def _create_pdf_libreoffice(self):
    output_image = BytesIO()
    self._map_image.save(output_image, 'PNG')

    render = Renderer(media_path='.')
    # TODO: use the configuration to select the template
    # TODO: use the configuration to select the name of the key in the template
    result = render.render('template.odt', my_map=output_image)

    with NamedTemporaryFile(
        mode='wb+',
        prefix='geo-pyprint_',
        delete=True
    ) as generated_odt:
        generated_odt.write(result)
        generated_odt.flush()

        output_name = generated_odt.name + '.pdf'
        cmd = [
            'unoconv',
            '-f',
            'pdf',
            '-o',
            output_name,
            generated_odt.name
        ]
        subprocess.call(cmd, timeout=None)

        return output_name
def load_verify_locations(self, cafile=None, capath=None, cadata=None):
    if cafile is not None:
        cafile = cafile.encode('utf-8')
    if capath is not None:
        capath = capath.encode('utf-8')
    self._ctx.load_verify_locations(cafile, capath)
    if cadata is not None:
        self._ctx.load_verify_locations(BytesIO(cadata))