我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zipfile.ZipExtFile()。
def open(self, path_or_handle, mode='r'):
    """
    Return a usable file handle for ``path_or_handle``.

    Handles that are already open are not re-opened; for the easy
    use-cases a regular path is opened normally.

    Args:
        path_or_handle: a path to open, an already-open file-like object
            (anything exposing a ``mode`` attribute), or a
            ``zipfile.ZipExtFile`` member of a zip archive
        mode (str): mode used when a path has to be opened (default 'r')

    Returns:
        - a ``TextIOWrapper`` (UTF-8, undecodable bytes ignored, line
          buffered) around a ``ZipExtFile`` when ``mode`` is 'r'
        - ``path_or_handle`` itself when it has a ``mode`` attribute
        - otherwise the result of calling ``open`` on the path
    """
    # TODO: we have to be smarter here about whether we're being passed a file
    # or whether we should hunt down for a particular filename
    # based on what each state spits out
    if mode == 'r' and isinstance(path_or_handle, zipfile.ZipExtFile):
        # see pa.py for an example of needing zipfile support
        return TextIOWrapper(path_or_handle, encoding='utf8',
                             errors='ignore', line_buffering=True)

    if hasattr(path_or_handle, 'mode'):
        # py2/3 file/buffer type: already open, hand it back untouched
        return path_or_handle

    if 'b' in mode:
        # Binary modes reject the ``errors`` argument with a ValueError,
        # so it is only passed along for text modes.
        return open(path_or_handle, mode)
    return open(path_or_handle, mode, errors='ignore')
