Python io 模块,SEEK_END 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用io.SEEK_END

项目:io_scene_kcl    作者:Syroot    | 项目源码 | 文件源码
def write(self, writer, base_address):
            pos = writer.tell()
            writer.seek(0, io.SEEK_END)
            end_position = writer.tell()
            if self.is_leaf:
                # Write the offset back at this nodes address.
                writer.seek(pos)
                writer.write_uint32((end_position - base_address - 2) | 0x80000000)
                writer.seek(end_position + 4)
                # Write the triangle indices and terminate the list with 0xFFFF.
                writer.write_uint16s(self.indices)
                writer.write_uint16(0xFFFF)
            else:
                writer.seek(pos)
                writer.write_uint32(end_position - base_address)
                writer.seek(end_position + 4)
                base = writer.tell()
                writer.write_uint32s([0x00000000] * 8)
                writer.seek(base)
                for node in self.branches:
                    node.write(writer, base)
            writer.seek(pos + 4)
项目:PINCE    作者:korcankaraokcu    | 项目源码 | 文件源码
def refresh_contents(self):
        log_file = open(self.log_path)
        log_file.seek(0, io.SEEK_END)
        end_pos = log_file.tell()
        if end_pos > 20000:
            log_file.seek(end_pos - 20000, io.SEEK_SET)
        else:
            log_file.seek(0, io.SEEK_SET)
        contents = log_file.read().split("\n", 1)[-1]
        if contents != self.contents:
            self.contents = contents
            self.textBrowser_LogContent.clear()
            self.textBrowser_LogContent.setPlainText(contents)

            # Scrolling to bottom
            cursor = self.textBrowser_LogContent.textCursor()
            cursor.movePosition(QTextCursor.End)
            self.textBrowser_LogContent.setTextCursor(cursor)
            self.textBrowser_LogContent.ensureCursorVisible()
        log_file.close()
项目:simple-avs    作者:robladbrook    | 项目源码 | 文件源码
def _downstream_thread(self, stream, downstream_boundary):
        data_buff = io.BytesIO()
        multipart_parser = MultipartParser(data_buff, downstream_boundary)

        while not self._stop_threads_event.is_set():
            if stream.data:
                current_buffer_pos = data_buff.tell()
                data_buff.seek(0, io.SEEK_END)
                data_buff.write(b''.join(stream.data))
                data_buff.seek(current_buffer_pos)
                stream.data = []

            message_part = multipart_parser.get_next_part()

            if not message_part:
                time.sleep(0.5)
                continue

            self._process_message_parts([message_part])
项目:oci-python-sdk    作者:oracle    | 项目源码 | 文件源码
def __init__(self, file_object, start, size):
        """
        Build an object that will act like a BufferedReader object,
        but constrained to a predetermined part of the file_object.

        :param file_object: A file opened for reading in binary mode.
        :param start: Start of the part within the file_object
        :param size: Ideal size of the part.  This will be set
                     to the number of bytes remaining in the
                     file if there aren't enough bytes remaining.
        """
        self.file = file_object
        self.bytes_read = 0
        self.start = start

        # Seek to the end of the file to see how many bytes remain
        # from the start of the part.
        self.file.seek(0, io.SEEK_END)
        self.size = min(self.file.tell() - self.start, size)

        # Reset the pointer to the start of the part.
        self.file.seek(start, io.SEEK_SET)
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        self.__raiseIfClosed()
        if not self.__seekable:
            raise OSError("Seek not enabled for this stream")
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError("Cannot have a negative absolute seek")
            newStreamPosition = offset
        elif whence == io.SEEK_CUR:
            newStreamPosition = self.__streamPosition + whence
        elif whence == io.SEEK_END:
            if not self.__buffers:
                newStreamPosition = 0
            else:
                newStreamPosition = self.__streamEnd + offset
        self.__streamPosition = newStreamPosition
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def get_media_size(media_io):
        if hasattr(media_io, "getvalue"):
            size = len(media_io.getvalue())
        elif hasattr(media_io, "seekable") and media_io.seekable():
            media_io.seek(0, SEEK_END)
            size = media_io.tell()
            media_io.seek(SEEK_SET)
        else:
            raise DontwiMediaError
        return size
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def _reset_frame(self):
        self._iobuf = io.BytesIO(self._iobuf.read())
        self._iobuf.seek(0, 2)  # io.SEEK_END == 2 (constant not present in 2.6)
        self._current_frame = None
项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def get_cell_log_pos(self):
        """Returns the current position of the last byte in the Tor cell log."""
        return self.cell_log.seek(0, SEEK_END)
