Python io 模块,SEEK_SET 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 io.SEEK_SET。

项目:PINCE    作者:korcankaraokcu    | 项目源码 | 文件源码
def refresh_contents(self):
        """Reload the tail of the log file into the log viewer widget.

        Only the end of the file is read (starting at most 20000 positions
        before the end), and everything up to and including the first
        newline of what was read is discarded.  When the resulting text
        differs from ``self.contents`` the widget is refreshed and scrolled
        to the bottom; otherwise nothing is touched.
        """
        tail_size = 20000
        # The context manager guarantees the handle is released even when
        # reading or decoding the file raises.
        with open(self.log_path) as log_file:
            log_file.seek(0, io.SEEK_END)
            end_pos = log_file.tell()
            log_file.seek(max(end_pos - tail_size, 0), io.SEEK_SET)
            contents = log_file.read().split("\n", 1)[-1]

        if contents == self.contents:
            return

        self.contents = contents
        self.textBrowser_LogContent.clear()
        self.textBrowser_LogContent.setPlainText(contents)

        # Scrolling to bottom
        cursor = self.textBrowser_LogContent.textCursor()
        cursor.movePosition(QTextCursor.End)
        self.textBrowser_LogContent.setTextCursor(cursor)
        self.textBrowser_LogContent.ensureCursorVisible()
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Reposition the stream and return the resulting position.

        *offset* is interpreted relative to *whence*:

            0: start of stream (default); offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Seeking is emulated, so depending on the arguments this call may
        be extremely slow.
        """
        with self._lock:
            self._check_can_seek()
            new_position = self._buffer.seek(offset, whence)
        return new_position
项目:oci-python-sdk    作者:oracle    | 项目源码 | 文件源码
def __init__(self, file_object, start, size):
        """
        Wrap a slice of ``file_object`` so it can be consumed like a
        BufferedReader limited to that slice.

        :param file_object: A file opened for reading in binary mode.
        :param start: Position in ``file_object`` where the part begins.
        :param size: Requested size of the part.  When fewer bytes remain
                     after ``start``, the part shrinks to what is left.
        """
        self.file = file_object
        self.bytes_read = 0
        self.start = start

        # Jump to the end of the file to learn how many bytes lie past
        # the start of the part.
        file_object.seek(0, io.SEEK_END)
        bytes_after_start = file_object.tell() - start
        self.size = min(bytes_after_start, size)

        # Leave the file positioned on the first byte of the part.
        file_object.seek(start, io.SEEK_SET)
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Move the logical stream position and return the new position.

        :param offset: Distance to move, interpreted according to *whence*.
        :param whence: ``io.SEEK_SET`` (absolute, *offset* must be >= 0),
                       ``io.SEEK_CUR`` (relative to the current position) or
                       ``io.SEEK_END`` (relative to the end of the stream;
                       position 0 when no buffers are held).
        :raises OSError: if seeking is not enabled for this stream.
        :raises ValueError: for a negative absolute seek or an unknown
                            *whence* value.
        """
        self.__raiseIfClosed()
        if not self.__seekable:
            raise OSError("Seek not enabled for this stream")
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError("Cannot have a negative absolute seek")
            newStreamPosition = offset
        elif whence == io.SEEK_CUR:
            # A relative seek moves by *offset*, not by the whence constant.
            newStreamPosition = self.__streamPosition + offset
        elif whence == io.SEEK_END:
            if not self.__buffers:
                newStreamPosition = 0
            else:
                newStreamPosition = self.__streamEnd + offset
        else:
            # Fail clearly instead of hitting an unbound local below.
            raise ValueError("Invalid whence value: {!r}".format(whence))
        self.__streamPosition = newStreamPosition
        return newStreamPosition
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def get_mime(self, media_io):
        """Detect the MIME type of *media_io* via ``self.magic.from_buffer``.

        Objects exposing ``getvalue`` are inspected in full.  Other seekable
        streams have their next 1024 bytes read and are then rewound to the
        start.  Anything else raises ``DontwiMediaError``.
        """
        if hasattr(media_io, "getvalue"):
            buffer = media_io.getvalue()
        elif hasattr(media_io, "seekable") and media_io.seekable():
            buffer = media_io.read(1024)
            # Offset first, whence second: rewind to the start explicitly
            # instead of relying on SEEK_SET happening to equal 0.
            media_io.seek(0, SEEK_SET)
        else:
            raise DontwiMediaError
        return self.magic.from_buffer(buffer)