def test_verifying_zipfile():
    """Check VerifyingZipFile hash verification in default and strict mode."""
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    def expect_bad_wheel(member):
        """Read ``member`` from ``vzf`` and insist that BadWheelFile is raised."""
        try:
            vzf.open(member).read()
        except wheel.install.BadWheelFile:
            return
        raise Exception("expected exception 'BadWheelFile()'")

    # Build a three-member archive in memory.
    sio = StringIO()
    zf = zipfile.ZipFile(sio, 'w')
    for member_name, payload in (
            ("one", b"first file"),
            ("two", b"second file"),
            ("three", b"third file"),
    ):
        zf.writestr(member_name, payload)
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    for readable in ("one", "two"):
        vzf.open(readable).read()
    expect_bad_wheel("three")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    expect_bad_wheel("two")
    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def test_verifying_zipfile():
    """Check VerifyingZipFile hash verification in default and strict mode."""
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    # Build a three-member archive in memory.
    bio = BytesIO()
    zf = zipfile.ZipFile(bio, 'w')
    for member_name, payload in (
            ("one", b"first file"),
            ("two", b"second file"),
            ("three", b"third file"),
    ):
        zf.writestr(member_name, payload)
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(bio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    for readable in ("one", "two"):
        vzf.open(readable).read()

    # The member is opened outside the raises block: only read() is
    # expected to fail here.
    third = vzf.open("three")
    with pytest.raises(wheel.install.BadWheelFile):
        third.read()

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    with pytest.raises(wheel.install.BadWheelFile):
        vzf.open("two")
    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def process_extracted_file(self, extracted_file, extracted_filename):
    """
    Dispatch one file extracted from the zip to the handler for its type.

    The type is recognised from the prefix of the filename.

    Args:
        extracted_file (zipfile.ZipExtFile): the extracted file-like or iterable object
        extracted_filename (str): the filename of the extracted file

    Returns:
        (bool, list(str)): bool is True if the file was processed successfully,
            error messages are returned in the list
    """
    # We send Pearson CDD files and get the results as VCDC files
    if extracted_filename.startswith(PEARSON_FILE_TYPES.VCDC):
        return self.process_vcdc_file(extracted_file)

    # We send Pearson EAD files and get the results as EAC files
    if extracted_filename.startswith(PEARSON_FILE_TYPES.EAC):
        return self.process_eac_file(extracted_file)

    if extracted_filename.startswith(PEARSON_FILE_TYPES.EXAM):
        return self.process_exam_file(extracted_file)

    # For files we don't care about, act like we processed them so they
    # don't cause us to leave the zip file on the server; that would make
    # us reprocess these zip files forever. Anything else is reported as
    # not processed.
    intentionally_skipped = any(
        extracted_filename.startswith(prefix)
        for prefix in PEARSON_INTENDED_SKIP_FILE_TYPES
    )
    return intentionally_skipped, []
def process_vcdc_file(self, extracted_file):
    """
    Apply the results of a VCDC file extracted from the zip to ExamProfile records.

    Each result row is matched to an ExamProfile by student id; the profile
    status is set to success or failed and the profile is saved.

    Args:
        extracted_file (zipfile.ZipExtFile): the extracted file-like object

    Returns:
        (bool, list(str)): the bool is always True when this returns,
            error messages are returned in the list
    """
    log.debug('Found VCDC file: %s', extracted_file)
    rows, bad_rows = VCDCReader().read(extracted_file)
    errors = self.get_invalid_row_messages(bad_rows)

    for row in rows:
        try:
            profile_record = ExamProfile.objects.get(profile__student_id=row.client_candidate_id)
        except ExamProfile.DoesNotExist:
            # Nothing to update for this row; report it and move on.
            errors.append(format_and_log_error(
                'Unable to find an ExamProfile record:',
                client_candidate_id=row.client_candidate_id,
                error=row.message,
            ))
            continue

        # NOTE(review): a VCDC result is compared against the EAC-named
        # constant here, while process_eac_file compares against
        # VCDC_SUCCESS_STATUS. Confirm against the constant definitions
        # whether the two are swapped.
        synced = row.status == EAC_SUCCESS_STATUS and 'WARNING' not in row.message
        if not synced:
            profile_record.status = ExamProfile.PROFILE_FAILED
            errors.append(format_and_log_error(
                'ExamProfile sync failed:',
                client_candidate_id=row.client_candidate_id,
                username=profile_record.profile.user.username,
                error=row.message,
            ))
        else:
            profile_record.status = ExamProfile.PROFILE_SUCCESS

        profile_record.save()

    return True, errors
def process_eac_file(self, extracted_file):
    """
    Apply the results of an EAC file extracted from the zip to ExamAuthorization records.

    Each result row is matched to an ExamAuthorization by id; the
    authorization status is set to success or failed and the record is saved.

    Args:
        extracted_file (zipfile.ZipExtFile): the extracted file-like object

    Returns:
        (bool, list(str)): the bool is always True when this returns,
            error messages are returned in the list
    """
    log.debug('Found EAC file: %s', extracted_file)
    rows, bad_rows = EACReader().read(extracted_file)
    errors = self.get_invalid_row_messages(bad_rows)

    for row in rows:
        try:
            authorization = ExamAuthorization.objects.get(id=row.client_authorization_id)
        except ExamAuthorization.DoesNotExist:
            # Nothing to update for this row; report it and move on.
            errors.append(format_and_log_error(
                'Unable to find a matching ExamAuthorization record:',
                client_candidate_id=row.client_candidate_id,
                client_authorization_id=row.client_authorization_id,
                error=row.message,
            ))
            continue

        # NOTE(review): an EAC result is compared against the VCDC-named
        # constant here, while process_vcdc_file compares against
        # EAC_SUCCESS_STATUS. Confirm against the constant definitions
        # whether the two are swapped.
        if row.status != VCDC_SUCCESS_STATUS:
            authorization.status = ExamAuthorization.STATUS_FAILED
            errors.append(format_and_log_error(
                'ExamAuthorization sync failed:',
                username=authorization.user.username,
                client_authorization_id=row.client_authorization_id,
                error=row.message,
            ))
        else:
            authorization.status = ExamAuthorization.STATUS_SUCCESS

        authorization.save()

    return True, errors
def verify_unit_response(zip_ext_file, min_lines):
    """
    Assert that a gzip-compressed zip member decompresses to enough lines.

    Args:
        zip_ext_file (zipfile.ZipExtFile): zip member whose content is gzip data
        min_lines (int): minimum number of newline-separated lines expected

    Raises:
        AssertionError: if ``zip_ext_file`` is not a ZipExtFile, or the
            decompressed output has fewer than ``min_lines`` lines
    """
    assert isinstance(zip_ext_file, zipfile.ZipExtFile)

    compressed = zip_ext_file.read()
    unit_output = gzip.decompress(compressed)

    # Splitting on '\n' always yields one more piece than there are newlines.
    line_count = unit_output.decode().count('\n') + 1
    assert line_count >= min_lines, 'Expect at least {} lines. Full unit output {}'.format(
        min_lines, unit_output)