项目:pymp4    作者:beardypig    | 项目源码 | 文件源码
def dump():
    parser = argparse.ArgumentParser(description='Dump all the boxes from an MP4 file')
    parser.add_argument("input_file", type=argparse.FileType("rb"), metavar="FILE", help="Path to the MP4 file to open")

    args = parser.parse_args()

    fd = args.input_file
    fd.seek(0, io.SEEK_END)
    eof = fd.tell()
    fd.seek(0)

    while fd.tell() < eof:
        box = Box.parse_stream(fd)
        print(box)
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:relaax    作者:deeplearninc    | 项目源码 | 文件源码
def sendall(self, data):
        self._buffer.seek(0, io.SEEK_END)
        self._buffer.write(data)
项目:news-for-good    作者:thecodinghub    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:python3-ipfs-api    作者:jgraef    | 项目源码 | 文件源码
def seek(self, offset, whence):
        if (whence == io.SEEK_SET):
            self._pos = offset
        elif (whence == io.SEEK_CUR):
            self._pos += offset
        elif (whence == io.SEEK_END):
            self._pos = self._file._filesize + offset
        return self._pos
项目:python3-ipfs-api    作者:jgraef    | 项目源码 | 文件源码
def test_small_file_seek_and_read(self):
        with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
            self.assertEqual(64, f.seek(64, io.SEEK_CUR))
            self.assertEqual(128, f.seek(64, io.SEEK_CUR))
            self.assertEqual(256, f.seek(256, io.SEEK_SET))
            self.assertEqual(24610, f.seek(-256, io.SEEK_END))
            self.assertEqual(
                b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
                f.read(16))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testTruncate(self):
        f = _FileIO(TESTFN, 'w')
        f.write(bytes(bytearray(range(10))))
        self.assertEqual(f.tell(), 10)
        f.truncate(5)
        self.assertEqual(f.tell(), 10)
        self.assertEqual(f.seek(0, io.SEEK_END), 5)
        f.truncate(15)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.seek(0, io.SEEK_END), 15)
        f.close()
项目:ivport-v2    作者:ivmech    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """
        Change the stream position to the given byte *offset*. *offset* is
        interpreted relative to the position indicated by *whence*. Values for
        *whence* are:

        * ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset*
          should be zero or positive

        * ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be
          negative

        * ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually
          negative

        Return the new absolute position.
        """
        with self.lock:
            if whence == io.SEEK_CUR:
                offset = self._pos + offset
            elif whence == io.SEEK_END:
                offset = self._length + offset
            if offset < 0:
                raise ValueError(
                    'New position is before the start of the stream')
            self._set_pos(offset)
            return self._pos
项目:CloudPrint    作者:William-An    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def test_seeking_from_end(self):
        with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
            self.assertEqual(fp.read(100), TEXT[:100])
            seeked_pos = fp.seek(-100, io.SEEK_END)
            self.assertEqual(seeked_pos, len(TEXT) - 100)
            self.assertEqual(fp.read(100), TEXT[-100:])
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def test_seeking_from_end_beyond_beginning(self):
        with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
            # Go to end to get size
            size = fp.seek(0, io.SEEK_END)
            # Go to beginning
            self.assertNotRaises(fp.seek, -size, io.SEEK_END)
            # One before beginning
            self.assertRaises(IOError, fp.seek, -size - 1, io.SEEK_END)
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        # Recalculate offset as an absolute file position.
        if whence == io.SEEK_SET:
            pass
        elif whence == io.SEEK_CUR:
            offset += self._pos
        elif whence == io.SEEK_END:
            if self._size < 0:
                # Finish reading the file
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset += self._size
        else:
            raise ValueError('Invalid value for whence: {}'.format(whence))

        if offset < 0:
            msg = '[Error {code}] {msg}'
            raise IOError(msg.format(code=errno.EINVAL,
                                     msg=os.strerror(errno.EINVAL)))

        # Make it so that offset is the number of bytes to skip forward.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Read and discard data until we reach the desired position
        while offset > 0:
            data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not data:
                break
            offset -= len(data)

        return self._pos
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testTruncate(self):
        f = _FileIO(TESTFN, 'w')
        f.write(bytes(bytearray(range(10))))
        self.assertEqual(f.tell(), 10)
        f.truncate(5)
        self.assertEqual(f.tell(), 10)
        self.assertEqual(f.seek(0, io.SEEK_END), 5)
        f.truncate(15)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.seek(0, io.SEEK_END), 15)
        f.close()
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:projeto    作者:BarmyPenguin    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:aweasome_learning    作者:Knight-ZXW    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testTruncate(self):
        f = _FileIO(TESTFN, 'w')
        f.write(bytes(bytearray(range(10))))
        self.assertEqual(f.tell(), 10)
        f.truncate(5)
        self.assertEqual(f.tell(), 10)
        self.assertEqual(f.seek(0, io.SEEK_END), 5)
        f.truncate(15)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.seek(0, io.SEEK_END), 15)
        f.close()