项目:dontwi    作者:vocalodon    | 项目源码 | 文件源码
def get_media_size(media_io):
        """Return the size of *media_io*.

        Objects exposing ``getvalue`` report the length of that value.
        Other seekable streams are measured by seeking to the end and are
        then rewound to the start.  Anything else raises
        ``DontwiMediaError``.
        """
        if hasattr(media_io, "getvalue"):
            size = len(media_io.getvalue())
        elif hasattr(media_io, "seekable") and media_io.seekable():
            media_io.seek(0, SEEK_END)
            size = media_io.tell()
            # Offset first, whence second: rewind to the start explicitly
            # instead of relying on SEEK_SET happening to equal 0.
            media_io.seek(0, SEEK_SET)
        else:
            raise DontwiMediaError
        return size
项目:pixelinkds    作者:hgrecco    | 项目源码 | 文件源码
def reset(self):
        """Move the cursor to the beginning of the 1st frame descriptor."""
        source = self._fin
        source.seek(self._offset, io.SEEK_SET)
项目:io_scene_kcl    作者:Syroot    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Reposition the wrapped reader; the reader's return value is discarded."""
        underlying = self.reader
        underlying.seek(offset, whence)
项目:io_scene_kcl    作者:Syroot    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Reposition the wrapped writer; the writer's return value is discarded."""
        underlying = self.writer
        underlying.seek(offset, whence)
项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def get_full_trace(self, start_idx, end_idx):
        """Return the section of the cell log between two logfile positions.

        The upstream description of the result is "the Tor DATA cells
        transmitted over a circuit during a specified time period".  Both
        positions are checked with assertions before the log is read.
        """
        # Sanity checks on the requested window.
        positions_valid = start_idx >= 0 and end_idx > 0
        assert positions_valid, "Invalid (negative) logfile position"
        assert end_idx > start_idx, \
            "logfile section end_idx must come after start_idx"

        log = self.cell_log
        log.seek(start_idx, SEEK_SET)
        return log.read(end_idx - start_idx)
项目:io_scene_bfres    作者:Syroot    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Forward the seek request to ``self.reader``; nothing is returned."""
        target = self.reader
        target.seek(offset, whence)
项目:io_scene_bfres    作者:Syroot    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Forward the seek request to ``self.writer``; nothing is returned."""
        target = self.writer
        target.seek(offset, whence)
项目:SpongeAuth    作者:lukegb    | 项目源码 | 文件源码
def _generate_image():
    """Build a 100x100 blue PNG in memory and wrap it as an upload named "image.png"."""
    from PIL import Image
    png_buffer = io.BytesIO()
    Image.new('RGB', (100, 100), 'blue').save(png_buffer, format='PNG')
    png_buffer.seek(0, io.SEEK_SET)
    return SimpleUploadedFile("image.png", png_buffer.getbuffer(), content_type="image/png")
