The following 50 code examples, extracted from open-source Python projects, illustrate how to use os.SEEK_END.
def create_media(media):
    """Download the file behind ``media.data_value`` and attach it.

    If ``media.data_value`` is a valid URL, the remote content is
    streamed into a temporary file and wrapped in an
    ``InMemoryUploadedFile`` assigned to ``media.data_file``;
    ``media.data_value`` is replaced by the bare filename.

    Returns the updated ``media``, or ``None`` when the value is not a
    valid URL.
    """
    if not is_valid_url(media.data_value):
        return None
    filename = media.data_value.split('/')[-1]
    data_file = NamedTemporaryFile()
    # guess_type() returns a (type, encoding) tuple; only the type
    # string is a valid content type for InMemoryUploadedFile.  The
    # original passed the whole tuple.
    content_type, _ = mimetypes.guess_type(filename)
    with closing(requests.get(media.data_value, stream=True)) as r:
        for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
            if chunk:
                data_file.write(chunk)
    # Seeking flushes the buffered writes so getsize() sees all bytes.
    # The offset argument must be 0, not os.SEEK_SET (they merely happen
    # to share the value 0).
    data_file.seek(0, os.SEEK_END)
    size = os.path.getsize(data_file.name)
    data_file.seek(0)
    media.data_value = filename
    media.data_file = InMemoryUploadedFile(
        data_file, 'data_file', filename, content_type,
        size, charset=None)
    return media
def seek(self, pos, whence=os.SEEK_SET):
    """Move the read position, clamped to the range [0, self.size]."""
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if whence == os.SEEK_SET:
        target = pos
    elif whence == os.SEEK_CUR:
        target = self.position + pos
    elif whence == os.SEEK_END:
        target = self.size + pos
    else:
        raise ValueError("Invalid argument")

    # Clamp into the valid range, drop stale buffered data, and move
    # the underlying file object to match.
    self.position = max(0, min(target, self.size))
    self.buffer = b""
    self.fileobj.seek(self.position)
def seek(self, pos, whence=os.SEEK_SET):
    """Seek to a position in the file (text variant: resets a str buffer)."""
    if self.closed:
        raise ValueError("I/O operation on closed file")
    if whence == os.SEEK_SET:
        new_pos = min(max(pos, 0), self.size)
    elif whence == os.SEEK_CUR:
        shifted = self.position + pos
        # Backward moves clamp at 0; forward moves clamp at the size.
        new_pos = max(shifted, 0) if pos < 0 else min(shifted, self.size)
    elif whence == os.SEEK_END:
        new_pos = max(min(self.size + pos, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = new_pos
    # Buffered data no longer matches the new position.
    self.buffer = ""
    self.fileobj.seek(self.position)
def map_file(cls, fileobj, offset=0, size=None):
    """Map a serialized mapping out of ``fileobj`` starting at ``offset``.

    Layout on disk: [id mapper][value array][footer], where the footer
    records the offset at which the value array starts.  Returns a
    ``cls`` instance built from the two mapped parts.
    """
    if size is None:
        # No explicit size: use everything from ``offset`` to EOF.
        fileobj.seek(0, os.SEEK_END)
        size = fileobj.tell() - offset

    # The footer sits at the very end of the region.
    footer = cls._Footer
    fileobj.seek(offset + size - footer.size)
    (values_pos,) = footer.unpack(fileobj.read(footer.size))
    fileobj.seek(offset)

    # Map the id section first, then the values that follow it.
    id_mapper = cls.IdMapper.map_file(fileobj, offset, size=values_pos)
    value_array = cls.ValueArray.map_file(
        fileobj, offset + values_pos,
        size=size - footer.size - values_pos)
    return cls(value_array, id_mapper)
def md5sum2(filename, offset=0, partsize=0):
    """Return the hex MD5 digest of ``partsize`` bytes of ``filename``.

    Hashing starts at ``offset``; if ``offset`` lies past the end of the
    file, the position is moved to EOF (hashing nothing).  Up to
    ``partsize`` bytes are consumed in ``BUFFER_SIZE`` chunks, stopping
    early if EOF is reached first.
    """
    m = get_md5()
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename, 'rb') as fp:
        if offset > os.path.getsize(filename):
            # Offset 0 from the end — not os.SEEK_SET, which only
            # happens to equal 0.
            fp.seek(0, os.SEEK_END)
        else:
            fp.seek(offset)
        left_len = partsize
        while left_len > 0:
            buffer_content = fp.read(min(left_len, BUFFER_SIZE))
            if not buffer_content:
                # EOF reached before ``partsize`` bytes were read.  The
                # original kept looping forever here: read() returned
                # b'' and left_len never decreased.
                break
            m.update(buffer_content)
            left_len -= len(buffer_content)
    return m.hexdigest()
def try_enc():
    """Encode x.wav (assumed 44.1 kHz mono PCM) to x.mp3 via LameEncoder."""
    import os
    # ``with`` closes both handles even on error; the original leaked
    # the input file and only closed the output explicitly.
    with open('x.wav', 'rb') as in_file, open('x.mp3', 'wb') as out_file:
        in_file.seek(44)  # skip the 44-byte canonical WAV header

        lame = LameEncoder(44100, 1, 128)
        lame.dll.lame_encode_buffer.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_int,
            ctypes.POINTER(ctypes.c_char), ctypes.c_int]

        while True:
            in_bytes = in_file.read(512)
            if not in_bytes:
                # read() yields b'' at EOF; the original compared
                # against '' (str), which never matches bytes.
                break
            # 16-bit samples: integer division (``/`` would produce a
            # float and break the ctypes c_int argument).
            sample_count = len(in_bytes) // 2
            # LAME's documented worst-case output size estimate.
            output_buff_len = int(1.25 * sample_count + 7200)
            output_buff = (ctypes.c_char * output_buff_len)()
            output_size = lame.dll.lame_encode_buffer(
                lame.lame, in_bytes, 0, sample_count,
                output_buff, output_buff_len)
            out_file.write(output_buff[0:output_size])