项目:diffoscope    作者:ReproducibleBuilds    | 项目源码 | 文件源码
def recognizes(cls, file):
        size = os.stat(file.path).st_size
        if size < CBFS_HEADER_SIZE or size > CBFS_MAXIMUM_FILE_SIZE:
            return False
        with open(file.path, 'rb') as f:
            # pick at the latest byte as it should contain the relative offset of the header
            f.seek(-4, io.SEEK_END)
            # <pgeorgi> given the hardware we support so far, it looks like
            #           that field is now bound to be little endian
            #   -- #coreboot, 2015-10-14
            rel_offset = struct.unpack('<i', f.read(4))[0]
            if rel_offset < 0 and -rel_offset > CBFS_HEADER_SIZE and -rel_offset < size:
                f.seek(rel_offset, io.SEEK_END)
                logger.debug('looking for header at offset: %x', f.tell())
                if is_header_valid(f.read(CBFS_HEADER_SIZE), size):
                    return True
            elif not file.name.endswith('.rom'):
                return False
            else:
                logger.debug('CBFS relative offset seems wrong, scanning whole image')
            f.seek(0, io.SEEK_SET)
            offset = 0
            buf = f.read(CBFS_HEADER_SIZE)
            while len(buf) >= CBFS_HEADER_SIZE:
                if is_header_valid(buf, size, offset):
                    return True
                if len(buf) - offset <= CBFS_HEADER_SIZE:
                    buf = f.read(32768)
                    offset = 0
                else:
                    offset += 1
            return False
项目:blog_flask    作者:momantai    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:oci-python-sdk    作者:oracle    | 项目源码 | 文件源码
def seek(self, offset, whence):
        """
        Seek within the part.  This is similar to the standard seek, except
        that io.SEEK_SET is the start of the part and io.SEEK_END is the
        end of the part.

        :param offset: Offset in bytes from location determined by the whence value
        :param whence: io.SEEK_END and io.SEEK_SET are supported.
        """
        if whence == io.SEEK_END:
            self.file.seek(self.start + self.size + offset, io.SEEK_SET)
        elif whence == io.SEEK_SET:
            self.file.seek(self.start + offset, io.SEEK_SET)
        else:
            raise RuntimeError("Unhandled whence value: {}".format(whence))
项目:oci-python-sdk    作者:oracle    | 项目源码 | 文件源码
def add_parts_from_file(self, file_path):
        """
        Splits a file into parts and adds all parts to an internal list of parts to upload.  The parts will not be uploaded to the server until upload is called.

        :param string file_path: Path of file to upload in parts
        """
        with io.open(file_path, mode='rb') as file_object:
            file_object.seek(0, io.SEEK_END)
            end = file_object.tell()
            file_object.seek(0, io.SEEK_SET)
            offset = 0
            while file_object.tell() < end:
                self.add_part_from_file(file_path, offset=offset, size=self.part_size)
                offset += self.part_size
                file_object.seek(offset, io.SEEK_SET)
项目:dopplerr    作者:Stibbons    | 项目源码 | 文件源码
def _reverse_read_lines(fp, buf_size=8192):  # pylint: disable=invalid-name
        """
        Async generator that returns the lines of a file in reverse order.

        ref: https://stackoverflow.com/a/23646049/8776239
        and: https://stackoverflow.com/questions/2301789/read-a-file-in-reverse-order-using-python
        """
        segment = None  # holds possible incomplete segment at the beginning of the buffer
        offset = 0
        await fp.seek(0, io.SEEK_END)
        file_size = remaining_size = await fp.tell()
        while remaining_size > 0:
            offset = min(file_size, offset + buf_size)
            await fp.seek(file_size - offset)
            buffer = await fp.read(min(remaining_size, buf_size))
            remaining_size -= buf_size
            lines = buffer.splitlines(True)
            # the first line of the buffer is probably not a complete line so
            # we'll save it and append it to the last line of the next buffer
            # we read
            if segment is not None:
                # if the previous chunk starts right from the beginning of line
                # do not concat the segment to the last line of new chunk
                # instead, yield the segment first
                if buffer[-1] == '\n':
                    # print 'buffer ends with newline'
                    yield segment
                else:
                    lines[-1] += segment
                    # print 'enlarged last line to >{}<, len {}'.format(lines[-1], len(lines))
            segment = lines[0]
            for index in range(len(lines) - 1, 0, -1):
                l = lines[index]
                if l:
                    yield l
        # Don't yield None if the file was empty
        if segment is not None:
            yield segment
