Python io 模块,BytesIO() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.BytesIO()

项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def fetch_data():
    """Download the MTGJSON archive and return its parsed JSON content.

    The primary URL is requested first; if that raises
    ``requests.ConnectionError`` the fallback URL is requested instead.
    The response body is opened as a zip archive that must contain exactly
    one member, which is read, decoded as UTF-8 and parsed as JSON.

    Raises:
        RuntimeError: if the archive does not hold exactly one file.
    """
    try:
        response = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        response = requests.get(FALLBACK_MTG_JSON_URL)

    with closing(response), zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        raw = archive.read(members[0])

    return json.loads(raw.decode('utf-8'))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def zip_dir(directory):
    """Pack every file below *directory* into an in-memory zip archive.

    Archive member names are the file paths with the leading *directory*
    prefix removed.  Returns the ``io.BytesIO`` the archive was written to.
    """
    buffer = io.BytesIO()
    prefix_len = len(directory)
    with ZipFile(buffer, "w") as archive:
        for root, _dirs, files in os.walk(directory):
            # Folder path with the *directory* prefix cut off
            relative_root = root[prefix_len:]
            for filename in files:
                archive.write(
                    os.path.join(root, filename),
                    os.path.join(relative_root, filename),
                )
    return buffer

#
# Simple progress bar
#
项目:cloud-volume    作者:seung-lab    | 项目源码 | 文件源码
def encode_jpeg(arr):
    """Encode a uint8 array as one JPEG image and return the encoded bytes.

    A 3-dimensional input (x, y, z) gets a trailing channel axis added.  The
    array is then transposed to (channels, z, y, x) and flattened into a 2D
    image before being saved as JPEG into an in-memory buffer.

    Raises:
        AssertionError: if ``arr.dtype`` is not ``np.uint8``.
        ValueError: if the number of channels is neither 1 nor 3.
    """
    assert arr.dtype == np.uint8

    # simulate multi-channel array for single channel arrays
    if len(arr.shape) == 3:
        arr = np.expand_dims(arr, 3) # add channels to end of x,y,z

    arr = arr.transpose((3,2,1,0)) # channels, z, y, x

    # After the transpose the channel count lives on axis 0 (axis 3 is x),
    # so both the mode selection and the error message must read axis 0.
    num_channels = arr.shape[0]
    if num_channels == 1:
        mode = 'L'
    elif num_channels == 3:
        mode = 'RGB'
    else:
        raise ValueError("Number of image channels should be 1 or 3. Got: {}".format(num_channels))

    reshaped = arr.reshape(arr.shape[3] * arr.shape[2], arr.shape[1] * arr.shape[0])
    img = Image.fromarray(reshaped, mode=mode)

    f = io.BytesIO()
    img.save(f, "JPEG")
    return f.getvalue()
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def fromqimage(im):
    """Convert a Qt image into a PIL image via an in-memory buffer.

    The Qt image is saved into a ``QBuffer`` (as PNG when it has an alpha
    channel, otherwise as PPM), the buffer content is copied into a
    ``BytesIO`` and that stream is handed to ``Image.open``.
    """
    qt_buffer = QBuffer()
    qt_buffer.open(QIODevice.ReadWrite)
    # PNG preserves the alpha channel; otherwise PPM is more friendly
    # with Image.open
    image_format = 'png' if im.hasAlphaChannel() else 'ppm'
    im.save(qt_buffer, image_format)

    stream = BytesIO()
    try:
        stream.write(qt_buffer.data())
    except TypeError:
        # workaround for Python 2
        stream.write(str(qt_buffer.data()))
    qt_buffer.close()
    stream.seek(0)

    return Image.open(stream)
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def tell(self):
        """
        Report the current position of the underlying stream.

        Without a filepath an already-open stream is queried directly.  With
        a filepath, pending writes are flushed first.  When no stream is open
        one is opened read-only; None is returned if that open fails.
        """
        in_memory = not self.filepath

        # No filepath: probably a StringIO/BytesIO style in-memory stream
        if in_memory and self.stream:
            return self.stream.tell()

        if not in_memory and self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False

        if not self.stream:
            if not self.open(mode=NNTPFileMode.BINARY_RO):
                return None

        return self.stream.tell()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def openStream(self, source):
        """Return a seekable file-like object for *source*.

        An object that already has a ``read`` attribute is used as-is; any
        other value is wrapped in a ``BytesIO``.  If the resulting stream
        cannot seek/tell, it is wrapped in a ``BufferedStream``.

        """
        stream = source if hasattr(source, 'read') else BytesIO(source)

        # Probe for seek/tell support by seeking to the current position
        try:
            stream.seek(stream.tell())
        except:  # pylint:disable=bare-except
            stream = BufferedStream(stream)

        return stream
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def rollover(self):
        """Move the buffered content into a real temporary file.

        Does nothing when the rollover already happened.  Otherwise a
        TemporaryFile is created from the stored constructor arguments, the
        current content is copied into it and the previous position restored.
        """
        if self._rolled:
            return

        memory_file = self._file
        disk_file = TemporaryFile(**self._TemporaryFileArgs)
        self._file = disk_file
        # The constructor arguments are only needed once
        del self._TemporaryFileArgs

        disk_file.write(memory_file.getvalue())
        disk_file.seek(memory_file.tell(), 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # BytesIO/StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_load_config(self):
        """
            The config attribute reflects the content of the opened file
        """
        usrmgr = self.__get_dummy_object()
        fake_file = io.BytesIO(b"""
foogroup:
  unverified:
    - '@foouser'""")

        with patch("os.path.exists", return_value=True):
            with patch("ownbot.usermanager.open") as open_mock:
                open_mock.return_value = fake_file

                self.assertEqual(
                    usrmgr.config,
                    {"foogroup": {"unverified": ["@foouser"]}},
                )
                self.assertTrue(open_mock.called)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def zip_dir(directory):
    """Pack every file below *directory* into an in-memory zip archive.

    Archive member names are the file paths with the leading *directory*
    prefix removed.  Returns the ``io.BytesIO`` the archive was written to.
    """
    buffer = io.BytesIO()
    prefix_len = len(directory)
    with ZipFile(buffer, "w") as archive:
        for root, _dirs, files in os.walk(directory):
            # Folder path with the *directory* prefix cut off
            relative_root = root[prefix_len:]
            for filename in files:
                archive.write(
                    os.path.join(root, filename),
                    os.path.join(relative_root, filename),
                )
    return buffer

#
# Simple progress bar
#
项目:zappa-django-utils    作者:Miserlou    | 项目源码 | 文件源码
def close(self, *args, **kwargs):
        """
        Engine closed, copy file to DB

        After the parent class closes the connection, the local database
        file named by ``settings_dict['NAME']`` is read and uploaded to the
        S3 object ``settings_dict['BUCKET']`` / ``settings_dict['REMOTE_NAME']``.
        The upload is best-effort: any exception is printed and swallowed.
        The success message is logged only when the upload did not raise.
        """
        super(DatabaseWrapper, self).close(*args, **kwargs)

        signature_version = self.settings_dict.get("SIGNATURE_VERSION", "s3v4")
        s3 = boto3.resource('s3',
                config=botocore.client.Config(signature_version=signature_version))

        try:
            with open(self.settings_dict['NAME'], 'rb') as f:
                # A BytesIO built from the data starts at position 0
                bytesIO = BytesIO(f.read())

            s3_object = s3.Object(self.settings_dict['BUCKET'], self.settings_dict['REMOTE_NAME'])
            # NOTE(review): the positional 'rb' argument looks unintended --
            # boto3 operations normally take keyword arguments only. Confirm
            # against the boto3 version in use before changing it.
            s3_object.put('rb', Body=bytesIO)

        except Exception as e:
            print(e)
        else:
            # Only report success when nothing above raised
            logging.debug("Saved to remote DB!")
项目:AutoSteamGifts    作者:joaopsys    | 项目源码 | 文件源码
def getWebPage(url, headers, cookies, postData=None):
    """Fetch *url* and return the response body, or None on any error.

    With *postData* the data is url-encoded and sent through an opener
    with cookie processing; otherwise a plain request is made and the URL
    is printed first.  *cookies* is sent as the ``Cookie`` header.  A
    response whose Content-Encoding is gzip is decompressed before being
    returned.  Any exception is printed and turned into a None result.
    """
    try:
        if postData:
            encoded = urllib.parse.urlencode(postData).encode('utf-8')
            request = urllib.request.Request(url, data=encoded, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
        request.add_header('Cookie', cookies)

        if postData:
            opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor)
            response = opener.open(request)
        else:
            response = urllib.request.urlopen(request)

        if response.info().get('Content-Encoding') != 'gzip':
            return response.read()

        compressed = BytesIO(response.read())
        return gzip.GzipFile(fileobj=compressed).read()
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None

## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.

    The graph is rendered into an in-memory buffer in the requested
    *format* and wrapped in the matching IPython display object
    (``SVG`` for 'svg', ``Image`` for 'jpeg' and 'png').

    Raises NoIPython when IPython cannot be imported and ValueError when
    *format* is not one of 'svg', 'jpeg' or 'png'.
    """
    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed.  Can't display graph.")

    if format == 'svg':
        display_cls = display.SVG
    elif format in ("jpeg", "png"):
        display_cls = partial(display.Image, format=format, embed=True)
    else:
        # Fail before rendering; previously display_cls stayed unbound and
        # the function died with UnboundLocalError on the return line.
        raise ValueError(
            "Unsupported format {!r}; expected 'svg', 'jpeg' or 'png'.".format(format)
        )

    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def _body(self):
        """Read the request body into a seekable file object and return it.

        If the environ has no 'wsgi.input' an empty BytesIO is stored there
        and returned.  Otherwise the body is collected chunk by chunk into a
        BytesIO, switching to a TemporaryFile once more than MEMFILE_MAX
        bytes were read.  The result replaces environ['wsgi.input'] and is
        rewound to the start.
        """
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']

        if self.chunked:
            body_iter = self._iter_chunked
        else:
            body_iter = self._iter_body

        body = BytesIO()
        body_size = 0
        is_temp_file = False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if is_temp_file or body_size <= self.MEMFILE_MAX:
                continue
            # Too large for memory: move what we have into a temp file
            in_memory = body
            body = TemporaryFile(mode='w+b')
            body.write(in_memory.getvalue())
            del in_memory
            is_temp_file = True

        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
项目:rivalcfg    作者:flozz    | 项目源码 | 文件源码
def open_device(vendor_id, product_id, interface_number):
    """Open the requested HID device and return it as a file-like object.

    In debug dry-run mode, when the device is plugged, a BytesIO stand-in
    is returned whose ``send_feature_report`` is its own ``write``.
    Raises IOError if no matching device/interface is found.

    Arguments:
    vendor_id -- the mouse vendor id (e.g. 0x1038)
    product_id -- the mouse product id (e.g. 0x1710)
    interface_number -- the interface number (e.g. 0x00)
    """
    # Dry run
    dry_run = debug.DEBUG and debug.DRY and is_device_plugged(vendor_id, product_id)
    if dry_run:
        fake_device = BytesIO()  # Mock the device
        fake_device.send_feature_report = fake_device.write
        return fake_device

    # Real device: open the first interface with the requested number
    for interface in hid.enumerate(vendor_id, product_id):
        if interface["interface_number"] == interface_number:
            device = hid.device()
            device.open_path(interface["path"])
            return device

    raise IOError("Unable to find the requested device: %04X:%04X:%02X" % (
        vendor_id, product_id, interface_number))
项目:ComicSpider    作者:QuantumLiu    | 项目源码 | 文件源码
def print_chapters(self,show=False):
        '''
        Display infos of chapters.

        Prints and returns a summary with the number of chapters, the comic
        title and the first element of every entry in ``chapter_urls``.
        When *show* is true the cover image is also downloaded and shown;
        a failure there only prints a traceback.
        '''
        text='There are {n} chapters in comic {c}:\n{chs}'.format(n=self.chapter_num,c=self.comic_title,chs='\n'.join([info[0] for info in self.chapter_urls]))
        print(text)
        if show:
            # The header name was misspelled 'use-agent', so the browser
            # User-Agent was never actually sent.
            headers={'User-Agent':"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",'Referer':'http://manhua.dmzj.com/tags/s.shtml'}
            try:
                res=requests.get(self.cover,headers=headers)
                # NOTE(review): this matches the bytes '403' anywhere in the
                # body, not the HTTP status -- confirm this is intended.
                if b'403' in res.content:
                    raise ValueError('Got cover img failed')
                # A BytesIO built from data already starts at position 0
                Image.open(BytesIO(res.content)).show()
            # NOTE(review): if ConnectionError is the builtin rather than the
            # requests exception, requests failures are not caught here.
            except (ConnectionError,ValueError):
                traceback.print_exc()
        return text
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def file(self):
        """
        Returns a file pointer to this binary

        The binary is fetched as a zip archive from the API and the
        archive member named ``filedata`` is opened and returned.

        :example:

        >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
        >>> binary_obj = process_obj.binary
        >>> print(binary_obj.file.read(2))
        MZ
        """
        # TODO: I don't like reaching through to the session...
        url = "/api/v1/binary/{0:s}".format(self.md5sum)
        with closing(self._cb.session.get(url, stream=True)) as r:
            archive = ZipFile(StringIO(r.content))
            return archive.open('filedata')
项目:LFISuite    作者:D35m0nd142    | 项目源码 | 文件源码
def sendto(self, bytes, *args, **kwargs):
        """Send *bytes* to an address, prefixing a header on datagram sockets.

        Non-datagram sockets defer to the parent ``sendto``.  For datagram
        sockets a header (RSV bytes, STANDALONE byte and the encoded
        destination address) is prepended to the payload.  The return value
        is the sent byte count minus the header length.
        """
        if self.type != socket.SOCK_DGRAM:
            return super(socksocket, self).sendto(bytes, *args, **kwargs)
        if not self._proxyconn:
            self.bind(("", 0))

        # The destination is the last positional argument; the rest are flags
        flags, address = args[:-1], args[-1]

        header = BytesIO()
        header.write(b"\x00\x00")  # RSV
        header.write(b"\x00")      # STANDALONE
        self._write_SOCKS5_address(address, header)

        packet = header.getvalue() + bytes
        sent = super(socksocket, self).send(packet, *flags, **kwargs)
        return sent - header.tell()
项目:LFISuite    作者:D35m0nd142    | 项目源码 | 文件源码
def recvfrom(self, bufsize, flags=0):
        """Receive a datagram and return ``(data, (host, port))``.

        Non-datagram sockets defer to the parent ``recvfrom``.  For datagram
        sockets the first two bytes are skipped, the next byte must be zero
        (fragments are not supported) and the sender address is parsed from
        the header.  When a proxy peer name is set, packets from another
        host or port are rejected with ``socket.error(EAGAIN, ...)``.
        """
        if self.type != socket.SOCK_DGRAM:
            return super(socksocket, self).recvfrom(bufsize, flags)
        if not self._proxyconn:
            self.bind(("", 0))

        # Ask for 1024 extra bytes on top of the caller's buffer size
        datagram = super(socksocket, self).recv(bufsize + 1024, flags)
        buf = BytesIO(datagram)
        buf.seek(2, SEEK_CUR)
        if ord(buf.read(1)):
            raise NotImplementedError("Received UDP packet fragment")
        fromhost, fromport = self._read_SOCKS5_address(buf)

        if self.proxy_peername:
            peerhost, peerport = self.proxy_peername
            host_mismatch = fromhost != peerhost
            port_mismatch = peerport not in (0, fromport)
            if host_mismatch or port_mismatch:
                raise socket.error(EAGAIN, "Packet filtered")

        payload = buf.read(bufsize)
        return (payload, (fromhost, fromport))
项目:segno    作者:heuer    | 项目源码 | 文件源码
def test_write_svg():
    # Save with the default options and inspect the generated document
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg')
    assert out.getvalue().startswith(b'<?xml')

    root = _parse_xml(out)
    attrs = root.attrib
    assert 'viewBox' not in attrs
    assert 'height' in attrs
    assert 'width' in attrs
    css_class = attrs.get('class')
    assert css_class
    assert css_class == 'segno'

    path_el = _get_path(root)
    assert path_el is not None
    assert path_el.get('class') == 'qrline'
    assert path_el.get('stroke') == '#000'

    # Neither a title nor a description element is expected
    assert _get_title(root) is None
    assert _get_desc(root) is None
项目:segno    作者:heuer    | 项目源码 | 文件源码
def test_write_svg_black():
    # Color given as a mixed-case name; the stroke is expected to be '#000'
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color='bLacK')
    assert out.getvalue().startswith(b'<?xml')

    root = _parse_xml(out)
    attrs = root.attrib
    assert 'viewBox' not in attrs
    assert 'height' in attrs
    assert 'width' in attrs
    css_class = attrs.get('class')
    assert css_class
    assert css_class == 'segno'

    path_el = _get_path(root)
    assert path_el is not None
    assert path_el.get('class') == 'qrline'
    assert path_el.get('stroke') == '#000'

    # Neither a title nor a description element is expected
    assert _get_title(root) is None
    assert _get_desc(root) is None
项目:segno    作者:heuer    | 项目源码 | 文件源码
def test_write_svg_black3():
    # Color given as an (0, 0, 0) tuple; the stroke is expected to be '#000'
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color=(0, 0, 0))
    assert out.getvalue().startswith(b'<?xml')

    root = _parse_xml(out)
    attrs = root.attrib
    assert 'viewBox' not in attrs
    assert 'height' in attrs
    assert 'width' in attrs
    css_class = attrs.get('class')
    assert css_class
    assert css_class == 'segno'

    path_el = _get_path(root)
    assert path_el is not None
    assert path_el.get('class') == 'qrline'
    assert path_el.get('stroke') == '#000'

    # Neither a title nor a description element is expected
    assert _get_title(root) is None
    assert _get_desc(root) is None
项目:segno    作者:heuer    | 项目源码 | 文件源码
def pam_bw_as_matrix(buff, border):
    """\
    Returns the QR code as list of [0, 1] lists.

    The first and last *border* rows and columns are left out and every
    remaining byte is XOR-ed with 0x1.

    :param io.BytesIO buff: Buffer to read the matrix from.
    :param int border: Number of rows/columns to skip on each side.
    """
    matrix = []
    data, size = _image_data(buff)
    end_row = size - border
    for row_no, start in enumerate(range(0, len(data), size)):
        if row_no >= end_row:
            break
        if row_no < border:
            continue
        row_bytes = bytearray(data[start + border:start + size - border])
        # Invert bytes since PAM uses 0x0 = black, 0x1 = white
        matrix.append([value ^ 0x1 for value in row_bytes])
    return matrix
项目:segno    作者:heuer    | 项目源码 | 文件源码
def pdf_as_matrix(buff, border):
    """\
    Reads the path in the PDF and returns it as list of 0, 1 lists.

    The matrix size is the MediaBox size minus twice the *border*; every
    "move to / line to" pair found in the graphic sets a horizontal run of
    cells to 1.

    :param io.BytesIO buff: Buffer to read the matrix from.
    :param int border: Border size subtracted on each side.
    :raises ValueError: if the two MediaBox dimensions differ.
    """
    pdf = buff.getvalue()
    media_box = re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf,
                          flags=re.MULTILINE)
    h, w = media_box.groups()
    if h != w:
        raise ValueError('Expected equal height/width, got height="{}" width="{}"'.format(h, w))
    size = int(w) - 2 * border

    graphic = _find_graphic(buff)
    matrix = [[0] * size for _ in range(size)]
    line_pattern = (r'\s*(\-?\d+)\s+(\-?\d+)\s+m\s+'
                    r'(\-?\d+)\s+(\-?\d+)\s+l')
    for coords in re.findall(line_pattern, graphic):
        x1, y1, x2, _y2 = [int(value) for value in coords]
        # The row index is the absolute value of the start y coordinate
        row = abs(y1)
        matrix[row][x1:x2] = [1] * (x2 - x1)
    return matrix
项目:py-abci    作者:davebryson    | 项目源码 | 文件源码
def test_flow():
    from io import BytesIO
    # Two back-to-back requests: info followed by flush
    stream = BytesIO(b'\x01\x02\x1a\x00\x01\x02\x12\x00')

    first, _ = read_message(stream, types.Request)
    assert first.WhichOneof("value") == 'info'

    second, _ = read_message(stream, types.Request)
    assert second.WhichOneof("value") == 'flush'

    # Both messages consumed the whole buffer
    assert stream.read() == b''
    stream.close()

    # Empty input yields no message
    message, fail = read_message(BytesIO(b''), types.Request)
    assert fail == 0
    assert message == None

    # A single byte is not a complete message
    message, fail = read_message(BytesIO(b'\x01'), types.Request)
    assert fail == 0
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES

    If *bytes* has a ``read`` attribute it is iterated and each item is
    gzip-compressed into a temporary file, which is returned wrapped in a
    ``FileString``.  Otherwise *bytes* is compressed in memory and the
    gzip data is returned as a byte string.
    """
    if hasattr(bytes, "read"):
        buff = TemporaryFile()
        # The context manager closes the gzip stream (writing its trailer)
        # even if a write fails; the underlying temp file stays open.
        with gzip.GzipFile(fileobj=buff, mode='w') as archive:
            for b in bytes:
                archive.write(b)
        buff.seek(0)
        from pyLibrary.env.big_data import FileString
        return FileString(buff)

    buff = BytesIO()
    with gzip.GzipFile(fileobj=buff, mode='w') as archive:
        archive.write(bytes)
    return buff.getvalue()
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def openstream(self, filename):
        """
        Open a stream as a read-only file object (BytesIO).
        Note: filename is case-insensitive.

        :param filename: path of stream in storage tree (except root entry), either:

            - a string using Unix path syntax, for example:
              'storage_1/storage_1.2/stream'
            - or a list of storage filenames, path to the desired stream/storage.
              Example: ['storage_1', 'storage_1.2', 'stream']

        :returns: file object (read-only)
        :exception IOError: if filename not found, or if this is not a stream.
        """
        entry = self.direntries[self._find(filename)]
        if entry.entry_type == STGTY_STREAM:
            return self._open(entry.isectStart, entry.size)
        # Any other entry type cannot be opened as a stream
        raise IOError("this file is not a stream")
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def __init__(self, image=None, **kw):
        """Create a Tk bitmap image from a PIL image.

        When *image* is None it is loaded from the ``file`` or ``data``
        keyword argument, which is then removed from the options.  The
        remaining keywords, plus the generated ``data`` entry, are passed
        to ``tkinter.BitmapImage``.
        """

        # Tk compatibility: file or data
        if image is None:
            if "file" in kw:
                image = Image.open(kw.pop("file"))
            elif "data" in kw:
                from io import BytesIO
                image = Image.open(BytesIO(kw.pop("data")))

        self.__mode = image.mode
        self.__size = image.size

        if not _pilbitmap_check():
            # slow but safe way
            kw["data"] = image.tobitmap()
        else:
            # fast way (requires the pilbitmap booster patch)
            image.load()
            kw["data"] = "PIL:%d" % image.im.id
            self.__im = image  # must keep a reference
        self.__photo = tkinter.BitmapImage(**kw)
项目:PyPPSPP    作者:justas-    | 项目源码 | 文件源码
def get_data_hash(self, data_bytes):
        """Calculate Merkle's root hash of the given data bytes

        The bottom layer has one slot per chunk of ``self._chunk_len``
        bytes, padded up to the next power of two.  It is filled by
        ``self._initial_hasher`` and reduced by ``self._calculate_root_hash``.
        """

        # Number of chunks actually covered by data
        populated_width = math.ceil(len(data_bytes) / self._chunk_len)
        # Round the layer width up to a power of two
        height = math.ceil(math.log2(populated_width))
        full_width = int(math.pow(2, height))

        bottom_layer = ['\x00'] * full_width
        with io.BytesIO(data_bytes) as stream:
            self._initial_hasher(stream, populated_width, bottom_layer)

        # Get Merkle's root hash
        return self._calculate_root_hash(bottom_layer)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_simple(self):
    """A document starting with a registered magic is forwarded to the mock."""
    pipe = test_helper.get_mock_pipeline([])

    mock_mod = test_helper.MockSubscriber()

    pipe.register_magic(b'\xFF\xEE\xDD', ('mock', mock_mod.consume))
    pipe.register_magic(b'\x00\x00\x00', ('mock', mock_mod.consume))

    _magic = magic.Subscriber(pipe)
    _magic.setup(None)

    doc = document.get_document('mock')

    # Content begins with the first registered magic
    content = b'\xFF\xEE\xDDMOCKMOCKMOCK'

    _magic.consume(doc, BytesIO(content))

    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(True, doc.magic_hit)
    self.assertEqual(1, len(mock_mod.produced))

    expected = content
    actual = mock_mod.produced[0][1].read()

    self.assertEqual(expected, actual)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_simple(self):
    """A mocked 'image/jpeg' content type gives the doctype 'picture'."""
    pipeline = test_helper.get_mock_pipeline([])
    _file_meta = file_meta.Subscriber(pipeline)
    response = json.dumps({'Content-Type': 'image/jpeg'}).encode('utf-8')
    config = {
        'data_root': 'local_data',
        'code_root': '.',
        'worker_id': 1,
        'host': 'mock',
        helper.INJECTOR: test_helper.MockInjector(response)
    }
    _file_meta.setup(config)

    doc = document.get_document('mock.txt')
    _file_meta.consume(doc, BytesIO(b'mock'))

    self.assertEqual('picture', doc.doctype)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_simple(self):
    """Only documents with a configured extension are copied to disk."""
    mock_pipeline = test_helper.get_mock_pipeline([])

    data_root = os.path.join('local_data', 'unittests')

    # Start from a clean output directory
    if os.path.exists(data_root):
      shutil.rmtree(data_root)

    config = {
        helper.DATA_ROOT: data_root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    }
    _copy = copy_file.Subscriber(mock_pipeline)
    _copy.setup(config)

    _copy.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    _copy.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))

    copied = os.listdir(os.path.join(data_root, 'files', 'xyz'))

    self.assertEqual(['39bbf948-mock.xyz'], copied)
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def reseed(self, netdb):
        """Compress netdb entries and set content

        Collects every ``.dat`` file below *netdb*, keeps a random sample of
        75 when there are more, and stores them (by base name) in a deflated
        zip archive.  The archive bytes and their length are stored on the
        instance together with the file and content type codes.

        Raises PyseederException when no ``.dat`` file is found.
        """
        zip_file = io.BytesIO()

        # TODO check modified time
        # may be not older than 10h
        dat_files = [
            os.path.join(root, name)
            for root, _dirs, names in os.walk(netdb)
            for name in names
            if name.endswith(".dat")
        ]

        if not dat_files:
            raise PyseederException("Can't get enough netDb entries")
        if len(dat_files) > 75:
            dat_files = random.sample(dat_files, 75)

        with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf:
            for path in dat_files:
                zf.write(path, arcname=os.path.basename(path))

        self.FILE_TYPE = 0x00
        self.CONTENT_TYPE = 0x03
        self.CONTENT = zip_file.getvalue()
        self.CONTENT_LENGTH = len(self.CONTENT)
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def _restart_data(self, format_: str='json') -> None:
        """Reset the request fixture and the in-memory send/receive buffers.

        Reads ``data/helloworld.py`` below CURDIR as the code to parse,
        builds the ParseAST request around it and creates fresh buffers.
        Only the 'json' format is accepted.
        """
        assert format_ == 'json'

        source_path = join(CURDIR, 'data', 'helloworld.py')
        with open(source_path) as f:
            testcode = f.read()

        request_fields = {
            'filepath': 'test.py',
            'action': 'ParseAST',
            'content': testcode,
            'language': 'python',
        }
        self.data = Request(request_fields)

        if format_ == 'json':
            bufferclass = io.StringIO
        else:
            bufferclass = io.BytesIO

        # This will mock the python_driver stdin
        self.sendbuffer = bufferclass()
        # This will mock the python_driver stdout
        self.recvbuffer = bufferclass()
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def __init__(self, codestr: str, astdict: AstDict) -> None:
        """Tokenize *codestr* and set up the helpers built on its tokens."""
        self._astdict = astdict
        # Tokenize and create the noop extractor and the position fixer
        readline = BytesIO(codestr.encode('utf-8')).readline
        self._tokens: List[Token] = [
            Token(*raw_token) for raw_token in tokenize.tokenize(readline)
        ]
        token_lines = _create_tokenized_lines(codestr, self._tokens)
        self.noops_sync = NoopExtractor(codestr, token_lines)
        self.pos_sync   = LocationFixer(codestr, token_lines)
        self.codestr    = codestr

        # This will store a dict of nodes to end positions, it will be filled
        # on parse()
        self._node2endpos = None

        # Global and Nonlocal nodes share one visitor
        self.visit_Global = self.visit_Nonlocal = self._promote_names
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def __len__(self):
        """
        Returns the length of the content

        Without a filepath the length is measured on the open stream by
        seeking to its end; the previous position is restored afterwards.
        With no stream and no filepath the length is 0.  With a filepath,
        pending writes are flushed and the file size on disk is returned.
        """
        if not self.filepath:
            # If there is no filepath, then we're probably dealing with a
            # stream in memory like a StringIO or BytesIO stream.
            if self.stream:
                # Remember where we are so the position can be restored
                ptr = self.stream.tell()
                # Advance to the end of the file and get our length.  The
                # length is read with tell() because seek() does not return
                # the new position for every stream type, and a plain 0 is
                # used because the 0L literal is a SyntaxError on Python 3.
                self.stream.seek(0, SEEK_END)
                length = self.stream.tell()
                if length != ptr:
                    # Return our pointer
                    self.stream.seek(ptr, SEEK_SET)
            else:
                # No Stream or Filepath; nothing has been initialized
                # yet at all so just return 0
                length = 0
        else:
            if self.stream and self._dirty is True:
                self.stream.flush()
                self._dirty = False

            # Get the size
            length = getsize(self.filepath)

        return length
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_stream(self, resource):
        """Return the bytes of *resource* wrapped in an in-memory stream."""
        content = self.get_bytes(resource)
        return io.BytesIO(content)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_resource_stream(self, manager, resource_name):
        """Return the named resource's content as an in-memory byte stream."""
        content = self.get_resource_string(manager, resource_name)
        return io.BytesIO(content)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def prepare_response(self, request, cached):
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.

        Returns None when the cached entry varies on '*' or when any cached
        Vary header value differs from the request's header value.
        """
        vary = cached.get("vary", {})

        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        if "*" in vary:
            return

        # Ensure that the Vary headers for the cached response match our
        # request
        if any(request.headers.get(header, None) != value
               for header, value in vary.items()):
            return

        response_fields = cached["response"]
        body_raw = response_fields.pop("body")

        headers = CaseInsensitiveDict(data=response_fields['headers'])
        if headers.get('transfer-encoding', '') == 'chunked':
            headers.pop('transfer-encoding')

        response_fields['headers'] = headers

        try:
            body = io.BytesIO(body_raw)
        except TypeError:
            # This can happen if cachecontrol serialized to v1 format (pickle)
            # using Python 2. A Python 2 str(byte string) will be unpickled as
            # a Python 3 str (unicode string), which will cause the above to
            # fail with:
            #
            #     TypeError: 'str' does not support the buffer interface
            body = io.BytesIO(body_raw.encode('utf8'))

        return HTTPResponse(
            body=body,
            preload_content=False,
            **response_fields
        )
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, fp, callback):
        """Remember *fp* and *callback* and start with an empty buffer."""
        self.__fp = fp
        self.__callback = callback
        self.__buf = BytesIO()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def captured_output(stream_name):
    """Temporarily swap the sys stream called *stream_name* for a wrapper.

    The replacement is built with ``StreamWrapper.from_stream`` from the
    original stream and yielded to the caller; the original stream is put
    back afterwards, even if the caller's block raises.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    original = getattr(sys, stream_name)
    replacement = StreamWrapper.from_stream(original)
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, original)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_verifying_zipfile():
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    # Build a small three-member archive in memory
    sio = StringIO()
    zf = zipfile.ZipFile(sio, 'w')
    members = (
        ("one", b"first file"),
        ("two", b"second file"),
        ("three", b"third file"),
    )
    for member_name, payload in members:
        zf.writestr(member_name, payload)
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    # "three" was registered with a wrong hash, so reading it must fail
    try:
        vzf.open("three").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    try:
        vzf.open("two").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # Once "two" is registered (with None) it can be read in strict mode
    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_resource_stream(self, manager, resource_name):
        """Return the named resource's content as an in-memory byte stream."""
        content = self.get_resource_string(manager, resource_name)
        return io.BytesIO(content)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def quopri_encode(input, errors='strict'):
    """Quoted-printable encode *input* (tabs quoted).

    Returns a ``(encoded_bytes, consumed_length)`` tuple.  Only the
    'strict' error mode is accepted.
    """
    assert errors == 'strict'
    source = BytesIO(input)
    encoded = BytesIO()
    quopri.encode(source, encoded, quotetabs=True)
    return (encoded.getvalue(), len(input))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def quopri_decode(input, errors='strict'):
    """Quoted-printable decode *input*.

    Returns a ``(decoded_bytes, consumed_length)`` tuple.  Only the
    'strict' error mode is accepted.
    """
    assert errors == 'strict'
    source = BytesIO(input)
    decoded = BytesIO()
    quopri.decode(source, decoded)
    return (decoded.getvalue(), len(input))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
    """UU-encode *input* and return ``(encoded_bytes, consumed_length)``.

    The output starts with a ``begin <mode> <filename>`` line, followed by
    one encoded line per 45 input bytes and the closing end marker.
    Only the 'strict' error mode is accepted.
    """
    assert errors == 'strict'
    infile = BytesIO(input)
    outfile = BytesIO()

    # Header line
    header = 'begin %o %s\n' % (mode & 0o777, filename)
    outfile.write(header.encode('ascii'))

    # Body: 45 input bytes per encoded line, until the input is exhausted
    for chunk in iter(lambda: infile.read(45), b''):
        outfile.write(binascii.b2a_uu(chunk))

    # Trailer
    outfile.write(b' \nend\n')

    return (outfile.getvalue(), len(input))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def uu_decode(input, errors='strict'):
    """UU-decode *input* and return ``(decoded_bytes, consumed_length)``.

    Lines before the ``begin`` line are skipped; decoding stops at the
    ``end`` line.  Only the 'strict' error mode is accepted.

    Raises ValueError when the ``begin`` line is missing or the input
    ends before the ``end`` line.
    """
    assert errors == 'strict'
    infile = BytesIO(input)
    outfile = BytesIO()

    # Find start of encoded data
    while True:
        line = infile.readline()
        if not line:
            raise ValueError('Missing "begin" line in input data')
        if line.startswith(b'begin'):
            break

    # Decode
    while True:
        line = infile.readline()
        if not line or line == b'end\n':
            break
        try:
            data = binascii.a2b_uu(line)
        except binascii.Error:
            # Workaround for broken uuencoders by /Fredrik Lundh
            nbytes = (((line[0]-32) & 63) * 4 + 5) // 3
            data = binascii.a2b_uu(line[:nbytes])
        outfile.write(data)

    # An empty read means the input ended before the "end" line
    if not line:
        raise ValueError('Truncated input data')

    return (outfile.getvalue(), len(input))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_load_config_empty_file(self):
        """
            An empty config file results in an empty config dict
        """
        usrmgr = self.__get_dummy_object()
        empty_file = io.BytesIO(b"")

        with patch("os.path.exists", return_value=True):
            with patch("ownbot.usermanager.open") as open_mock:
                open_mock.return_value = empty_file

                self.assertEqual(usrmgr.config, {})
                self.assertTrue(open_mock.called)