项目:mauzr    作者:eqrx    | 项目源码 | 文件源码
def __getitem__(self, name):
        """Return whether the input pin *name* currently reads ``"1"``.

        Raises KeyError when the pin is unknown or is not of type "in".
        """
        entry = self.pins[name]
        if entry["type"] != "in":
            raise KeyError(f"Not an input: {name}")
        # Rewind before each read so the first character is always returned.
        handle = entry["file"]
        handle.seek(0, io.SEEK_SET)
        return handle.read(1) == "1"
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Update ``self.position`` according to *whence* and return it.

        Results are clamped against 0 and ``self.size``; any other *whence*
        raises ``ValueError("Invalid argument")``.
        """
        if whence == io.SEEK_SET:
            floor_applied = max(position, 0)
            self.position = min(floor_applied, self.size)
            return self.position
        if whence == io.SEEK_CUR:
            self.position = (
                max(self.position + position, 0)
                if position < 0
                else min(self.position + position, self.size)
            )
            return self.position
        if whence == io.SEEK_END:
            ceiling_applied = min(self.size + position, self.size)
            self.position = max(ceiling_applied, 0)
            return self.position
        raise ValueError("Invalid argument")
项目:ceiba-dl    作者:lantw44    | 项目源码 | 文件源码
def web(self, path, args=None, encoding=None, allow_return_none=False):
        """Fetch *path* from the web endpoint and parse the response as HTML.

        :param path: Path joined onto ``self.web_url`` (URL-quoted first).
        :param args: Optional query arguments; treated as empty when omitted.
        :param encoding: Encoding handed to ``etree.HTMLParser``.
        :param allow_return_none: When true and the last request recorded
            for *path* in ``self.web_cache`` used equal *args*, return
            ``None`` without performing a request.
        :return: The tree produced by ``etree.parse``, or ``None`` on a
            permitted cache hit.
        :raises ServerError: when the HTTP status is not 200.
        """
        # NOTE(review): the '?' runs in the log messages below look like
        # non-ASCII text lost in transcoding; they are left untouched.
        if args is None:
            # Avoid a shared mutable default argument.
            args = {}
        self.logger.debug('????????')
        if allow_return_none:
            if path in self.web_cache and self.web_cache[path] == args:
                self.logger.debug('????? {} ????'.format(path))
                self.logger.debug('???{}'.format(args))
                return
        self.web_cache[path] = dict(args)
        url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
        if args:
            url += '?' + urllib.parse.urlencode(args)
        self.logger.debug('HTTP ?????{}'.format(url))
        data = io.BytesIO()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.COOKIE, self.web_cookie)
        self.curl.setopt(pycurl.NOBODY, False)
        self.curl.setopt(pycurl.NOPROGRESS, True)
        self.curl.setopt(pycurl.WRITEDATA, data)
        self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
        self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
        self.curl.perform()
        status = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if status != 200:
            raise ServerError(status)
        # Rewind the downloaded body: offset 0 relative to the start.
        data.seek(0, io.SEEK_SET)
        return etree.parse(data, etree.HTMLParser(
            encoding=encoding, remove_comments=True))
项目:relaax    作者:deeplearninc    | 项目源码 | 文件源码
def recv(self, n):
        """Read up to *n* items from the buffer at the saved read position.

        The read position is advanced to wherever the read stopped.
        """
        stream = self._buffer
        stream.seek(self._read_pos, io.SEEK_SET)
        chunk = stream.read(n)
        self._read_pos = stream.tell()
        return chunk
项目:io_scene_mk8muunt    作者:Syroot    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Seek the underlying reader to *offset* relative to *whence*.

        The value returned by the reader is not passed on.
        """
        stream = self.reader
        stream.seek(offset, whence)