def test_stream(self):
    """Exercise StreamDescriptor content-type and extension guessing."""
    # Content type guessed from an explicitly supplied extension.
    stream = AttachableDescriptor(io.BytesIO(b'Simple text'), extension='.txt')
    self.assertIsInstance(stream, StreamDescriptor)
    self.assertEqual(stream.content_type, 'text/plain')

    # The descriptor behaves like a seekable file.
    stream.seek(2)
    self.assertEqual(stream.tell(), 2)
    stream.seek(0, os.SEEK_END)
    self.assertEqual(stream.tell(), 11)

    # Extension guessed from the original filename.
    stream = AttachableDescriptor(
        io.BytesIO(b'Simple text'), original_filename='letter.pdf')
    self.assertEqual(stream.extension, '.pdf')

    # Extension guessed from the content type; accessing ``filename``
    # must raise because no name was ever provided.
    stream = AttachableDescriptor(
        io.BytesIO(b'Simple text'), content_type='application/json')
    self.assertEqual(stream.extension, '.json')
    self.assertRaises(DescriptorOperationError, lambda: stream.filename)
def reopen(self):
    """(Re)open the log file for appending, capping its size.

    If the file has grown past 100 MB, only roughly the final 1 MB is
    kept, trimmed forward to the next whole line; the file is then
    reopened as a UTF-8 append stream in ``self.inner_file``.
    """
    if self.inner_file is not None:
        self.inner_file.close()
    open(self.filename, 'a').close()  # make sure the file exists
    f = open(self.filename, 'rb')
    f.seek(0, os.SEEK_END)
    length = f.tell()
    if length > 100*1000*1000:
        f.seek(-1000*1000, os.SEEK_END)
        while True:
            # Advance to the next line boundary so the kept tail starts
            # on a whole line.  The file is binary, so compare against
            # *bytes*: the original's ('', '\n') never matched on
            # Python 3 and the loop spun forever once read(1) hit EOF.
            if f.read(1) in (b'', b'\n'):
                break
        data = f.read()
        f.close()
        f = open(self.filename, 'wb')
        f.write(data)
        f.close()
    else:
        f.close()  # original leaked the handle on this path
    self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
def xor_file(input_file, output_file, xorkey):
    """XOR ``input_file`` into ``output_file`` as little-endian 32-bit words.

    The input is consumed 4 bytes at a time; a trailing partial word is
    zero-padded before XOR-ing, and the padding is truncated from the
    output afterwards so output length equals input length.
    """
    number_added = 0
    while True:
        some_bytes = input_file.read(4)
        if len(some_bytes) == 0:
            break
        if len(some_bytes) % 4 != 0:
            number_added = 4 - len(some_bytes)
            # Pad with zero *bytes* — the original concatenated a str
            # ("\x00"), which raises TypeError on Python 3.
            some_bytes = some_bytes + b"\x00" * number_added
        writable_bytes = struct.pack(
            "<I", struct.unpack("<I", some_bytes)[0] ^ xorkey)
        output_file.write(writable_bytes)
    if number_added != 0:
        # Drop the padding bytes from the end of the output.
        output_file.seek(-number_added, os.SEEK_END)
        output_file.truncate()