项目:geo-pyprint    作者:ioda-net    | 项目源码 | 文件源码
def _create_map_image(self):
        """Compose all layers from ``_get_images`` into ``self._map_image``.

        Starts from a new RGBA image of ``self._map_size`` and alpha-composites
        each layer on top.  Entries that are not PIL images are opened from
        their ``content`` attribute and converted to RGBA first.
        """
        layers = self._get_images()

        self._map_image = Image.new('RGBA', self._map_size)

        for layer in layers:
            if isinstance(layer, Image.Image):
                rgba_layer = layer
            else:
                rgba_layer = Image.open(BytesIO(layer.content)).convert('RGBA')

            self._map_image = Image.alpha_composite(self._map_image, rgba_layer)
项目:geo-pyprint    作者:ioda-net    | 项目源码 | 文件源码
def _create_pdf_libreoffice(self):
        """Render the map into an ODT template and convert it with unoconv.

        The map image is saved as PNG into memory and passed to the template
        as ``my_map``.  The rendered document is written to a temporary file
        and ``unoconv`` is asked to write a PDF next to it.  Returns the PDF
        path; the exit status of unoconv is not checked.
        """
        png_buffer = BytesIO()
        self._map_image.save(png_buffer, 'PNG')

        renderer = Renderer(media_path='.')
        # TODO: use the configuration to select the template
        # TODO: use the configuration to select the name of the key in the template
        odt_content = renderer.render('template.odt', my_map=png_buffer)

        temp_odt = NamedTemporaryFile(mode='wb+', prefix='geo-pyprint_', delete=True)
        with temp_odt as generated_odt:
            generated_odt.write(odt_content)
            generated_odt.flush()

            output_name = generated_odt.name + '.pdf'
            subprocess.call(
                ['unoconv', '-f', 'pdf', '-o', output_name, generated_odt.name],
                timeout=None,
            )

            return output_name
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def load_verify_locations(self, cafile=None, capath=None, cadata=None):
        """Forward CA locations to the wrapped context.

        *cafile* and *capath* are UTF-8 encoded when given and always passed
        on together.  *cadata*, when given, is passed in a second call
        wrapped in a BytesIO.
        """
        encoded_file = cafile.encode('utf-8') if cafile is not None else None
        encoded_path = capath.encode('utf-8') if capath is not None else None
        self._ctx.load_verify_locations(encoded_file, encoded_path)

        if cadata is None:
            return
        self._ctx.load_verify_locations(BytesIO(cadata))