项目:MyFriend-Rob    作者:lcheniv    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def available(self):
        """
        This is duplicate functionality if we have a HighPerformanceStreamIO.
        But we also want to support those that aren't. TODO: Better solution?
        """
        curPos = self._stream.tell()
        self._stream.seek(0, SEEK_END)
        endPos = self._stream.tell()
        self._stream.seek(curPos)
        return endPos-curPos
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def _rawStreamSize(self):
        curPos = self._stream.tell()
        self._stream.seek(0, SEEK_END)
        endPos = self._stream.tell()
        self._stream.seek(curPos)
        return endPos - self._prefixStart
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def _rawAvailable(self):
        "some underlying streams may not support 'available'"
        curPos = self._stream.tell()
        self._stream.seek(0, SEEK_END)
        endPos = self._stream.tell()
        self._stream.seek(curPos)
        return endPos - curPos
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def available(self):
        """
        Get the available bytes, cutting off the suffix if
        it's been acocunted for
        """
        if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE):
            return 0
        curPos = self._stream.tell()
        # even if the suffix hasn't been received yet, we calculate our offsets as if it had.
        # why? because if it hasn't been received yet, we don't want to finish! The whole packet
        # isn't framed (verified) until the final bytes are received.
        self._stream.seek(-self.SUFFIX_SIZE, SEEK_END)
        endPos = self._stream.tell()
        self._stream.seek(curPos)
        return endPos-curPos
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def seek(self, offset, whence=SEEK_SET):
        "Adjust seek from prefix start and, if present, from prefix"
        if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE):
            return
        if whence == SEEK_SET:
            offset += self._prefixStart + self.PREFIX_SIZE
            return self._stream.seek(offset, whence)
        elif whence == SEEK_CUR:
            return self._stream.seek(offset, whence)
        elif whence == SEEK_END:
            # even if the suffix hasn't been received yet, we calculate our offsets as if it had.
            # why? because if it hasn't been received yet, we don't want to finish! The whole packet
            # isn't framed (verified) until the final bytes are received.
            offset = offset - self.SUFFIX_SIZE
            return self._stream.seek(offset, whence)
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def update(self, newData):
        beforeWritePos = self.tell()
        self.seek(0, io.SEEK_END)
        self.write(newData)
        self.seek(beforeWritePos)
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def available(self):
        cur = self.tell()
        end = self.seek(0, io.SEEK_END)
        self.seek(cur)
        return end-cur
项目:vinyl    作者:pyrated    | 项目源码 | 文件源码
def ended(self) -> bool:
        try:
            fileno = self._codeio.fileno()
            offset = os.fstat(fileno).st_size
        except io.UnsupportedOperation:
            old_offset = self.offset
            self._codeio.seek(0, io.SEEK_END)
            offset = self._codeio.tell()
            self._codeio.seek(old_offset)
        return self.offset == offset
项目:python-dse-driver    作者:datastax    | 项目源码 | 文件源码
def _reset_frame(self):
        self._iobuf = io.BytesIO(self._iobuf.read())
        self._iobuf.seek(0, 2)  # io.SEEK_END == 2 (constant not present in 2.6)
        self._current_frame = None
项目:python-sense-emu    作者:RPi-Distro    | 项目源码 | 文件源码
def _play_run(self, f):
        err = None
        try:
            # Calculate how many records are in the file; we'll use this later
            # when updating the progress bar
            rec_total = (f.seek(0, io.SEEK_END) - HEADER_REC.size) // DATA_REC.size
            f.seek(0)
            skipped = 0
            for rec, data in enumerate(self._play_source(f)):
                now = time()
                if data.timestamp < now:
                    skipped += 1
                    continue
                else:
                    if self._play_event.wait(data.timestamp - now):
                        break
                self.props.application.pressure.set_values(data.pressure, data.ptemp)
                self.props.application.humidity.set_values(data.humidity, data.htemp)
                self.props.application.imu.set_imu_values(
                    (data.ax, data.ay, data.az),
                    (data.gx, data.gy, data.gz),
                    (data.cx, data.cy, data.cz),
                    (data.ox, data.oy, data.oz),
                    )
                # Again, would be better to use custom signals here but
                # attempting to do so just results in seemingly random
                # segfaults during playback
                with self._play_update_lock:
                    if self._play_update_id == 0:
                        self._play_update_id = GLib.idle_add(self._play_update_controls, rec / rec_total)
        except Exception as e:
            err = e
        finally:
            f.close()
            # Must ensure that controls are only re-enabled *after* all pending
            # control updates have run
            with self._play_update_lock:
                if self._play_update_id:
                    GLib.source_remove(self._play_update_id)
                    self._play_update_id = 0
            # Get the main thread to re-enable the controls at the end of
            # playback
            GLib.idle_add(self._play_controls_finish, err)