def read_random_line(f):
    """Return one randomly chosen line from the file at path ``f``."""
    import os
    chunk_size = 16
    with open(f, 'rb') as handle:
        handle.seek(0, os.SEEK_END)
        file_size = handle.tell()
        # Pick a random byte offset, then walk backwards chunk by chunk
        # until the newline preceding that offset is found.
        pos = u_randint(0, file_size)
        while True:
            pos -= chunk_size
            if pos < 0:
                chunk_size += pos  # shrink the final, partial chunk
                pos = 0
            handle.seek(pos, os.SEEK_SET)
            window = handle.read(chunk_size)
            newline_at = window.rfind(b'\n')
            if newline_at != -1:
                # Line starts just after the newline we found.
                pos += newline_at + 1
                break
            if pos == 0:
                break  # reached the start of the file: first line
        handle.seek(pos, os.SEEK_SET)
        return handle.readline()
def _is_ascii_file(data):
    """
    This function returns True if the data represents an ASCII file.

    Please note that a False value does not necessary means that the data
    represents a binary file. It can be a (very *RARE* in real life, but
    can easily be forged) ascii file.
    """
    # A binary STL stores its facet count right after the header.
    data.seek(BINARY_HEADER)
    declared_facets = struct.unpack('<I', data.read(4))[0]

    # Measure the stream length with a seek-to-end.
    data.seek(0, os.SEEK_END)
    total_size = data.tell()
    data.seek(0)  # rewind for the caller

    if declared_facets == 0:
        # A zero facet count essentially never comes from ASCII content,
        # so treat the data as (invalid) binary.
        print("WARNING! Reported size (facet number) is 0, assuming invalid binary STL file.")
        return False

    # ASCII iff the size does not match the binary layout exactly.
    return total_size != BINARY_HEADER + 4 + BINARY_STRIDE * declared_facets
def uuid_from_file(fn, block_size=1 << 20):
    """
    Returns an arbitrary sized unique ASCII string based on the file contents.
    (exact hashing method may change).
    """
    import os
    import hashlib
    with open(fn, 'rb') as stream:
        # File size first — its hex form is prepended to the digest.
        stream.seek(0, os.SEEK_END)
        nbytes = stream.tell()
        stream.seek(0, os.SEEK_SET)
        digest = hashlib.new('sha512')
        while True:
            block = stream.read(block_size)
            if not block:
                break
            digest.update(block)
    # hex() output starts with '0x'; strip that prefix.
    return hex(nbytes)[2:] + digest.hexdigest()
def _uuid_from_file(fn, block_size=1 << 20): with open(fn, 'rb') as f: # first get the size f.seek(0, os.SEEK_END) size = f.tell() f.seek(0, os.SEEK_SET) # done! import hashlib sha1 = hashlib.new('sha512') while True: data = f.read(block_size) if not data: break sha1.update(data) return (hex(size)[2:] + sha1.hexdigest()).encode()
def __init__(self, f, summary=True):
    """Args:
    + f: Either a file name or a seekable binary stream.
    + summary: If True, call self.read_summary().
    """
    super().__init__(0)
    # Accept an already-open stream as-is; otherwise treat ``f`` as a
    # path and open it in binary mode.
    self.stream = f if isinstance(f, IOBase) else open(f, 'rb')
    # Record the total stream size, then rewind to the start.
    self.stream.seek(0, SEEK_END)
    self.stream_size = self.stream.tell()
    self.stream.seek(0, SEEK_SET)
    if summary:
        self.read_summary()
def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current position.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute
        seek), os.SEEK_CUR (seek relative to the current position) and
        os.SEEK_END (seek relative to the end, offset should be
        negative).
    """
    self._verify_read_mode()
    if whence == os.SEEK_SET:
        new_offset = offset
    elif whence == os.SEEK_CUR:
        new_offset = self._offset + offset
    elif whence == os.SEEK_END:
        # Relative to EOF: needs the current size from stat().
        new_offset = self.stat().st_size + offset
    else:
        raise InvalidArgumentError('Whence mode %d is not supported', whence)
    self._offset = new_offset
def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current position.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute
        seek), os.SEEK_CUR (seek relative to the current position), and
        os.SEEK_END (seek relative to the end, offset should be
        negative).
    """
    if whence == os.SEEK_SET:
        pos = offset
    elif whence == os.SEEK_CUR:
        pos = self._position + offset
    elif whence == os.SEEK_END:
        # Size of the underlying file comes from a stat() call.
        pos = stat(self._filename).st_size + offset
    else:
        raise InvalidArgumentError('Whence mode %d is not supported', whence)
    self._position = pos
    # Buffered data no longer matches the new position; reset it along
    # with the EOF flag.
    self._buffer = ''
    self._buffer_pos = 0
    self._eof = False