项目:io_scene_mk8muunt    作者:Syroot    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Seek the underlying writer to *offset* relative to *whence*.

        The value returned by the writer is not passed on.
        """
        stream = self.writer
        stream.seek(offset, whence)
项目:news-for-good    作者:thecodinghub    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Update ``self.position`` according to *whence* and return it.

        Results are clamped against 0 and ``self.size``; any other *whence*
        raises ``ValueError("Invalid argument")``.
        """
        if whence == io.SEEK_SET:
            floor_applied = max(position, 0)
            self.position = min(floor_applied, self.size)
            return self.position
        if whence == io.SEEK_CUR:
            self.position = (
                max(self.position + position, 0)
                if position < 0
                else min(self.position + position, self.size)
            )
            return self.position
        if whence == io.SEEK_END:
            ceiling_applied = min(self.size + position, self.size)
            self.position = max(ceiling_applied, 0)
            return self.position
        raise ValueError("Invalid argument")
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:python3-ipfs-api    作者:jgraef    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Set the tracked position ``self._pos`` and return it.

        :param offset: Distance to move, interpreted according to *whence*.
        :param whence: ``io.SEEK_SET`` (default) for an absolute position,
                       ``io.SEEK_CUR`` relative to the current position, or
                       ``io.SEEK_END`` relative to ``self._file._filesize``.
        :raises ValueError: if *whence* is none of the three constants.
        """
        if whence == io.SEEK_SET:
            self._pos = offset
        elif whence == io.SEEK_CUR:
            self._pos += offset
        elif whence == io.SEEK_END:
            self._pos = self._file._filesize + offset
        else:
            # Previously an unknown whence was ignored silently, leaving
            # the position unchanged without telling the caller.
            raise ValueError("Invalid whence value: {!r}".format(whence))
        return self._pos
项目:python3-ipfs-api    作者:jgraef    | 项目源码 | 文件源码
def test_small_file_seek_and_read(self):
        """Check the positions returned by a series of seeks, then a 16-byte read."""
        # (expected position, offset, whence) for each seek, in order.
        seek_steps = (
            (64, 64, io.SEEK_CUR),
            (128, 64, io.SEEK_CUR),
            (256, 256, io.SEEK_SET),
            (24610, -256, io.SEEK_END),
        )
        with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
            for expected_pos, offset, whence in seek_steps:
                self.assertEqual(expected_pos, f.seek(offset, whence))
            self.assertEqual(
                b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
                f.read(16))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Update ``self.position`` according to *whence* and return it.

        Results are clamped against 0 and ``self.size``; any other *whence*
        raises ``ValueError("Invalid argument")``.
        """
        if whence == io.SEEK_SET:
            floor_applied = max(position, 0)
            self.position = min(floor_applied, self.size)
            return self.position
        if whence == io.SEEK_CUR:
            self.position = (
                max(self.position + position, 0)
                if position < 0
                else min(self.position + position, self.size)
            )
            return self.position
        if whence == io.SEEK_END:
            ceiling_applied = min(self.size + position, self.size)
            self.position = max(ceiling_applied, 0)
            return self.position
        raise ValueError("Invalid argument")
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def seek(self, offset, whence=SEEK_SET):
        """Seek the wrapped file object and return whatever it reports."""
        new_position = self._f.seek(offset, whence)
        return new_position
项目:ivport-v2    作者:ivmech    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """
        Move the stream position to *offset*, measured from the point
        selected by *whence*, and return the new absolute position.

        * ``SEEK_CUR`` or ``1`` - relative to the current position;
          *offset* may be negative

        * ``SEEK_END`` or ``2`` - relative to the end of the stream;
          *offset* is usually negative

        * any other value, including the default ``SEEK_SET`` (``0``) -
          *offset* is used as the absolute position

        A resulting position below zero raises ValueError.
        """
        with self.lock:
            if whence == io.SEEK_CUR:
                target = self._pos + offset
            elif whence == io.SEEK_END:
                target = self._length + offset
            else:
                target = offset
            if target < 0:
                raise ValueError(
                    'New position is before the start of the stream')
            self._set_pos(target)
            return self._pos
