我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.SEEK_SET。
def create_media(media): """Download media link""" if is_valid_url(media.data_value): filename = media.data_value.split('/')[-1] data_file = NamedTemporaryFile() content_type = mimetypes.guess_type(filename) with closing(requests.get(media.data_value, stream=True)) as r: for chunk in r.iter_content(chunk_size=CHUNK_SIZE): if chunk: data_file.write(chunk) data_file.seek(os.SEEK_SET, os.SEEK_END) size = os.path.getsize(data_file.name) data_file.seek(os.SEEK_SET) media.data_value = filename media.data_file = InMemoryUploadedFile( data_file, 'data_file', filename, content_type, size, charset=None) return media return None
def seek(self, pos, whence=os.SEEK_SET): """Seek to a position in the file. """ if self.closed: raise ValueError("I/O operation on closed file") if whence == os.SEEK_SET: self.position = min(max(pos, 0), self.size) elif whence == os.SEEK_CUR: if pos < 0: self.position = max(self.position + pos, 0) else: self.position = min(self.position + pos, self.size) elif whence == os.SEEK_END: self.position = max(min(self.size + pos, self.size), 0) else: raise ValueError("Invalid argument") self.buffer = b"" self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET): """Seek to a position in the file. """ if self.closed: raise ValueError("I/O operation on closed file") if whence == os.SEEK_SET: self.position = min(max(pos, 0), self.size) elif whence == os.SEEK_CUR: if pos < 0: self.position = max(self.position + pos, 0) else: self.position = min(self.position + pos, self.size) elif whence == os.SEEK_END: self.position = max(min(self.size + pos, self.size), 0) else: raise ValueError("Invalid argument") self.buffer = "" self.fileobj.seek(self.position)
def setup(self): # Reset everything. self.f.seek(self.beginning, os.SEEK_SET) self.whereToWriteNewIFDOffset = None self.offsetOfNewPage = 0 self.IIMM = IIMM = self.f.read(4) if not IIMM: # empty file - first page self.isFirst = True return self.isFirst = False if IIMM == b"II\x2a\x00": self.setEndian("<") elif IIMM == b"MM\x00\x2a": self.setEndian(">") else: raise RuntimeError("Invalid TIFF file header") self.skipIFDs() self.goToEnd()
def md5sum2(filename, offset=0, partsize=0): m = get_md5() fp = open(filename, 'rb') if offset > os.path.getsize(filename): fp.seek(os.SEEK_SET, os.SEEK_END) else: fp.seek(offset) left_len = partsize BufferSize = BUFFER_SIZE while True: if left_len <= 0: break elif left_len < BufferSize: buffer_content = fp.read(left_len) else: buffer_content = fp.read(BufferSize) m.update(buffer_content) left_len = left_len - len(buffer_content) md5sum = m.hexdigest() return md5sum
def _get_org(self, ipnum): """ Seek and return organization or ISP name for ipnum. Return org/isp name. :arg ipnum: Result of ip2long conversion """ seek_org = self._seek_country(ipnum) if seek_org == self._databaseSegments: return None read_length = (2 * self._recordLength - 1) * self._databaseSegments try: self._lock.acquire() self._fp.seek(seek_org + read_length, os.SEEK_SET) buf = self._fp.read(const.MAX_ORG_RECORD_LENGTH) finally: self._lock.release() if PY3 and type(buf) is bytes: buf = buf.decode(ENCODING) return buf[:buf.index(chr(0))]
def chunk_generator(chunk_count, chunk_size, file_data): """ Generic chunk generator logic :param chunk_count: Number of chunks wanted :param chunk_size: Size of each chunk :param file_data: bytes to be split into chunk :return: """ try: total_len = len(file_data) is_fp = False except TypeError: total_len = get_file_size(file_data) is_fp = True for i in range(chunk_count): start_range = i * chunk_size end_range = (start_range + chunk_size) if i < (chunk_count - 1) else total_len chunk_info = Chunk(i, start_range, end_range, chunk_count) if is_fp: file_data.seek(chunk_info.start, os.SEEK_SET) yield chunk_info, file_data.read(chunk_info.length) else: yield chunk_info, file_data[chunk_info.start: chunk_info.end]
def read_random_line(f): import os chunk_size = 16 with open(f, 'rb') as f_handle: f_handle.seek(0, os.SEEK_END) size = f_handle.tell() # i = random.randint(0, size) i = u_randint(0, size) while True: i -= chunk_size if i < 0: chunk_size += i i = 0 f_handle.seek(i, os.SEEK_SET) d = f_handle.read(chunk_size) i_newline = d.rfind(b'\n') if i_newline != -1: i += i_newline + 1 break if i == 0: break f_handle.seek(i, os.SEEK_SET) return f_handle.readline()
def uuid_from_file(fn, block_size=1 << 20): """ Returns an arbitrary sized unique ASCII string based on the file contents. (exact hashing method may change). """ with open(fn, 'rb') as f: # first get the size import os f.seek(0, os.SEEK_END) size = f.tell() f.seek(0, os.SEEK_SET) del os # done! import hashlib sha1 = hashlib.new('sha512') while True: data = f.read(block_size) if not data: break sha1.update(data) # skip the '0x' return hex(size)[2:] + sha1.hexdigest()
def _uuid_from_file(fn, block_size=1 << 20): with open(fn, 'rb') as f: # first get the size f.seek(0, os.SEEK_END) size = f.tell() f.seek(0, os.SEEK_SET) # done! import hashlib sha1 = hashlib.new('sha512') while True: data = f.read(block_size) if not data: break sha1.update(data) return (hex(size)[2:] + sha1.hexdigest()).encode()
def close(self): """ Close the blend file writes the blend file to disk if changes has happened """ handle = self.handle if self.is_modified: if self.is_compressed: log.debug("close compressed blend file") handle.seek(os.SEEK_SET, 0) log.debug("compressing started") fs = gzip.open(self.filepath_orig, "wb") data = handle.read(FILE_BUFFER_SIZE) while data: fs.write(data) data = handle.read(FILE_BUFFER_SIZE) fs.close() log.debug("compressing finished") handle.close()
def get(self, path, default=..., sdna_index_refine=None, use_nil=True, use_str=True, base_index=0, ): ofs = self.file_offset if base_index != 0: assert(base_index < self.count) ofs += (self.size // self.count) * base_index self.file.handle.seek(ofs, os.SEEK_SET) if sdna_index_refine is None: sdna_index_refine = self.sdna_index else: self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine) dna_struct = self.file.structs[sdna_index_refine] return dna_struct.field_get( self.file.header, self.file.handle, path, default=default, use_nil=use_nil, use_str=use_str, )
def set(self, path, value, sdna_index_refine=None, ): if sdna_index_refine is None: sdna_index_refine = self.sdna_index else: self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine) dna_struct = self.file.structs[sdna_index_refine] self.file.handle.seek(self.file_offset, os.SEEK_SET) self.file.is_modified = True return dna_struct.field_set( self.file.header, self.file.handle, path, value) # --------------- # Utility get/set # # avoid inline pointer casting
def iter_array(block, length=-1): assert(block.code == b'DATA') from . import blendfile import os handle = block.file.handle header = block.file.header for i in range(length): block.file.handle.seek(block.file_offset + (header.pointer_size * i), os.SEEK_SET) offset = blendfile.DNA_IO.read_pointer(handle, header) sub_block = block.file.find_block_from_offset(offset) yield sub_block # ----------------------------------------------------------------------------- # ID Expand
def rar_v4(f): # based on http://www.forensicswiki.org/wiki/RAR # and http://acritum.com/winrar/rar-format while True: pos = f.tell() crc, typ, flags, size = struct.unpack('<HBHH', f.read(7)) if flags & 0x8000: size += struct.unpack('<L', f.read(4))[0] if not 0x72 <= typ <= 0x7b: raise FileCorrupted f.try_seek(pos + size, os.SEEK_SET) # f.try_seek(size, os.SEEK_CUR) f.update_pos() if typ == 0x7b: break elif typ == 0x73 and flags & 0x80: return Ellipsis # encrypted, assume bad faith return True
def _get_org(self, ipnum): """ Seek and return organization (or ISP) name for converted IP addr. @param ipnum: Converted IP address @type ipnum: int @return: org/isp name @rtype: str """ seek_org = self._seek_country(ipnum) if seek_org == self._databaseSegments: return None record_pointer = seek_org + (2 * self._recordLength - 1) * self._databaseSegments self._filehandle.seek(record_pointer, os.SEEK_SET) org_buf = self._filehandle.read(const.MAX_ORG_RECORD_LENGTH) return org_buf[:org_buf.index(chr(0))]
def write(self, stream, seekfirst=True): """Write header and child elements. This checks first whether the Element is in a consistent state. Args: + stream: As in Element.write(). + seekfirst: As in Element.write(). Raises: + EbmlException, if the write fails. + Inconsistent, if the Element is not in a consistent state. """ self.check_consistency() if seekfirst: stream.seek(self.pos_absolute, SEEK_SET) stream.write(self.header.encode()) Container._write(self, stream, False)
def __init__(self, f, summary=True): """Args: + f: Either a file name or a seekable binary stream. + summary: If True, call self.read_summary(). """ super().__init__(0) if isinstance(f, IOBase): self.stream = f else: self.stream = open(f, 'rb') self.stream.seek(0, SEEK_END) self.stream_size = self.stream.tell() self.stream.seek(0, SEEK_SET) if summary: self.read_summary()
def parse_SeekHead(self, child, stream): #pylint: disable=invalid-name "Parse SeekHead element and recursively read elements." LOG.debug("Segment: parsed {}".format(child)) recursed = False for seek_entry in child.children_named('Seek'): try: self.find(seek_entry.seek_pos) except ValueError: # Recurse if this is the first time we've seen this seek entry LOG.debug("Segment: adding seek entry {}".format(seek_entry)) if seek_entry.seek_id_name != 'Cluster' and stream: # This recursively reads any elements this seek entry # points to that haven't been read already. self.read_element(stream, seek_entry.seek_pos, summary=True, seekfirst=True) recursed = True if recursed: stream.seek(child.pos_end_absolute, SEEK_SET)
def write(self, stream, seekfirst=True): """Write this element to a binary stream. If this element element is not dirty, this method should reproduce the byte stream used to read it. Raises: + ValueError: if the object's data cannot be encoded in self.size bytes. """ if seekfirst: stream.seek(self.pos_absolute, SEEK_SET) stream.write(self.header.encode()) stream.write(self.encode(self.value, self.size)) # Virtual
def seek(self, offset, whence=os.SEEK_SET): """Set the file's current position. Args: offset: seek offset as number. whence: seek mode. Supported modes are os.SEEK_SET (absolute seek), and os.SEEK_CUR (seek relative to the current position) and os.SEEK_END (seek relative to the end, offset should be negative). """ self._verify_read_mode() if whence == os.SEEK_SET: self._offset = offset elif whence == os.SEEK_CUR: self._offset += offset elif whence == os.SEEK_END: file_stat = self.stat() self._offset = file_stat.st_size + offset else: raise InvalidArgumentError('Whence mode %d is not supported', whence)