def test_strip_header1(self):
    """strip_header on this 31-byte input must emit a 14-byte file."""
    raw = io.BytesIO(b"\x46\x2E\x6C\x6F\x61\x64\x2E\x41"
                     b"\x43\x00\x80\xF9\x06\x00\x07\xB5"
                     b"\x50\x00\x00\x3B\x20\x4C\x4F\x41"
                     b"\x44\x45\x52\x20\x66\x6F\x72")
    out_path, bytes_count = self.strip_header(raw, True)
    out_file = open(out_path, "rb")
    # tell() after seeking to the end gives the output file length.
    out_file.seek(0, os.SEEK_END)
    try:
        self.assertEqual(out_file.tell(), 14)
        self.assertEqual(bytes_count, 14)
    finally:
        out_file.close()
        os.remove(out_path)
def test_strip_header2(self):
    """strip_header on this 33-byte input must emit a 16-byte file."""
    raw = io.BytesIO(b"\x46\x2E\x6C\x6F\x61\x64\x2E\x41"
                     b"\x43\x00\x80\xF9\x06\x00\x07\xB5"
                     b"\x50\x00\x00\x3B\x20\x4C\x4F\x41"
                     b"\x44\x45\x52\x20\x66\x6F\x72\x20\x20")
    out_path, bytes_count = self.strip_header(raw, False)
    out_file = open(out_path, "rb")
    # tell() after seeking to the end gives the output file length.
    out_file.seek(0, os.SEEK_END)
    try:
        self.assertEqual(out_file.tell(), 16)
        self.assertEqual(bytes_count, 16)
    finally:
        out_file.close()
        os.remove(out_path)
def test_strip_header3(self):
    """strip_header on this 32-byte input must emit a 10-byte file."""
    raw = io.BytesIO(b"\x46\x2E\x6C\x6F\x61\x64\x2E\x41"
                     b"\x43\x00\x80\x0A\x00\x00\x07\xB5"
                     b"\x50\x00\x00\x3B\x20\x4C\x4F\x41"
                     b"\x44\x45\x52\x20\x66\x6F\x72\x20")
    out_path, bytes_count = self.strip_header(raw, False)
    out_file = open(out_path, "rb")
    # tell() after seeking to the end gives the output file length.
    out_file.seek(0, os.SEEK_END)
    try:
        self.assertEqual(out_file.tell(), 10)
        self.assertEqual(bytes_count, 10)
    finally:
        out_file.close()
        os.remove(out_path)
def write_org_json(self, date=None, organization='llnl', dict_to_write=None,
                   path_ending_type='', is_list=False):
    """
    Writes stats from the organization to JSON.

    Args:
      date: date stamp used in the output file name.  Defaults to
        *today*, computed at call time — the original default
        ``datetime.date.today()`` was evaluated once at import time,
        freezing the date for the life of the process.
      organization: GitHub organization name used in the path.
      dict_to_write: mapping whose values are dumped, one JSON object
        per entry.  Defaults to an empty dict (the original used a
        shared mutable ``{}`` default).
      path_ending_type: sub-directory name under the org directory.
      is_list: when True, wrap the emitted objects in a JSON array.
    """
    if date is None:
        date = datetime.date.today()
    if dict_to_write is None:
        dict_to_write = {}
    path = ('../github-data/' + organization + '-org/'
            + path_ending_type + '/' + str(date) + '.json')
    self.checkDir(path)
    # Opening with 'w' truncates any old data; the original's extra
    # open/close just to clear the file was redundant.
    with open(path, 'w') as out:
        if is_list:
            out.write('[')
        for item in dict_to_write:
            out.write(json.dumps(dict_to_write[item], sort_keys=True,
                                 indent=4, separators=(',', ': ')) + ',')
        if dict_to_write:
            # Strip the trailing comma — but only if at least one entry
            # was written; the original seeked past the start of an
            # empty file and corrupted the '[' wrapper.
            out.seek(-1, os.SEEK_END)
            out.truncate()
        if is_list:
            out.write(']')