项目:CloudPrint    作者:William-An    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:pyheatshrink    作者:johan-sports    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Emulate seeking by rewinding and/or reading forward.

        Returns ``self._pos`` after the move.  Raises ValueError for an
        unknown *whence* and IOError when the resulting absolute position
        would be negative.
        """
        # Turn the request into an absolute position first.
        if whence == io.SEEK_CUR:
            offset += self._pos
        elif whence == io.SEEK_END:
            if self._size < 0:
                # Drain the stream until read() returns nothing more.
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset += self._size
        elif whence != io.SEEK_SET:
            raise ValueError('Invalid value for whence: {}'.format(whence))

        if offset < 0:
            template = '[Error {code}] {msg}'
            raise IOError(template.format(code=errno.EINVAL,
                                          msg=os.strerror(errno.EINVAL)))

        # Work out how many bytes must be skipped; going backwards means
        # starting over from the beginning.
        if offset < self._pos:
            self._rewind()
            remaining = offset
        else:
            remaining = offset - self._pos

        # Consume and drop data until the target is reached or data runs out.
        while remaining > 0:
            chunk = self.read(min(io.DEFAULT_BUFFER_SIZE, remaining))
            if not chunk:
                break
            remaining -= len(chunk)

        return self._pos
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Update ``self.position`` according to *whence* and return it.

        Results are clamped against 0 and ``self.size``; any other *whence*
        raises ``ValueError("Invalid argument")``.
        """
        if whence == io.SEEK_SET:
            floor_applied = max(position, 0)
            self.position = min(floor_applied, self.size)
            return self.position
        if whence == io.SEEK_CUR:
            self.position = (
                max(self.position + position, 0)
                if position < 0
                else min(self.position + position, self.size)
            )
            return self.position
        if whence == io.SEEK_END:
            ceiling_applied = min(self.size + position, self.size)
            self.position = max(ceiling_applied, 0)
            return self.position
        raise ValueError("Invalid argument")
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:projeto    作者:BarmyPenguin    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Update ``self.position`` according to *whence* and return it.

        Results are clamped against 0 and ``self.size``; any other *whence*
        raises ``ValueError("Invalid argument")``.
        """
        if whence == io.SEEK_SET:
            floor_applied = max(position, 0)
            self.position = min(floor_applied, self.size)
            return self.position
        if whence == io.SEEK_CUR:
            self.position = (
                max(self.position + position, 0)
                if position < 0
                else min(self.position + position, self.size)
            )
            return self.position
        if whence == io.SEEK_END:
            ceiling_applied = min(self.size + position, self.size)
            self.position = max(ceiling_applied, 0)
            return self.position
        raise ValueError("Invalid argument")
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:aweasome_learning    作者:Knight-ZXW    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Update ``self.position`` according to *whence* and return it.

        Results are clamped against 0 and ``self.size``; any other *whence*
        raises ``ValueError("Invalid argument")``.
        """
        if whence == io.SEEK_SET:
            floor_applied = max(position, 0)
            self.position = min(floor_applied, self.size)
            return self.position
        if whence == io.SEEK_CUR:
            self.position = (
                max(self.position + position, 0)
                if position < 0
                else min(self.position + position, self.size)
            )
            return self.position
        if whence == io.SEEK_END:
            ceiling_applied = min(self.size + position, self.size)
            self.position = max(ceiling_applied, 0)
            return self.position
        raise ValueError("Invalid argument")
项目:keepass-menu    作者:frostidaho    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """Seek the input buffer when one is set.

        Returns the buffer's seek result, or None when ``self.in_buffer``
        is falsy.
        """
        if not self.in_buffer:
            return None
        return self.in_buffer.seek(offset, whence)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:diffoscope    作者:ReproducibleBuilds    | 项目源码 | 文件源码
def recognizes(cls, file):
        """Return True when *file* appears to contain a valid CBFS header.

        Files smaller than CBFS_HEADER_SIZE or larger than
        CBFS_MAXIMUM_FILE_SIZE are rejected immediately.  Otherwise the last
        four bytes are read as a relative header offset and the header found
        there is validated; if that does not succeed the whole image may be
        scanned (see the comments below for when).
        """
        size = os.stat(file.path).st_size
        if size < CBFS_HEADER_SIZE or size > CBFS_MAXIMUM_FILE_SIZE:
            return False
        with open(file.path, 'rb') as f:
            # Look at the last four bytes: they should contain the offset of
            # the header relative to the end of the file.
            f.seek(-4, io.SEEK_END)
            # <pgeorgi> given the hardware we support so far, it looks like
            #           that field is now bound to be little endian
            #   -- #coreboot, 2015-10-14
            rel_offset = struct.unpack('<i', f.read(4))[0]
            # A plausible offset is negative and points inside the file,
            # more than one header's length before the end.
            if rel_offset < 0 and -rel_offset > CBFS_HEADER_SIZE and -rel_offset < size:
                f.seek(rel_offset, io.SEEK_END)
                logger.debug('looking for header at offset: %x', f.tell())
                if is_header_valid(f.read(CBFS_HEADER_SIZE), size):
                    return True
                # A plausible offset with an invalid header falls through to
                # the full scan below, whatever the file name is.
            elif not file.name.endswith('.rom'):
                # Implausible offset and not a '.rom' file: give up.
                return False
            else:
                logger.debug('CBFS relative offset seems wrong, scanning whole image')
            # Fallback: scan the image from the start, testing each offset
            # within the current buffer.
            f.seek(0, io.SEEK_SET)
            offset = 0
            buf = f.read(CBFS_HEADER_SIZE)
            while len(buf) >= CBFS_HEADER_SIZE:
                if is_header_valid(buf, size, offset):
                    return True
                # When no more than one header's worth of bytes is left in
                # the buffer, replace it with the next 32768-byte chunk.
                # NOTE(review): bytes left in the old buffer are dropped, so a
                # header straddling two chunks looks unreachable - confirm.
                if len(buf) - offset <= CBFS_HEADER_SIZE:
                    buf = f.read(32768)
                    offset = 0
                else:
                    offset += 1
            return False
项目:synthesizer    作者:irmen    | 项目源码 | 文件源码
def add_sample(self, sample, end_callback=None):
        """Serialise *sample* as WAV into memory and queue it via ``add_stream``.

        The sample's width, rate and channel count must equal this object's
        (checked with assertions).
        """
        for attribute in ("samplewidth", "samplerate", "nchannels"):
            assert getattr(sample, attribute) == getattr(self, attribute)
        wav_stream = io.BytesIO()
        sample.write_wav(wav_stream)
        # Rewind so the consumer starts reading at the beginning.
        wav_stream.seek(0, io.SEEK_SET)
        self.add_stream(wav_stream, end_callback=end_callback)
项目:blog_flask    作者:momantai    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Update ``self.position`` according to *whence* and return it.

        Results are clamped against 0 and ``self.size``; any other *whence*
        raises ``ValueError("Invalid argument")``.
        """
        if whence == io.SEEK_SET:
            floor_applied = max(position, 0)
            self.position = min(floor_applied, self.size)
            return self.position
        if whence == io.SEEK_CUR:
            self.position = (
                max(self.position + position, 0)
                if position < 0
                else min(self.position + position, self.size)
            )
            return self.position
        if whence == io.SEEK_END:
            ceiling_applied = min(self.size + position, self.size)
            self.position = max(ceiling_applied, 0)
            return self.position
        raise ValueError("Invalid argument")
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load(self, value_type: type):
        """
        Read an offset, then read an instance of value_type located at that offset.
        :param value_type: The type of the instance to read.
        :return: The instance, or None when the offset that was read is 0.
        """
        target = self.read_offset()
        if target:
            # Visit the referenced location only for the duration of the read.
            with self.temporary_seek(target, io.SEEK_SET):
                return self._read_res_data(value_type)
        return None
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load_custom(self, callback, offset: int = None):
        """
        Read an instance of arbitrary type by invoking callback at an offset.
        :param callback: Called with this loader once positioned at the offset.
        :param offset: Offset to use; when falsy, an offset is read instead.
        :return: Whatever callback returns, or None when the offset is 0.
        """
        target = offset if offset else self.read_offset()
        if target:
            with self.temporary_seek(target, io.SEEK_SET):
                return callback(self)
        return None
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load_dict(self, value_type: type):
        """
        Read an offset and load a ResDict of value_type elements from it.
        :param value_type: The type of the elements.
        :return: The ResDict instance; left as constructed when the offset read is 0.
        """
        target = self.read_offset()
        result = bfres.ResDict(value_type)
        if target:
            with self.temporary_seek(target, io.SEEK_SET):
                result.load(self)
        return result
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load_list(self, value_type: type, count: int, offset: int = None):
        """
        Read count instances of value_type stored consecutively at an offset.
        :param value_type: The type of the elements.
        :param count: Number of elements to read.
        :param offset: Offset to use; when falsy, an offset is read instead.
        :return: A list of the elements; empty when the offset or count is 0.
        """
        offset = offset or self.read_offset()
        if not (offset and count):
            return []
        # Visit the start of the list and read the elements one by one.
        with self.temporary_seek(offset, io.SEEK_SET):
            return [self._read_res_data(value_type) for _ in range(count)]
项目:bfres    作者:Syroot    | 项目源码 | 文件源码
def load_string(self, encoding: str = None):
        """
        Read an offset, then read a str via read_string_0 at that offset.
        :param encoding: Encoding to use; falls back to self.encoding when falsy.
        :return: The read text, or None when the offset that was read is 0.
        """
        target = self.read_offset()
        if target:
            with self.temporary_seek(target, io.SEEK_SET):
                return self.read_string_0(encoding or self.encoding)
        return None
项目:oci-python-sdk    作者:oracle    | 项目源码 | 文件源码
def seek(self, offset, whence=io.SEEK_SET):
        """
        Seek within the part.  This is similar to the standard seek, except
        that io.SEEK_SET is the start of the part and io.SEEK_END is the
        end of the part.

        :param offset: Offset in bytes from location determined by the whence value
        :param whence: io.SEEK_SET (the default, as for standard file
                       objects) and io.SEEK_END are supported.
        :raises RuntimeError: for any other whence value.
        """
        if whence == io.SEEK_END:
            # The end of the part is start + size, not the end of the file.
            self.file.seek(self.start + self.size + offset, io.SEEK_SET)
        elif whence == io.SEEK_SET:
            self.file.seek(self.start + offset, io.SEEK_SET)
        else:
            raise RuntimeError("Unhandled whence value: {}".format(whence))
项目:oci-python-sdk    作者:oracle    | 项目源码 | 文件源码
def add_parts_from_file(self, file_path):
        """
        Register every part of a file for upload by calling add_part_from_file
        once per ``self.part_size`` step.  Nothing is uploaded here; the
        original description states that happens when upload is called.

        :param string file_path: Path of file to upload in parts
        """
        with io.open(file_path, mode='rb') as stream:
            # Find the end of the file, then start again from the beginning.
            stream.seek(0, io.SEEK_END)
            total_bytes = stream.tell()
            stream.seek(0, io.SEEK_SET)

            part_start = 0
            while True:
                if stream.tell() >= total_bytes:
                    break
                self.add_part_from_file(file_path, offset=part_start, size=self.part_size)
                part_start += self.part_size
                stream.seek(part_start, io.SEEK_SET)
项目:MyFriend-Rob    作者:lcheniv    | 项目源码 | 文件源码
def seek(self, position, whence=io.SEEK_SET):
        """Move the read cursor and return the new position.

        The requested position is clamped against 0 and ``self.size``.
        An unknown *whence* raises ``ValueError("Invalid argument")``.
        """
        if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
            raise ValueError("Invalid argument")

        if whence == io.SEEK_SET:
            # Absolute: clamp the requested offset into the valid range.
            target = min(max(position, 0), self.size)
        elif whence == io.SEEK_END:
            # Relative to the end: never past the end, never before 0.
            target = max(min(self.size + position, self.size), 0)
        elif position < 0:
            # Relative, moving backwards: stop at the start.
            target = max(self.position + position, 0)
        else:
            # Relative, moving forwards: stop at the end.
            target = min(self.position + position, self.size)

        self.position = target
        return self.position
项目:Playground3    作者:CrimsonVista    | 项目源码 | 文件源码
def seek(self, offset, whence=SEEK_SET):
        """Seek the raw stream, translating offsets around the prefix and suffix.

        Returns None without seeking when the raw stream is shorter than
        prefix plus suffix, and also for an unrecognised *whence*.
        """
        raw_size = self._rawStreamSize()
        framing_size = self.PREFIX_SIZE + self.SUFFIX_SIZE
        if not raw_size >= framing_size:
            return None

        if whence == SEEK_SET:
            # Absolute positions are counted from just after the prefix.
            shifted = offset + (self._prefixStart + self.PREFIX_SIZE)
            return self._stream.seek(shifted, whence)
        if whence == SEEK_CUR:
            return self._stream.seek(offset, whence)
        if whence == SEEK_END:
            # Offsets are computed as though the suffix had already arrived,
            # even if it has not: the packet is not framed (verified) until
            # its final bytes are received, so we must not finish early.
            return self._stream.seek(offset - self.SUFFIX_SIZE, whence)
        return None
项目:ogre_blender_importer    作者:lamogui    | 项目源码 | 文件源码
def importMesh(self, stream, filename=None):
        """Import an Ogre mesh from *stream* into a new ``OgreMesh``.

        :param stream: An ``IOBase`` stream positioned at the start of the
                       mesh data.
        :param filename: Name used to derive the mesh name.  When omitted,
                         ``stream.name`` or ``stream.filename`` is used.
        :raises ValueError: if no filename can be determined, a Blender mesh
                            with the derived name already exists, the header
                            chunk is missing, or no serializer matches the
                            version string in the file.
        """
        assert isinstance(stream, IOBase)

        # Check mesh name validity
        if filename is None:
            if hasattr(stream, 'name'):
                filename = stream.name
            elif hasattr(stream, 'filename'):
                filename = stream.filename
            else:
                raise ValueError("Cannot determine the filename of the stream please add filename parameter")

        filename = os.path.basename(filename)
        mesh_name = os.path.splitext(filename)[0]
        if mesh_name in bpy.data.meshes.keys():
            raise ValueError("Mesh with name " + mesh_name + " already exists in blender")

        # Check header and select impl
        self._determineEndianness(stream)
        headerID = self._readUShorts(stream, 1)[0]
        if headerID != OgreMeshSerializer.HEADER_CHUNK_ID:
            raise ValueError("File header not found")
        ver = OgreSerializer.readString(stream)
        # Rewind so the selected implementation reads from the beginning.
        stream.seek(0, SEEK_SET)
        impl = next(
            (entry.impl for entry in self._versionData
             if entry.versionString == ver),
            None)
        if impl is None:
            print(ver)
            raise ValueError("Cannot find serializer implementation for "
                             "mesh version " + ver)

        # Create the blender mesh and import the mesh
        mesh = OgreMesh(mesh_name)
        impl.importMesh(stream, mesh, self.listener)

        # Check if newer version
        if ver != self._versionData[0].versionString:
            print("Warning: "
                  " older format (" + ver + "); you should upgrade it as soon as possible" +
                  " using the OgreMeshUpgrade tool.")

        # Probably useless
        if self.listener is not None:
            # The listener is an attribute of this object; a bare `listener`
            # is not defined in this scope.
            self.listener.processMeshCompleted(mesh)