我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用zipfile.error()。
def adjust(self): self.closeFP() if self.index > len(self.filenames): raise StopIteration elif self.index == len(self.filenames): self.iter = iter(self.custom) else: self.current = self.filenames[self.index] if os.path.splitext(self.current)[1].lower() == ".zip": try: _ = zipfile.ZipFile(self.current, 'r') except zipfile.error, ex: errMsg = "something appears to be wrong with " errMsg += "the file '%s' ('%s'). Please make " % (self.current, getSafeExString(ex)) errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException, errMsg if len(_.namelist()) == 0: errMsg = "no file(s) inside '%s'" % self.current raise SqlmapDataException(errMsg) self.fp = _.open(_.namelist()[0]) else: self.fp = open(self.current, 'r') self.iter = iter(self.fp) self.index += 1
def next(self): retVal = None while True: self.counter += 1 try: retVal = self.iter.next().rstrip() except zipfile.error, ex: errMsg = "something appears to be wrong with " errMsg += "the file '%s' ('%s'). Please make " % (self.current, getSafeExString(ex)) errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException, errMsg except StopIteration: self.adjust() retVal = self.iter.next().rstrip() if not self.proc_count or self.counter % self.proc_count == self.proc_id: break return retVal
def next(self): retVal = None while True: self.counter += 1 try: retVal = self.iter.next().rstrip() except zipfile.error, ex: errMsg = "something seems to be wrong with " errMsg += "the file '%s' ('%s'). Please make " % (self.current, ex) errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException, errMsg except StopIteration: self.adjust() retVal = self.iter.next().rstrip() try: retVal = retVal.decode(UNICODE_ENCODING) except UnicodeDecodeError: continue if not self.proc_count or self.counter % self.proc_count == self.proc_id: break return retVal
def getunzipped(username, repo, thedir): """Downloads and unzips a zip file""" theurl = "https://github.com/" + username + "/" + repo + "/archive/master.zip" name = os.path.join(thedir, 'temp.zip') try: name = urllib.urlretrieve(theurl, name) name = os.path.join(thedir, 'temp.zip') except IOError as e: print("Can't retrieve %r to %r: %s" % (theurl, thedir, e)) return try: z = zipfile.ZipFile(name) except zipfile.error as e: print("Bad zipfile (from %r): %s" % (theurl, e)) return z.extractall(thedir) z.close() os.remove(name) copy_tree(os.path.join(thedir, repo + "-master"), thedir) shutil.rmtree(os.path.join(thedir, repo + "-master"))
def copyDirectoy(src, dst, ignorePattern=None): try: if ignorePattern: shutil.copytree(src, dst, ignore=shutil.ignore_patterns(*ignorePattern)) return shutil.copytree(src, dst) except OSError as exc: if exc.errno == errno.ENOTDIR: shutil.copy(src, dst) else: logger.error("Failed to copy directory {} to destination: {}".format(src, dst), exc_info=True) raise
def getunzipped(theurl, thedir, what): name = os.path.join("/","tmp",str(uuid())+".zip") try: # name, hdrs = urllib.urlretrieve(theurl, name) dlwithprogress(theurl,name, what) except IOError, e: print("Can't retrieve %r to %r: %s" % (theurl, thedir, e)) return try: z = zipfile.ZipFile(name) except zipfile.error, e: print("Bad zipfile (%r): %s" % (theurl, e)) return for n in z.namelist(): if n.endswith("/"): continue dest = os.path.join(thedir, n) destdir = os.path.dirname(dest) if not os.path.isdir(destdir): os.makedirs(destdir) try: f = open(dest, 'w') data = z.read(n) f.write(data) f.close() #print(n) except IOError, e: print("An error occured @ file %s: %s" % (n,e)) z.close() os.unlink(name) print("Successfully downloaded %s" % what)
def extract_zip(zip_name, exclude_term=None): """Extracts a zip file to its containing directory.""" zip_dir = os.path.dirname(os.path.abspath(zip_name)) try: with zipfile.ZipFile(zip_name) as z: # write each zipped file out if it isn't a directory files = [zip_file for zip_file in z.namelist() if not zip_file.endswith('/')] print('Extracting %i files from %r.' % (len(files), zip_name)) for zip_file in files: # remove any provided extra directory term from zip file if exclude_term: dest_file = zip_file.replace(exclude_term, '') else: dest_file = zip_file dest_file = os.path.normpath(os.path.join(zip_dir, dest_file)) dest_dir = os.path.dirname(dest_file) # make directory if it does not exist if not os.path.isdir(dest_dir): os.makedirs(dest_dir) # read file from zip, then write to new directory data = z.read(zip_file) with open(dest_file, 'wb') as f: f.write(encode_utf8(data)) except zipfile.error as e: print("Bad zipfile (%r): %s" % (zip_name, e)) raise e
def open(path, mode='r'): if 'w' in mode or 'a' in mode: raise IOError( _errno.EINVAL, path, "Write access not supported") elif 'r+' in mode: raise IOError( _errno.EINVAL, path, "Write access not supported") full_path = path path, rest = _locate(path) if not rest: return _open(path, mode) else: try: zf = _zipfile.ZipFile(path, 'r') except _zipfile.error: raise IOError( _errno.ENOENT, full_path, "No such file or directory") try: data = zf.read(rest) except (_zipfile.error, KeyError): zf.close() raise IOError( _errno.ENOENT, full_path, "No such file or directory") zf.close() if mode == 'rb': return _BytesIO(data) else: if _sys.version_info[0] == 3: data = data.decode('ascii') return _StringIO(data)
def isfile(path): full_path = path path, rest = _locate(path) if not rest: ok = _os.path.isfile(path) if ok: try: zf = _zipfile.ZipFile(path, 'r') return False except (_zipfile.error, IOError): return True return False zf = None try: zf = _zipfile.ZipFile(path, 'r') info = zf.getinfo(rest) zf.close() return True except (KeyError, _zipfile.error): if zf is not None: zf.close() # Check if this is a directory try: info = zf.getinfo(rest + '/') except KeyError: pass else: return False rest = rest + '/' for nm in zf.namelist(): if nm.startswith(rest): # Directory return False # No trace in zipfile raise IOError( _errno.ENOENT, full_path, "No such file or directory")
def _list_project(project_directory, ignore_filter, frontend): try: file_infos = [] for root, dirs, files in os.walk(project_directory): filtered_dirs = [] for d in dirs: info = _FileInfo(project_directory=project_directory, filename=os.path.join(root, d), is_directory=True) if ignore_filter(info): continue else: filtered_dirs.append(d) file_infos.append(info) # don't even recurse into filtered-out directories, mostly because recursing into # "envs" is very slow dirs[:] = filtered_dirs for f in files: info = _FileInfo(project_directory=project_directory, filename=os.path.join(root, f), is_directory=False) if not ignore_filter(info): file_infos.append(info) return file_infos except OSError as e: frontend.error("Could not list files in %s: %s." % (project_directory, str(e))) return None
def _parse_ignore_file(filename, frontend): patterns = [] try: with codecs.open(filename, 'r', 'utf-8') as f: for line in f: line = line.strip() # comments can't be at the end of the line, # you can only comment out an entire line. if line.startswith("#"): continue # you can backslash-escape a hash at the start if line.startswith("\#"): line = line[1:] if line != '': patterns.append(_FilePattern(pattern=line)) return patterns except (OSError, IOError) as e: # it's ok for .projectignore to be absent, but not OK # to fail to read it if present. if e.errno == errno.ENOENT: # return default patterns anyway return patterns else: frontend.error("Failed to read %s: %s" % (filename, str(e))) return None
def _git_ignored_files(project_directory, frontend): if not os.path.exists(os.path.join(project_directory, ".git")): return [] # It is pretty involved to parse .gitignore correctly. Lots of # little syntax rules that don't quite match python's fnmatch, # there can be multiple .gitignore, and there are also things # in the git config file that affect what's ignored. So we # let git do this itself. If the project has a `.git` we assume # the user is using git. # --other means show untracked (not added) files # --ignored means show ignored files # --exclude-standard means use the usual .gitignore and other configuration # --directory means output "node_modules/" if it's ignored, not 100000 JS files try: output = logged_subprocess.check_output( ['git', 'ls-files', '--others', '--ignored', '--exclude-standard', '--directory'], cwd=project_directory) # for whatever reason, git doesn't include the ".git" in the ignore list return [".git"] + output.decode('utf-8').splitlines() except subprocess.CalledProcessError as e: message = e.output.decode('utf-8').replace("\n", " ") frontend.error("'git ls-files' failed to list ignored files: %s." % (message)) return None except OSError as e: frontend.error("Failed to run 'git ls-files'; %s" % str(e)) return None
def zipwalk(zfilename): """Zip file tree generator. For each file entry in a zip archive, this yields a two tuple of the zip information and the data of the file as a StringIO object. zipinfo, filedata zipinfo is an instance of zipfile.ZipInfo class which gives information of the file contained in the zip archive. filedata is a StringIO instance representing the actual file data. If the file again a zip file, the generator extracts the contents of the zip file and walks them. Inspired by os.walk . """ tempdir = os.environ.get('TEMP', os.environ.get('TMP', os.environ.get('TMPDIR', '/tmp'))) try: z = zipfile.ZipFile(zfilename, "r") for info in z.infolist(): fname = info.filename data = z.read(fname) if fname.endswith(".zip"): tmpfpath = os.path.join(tempdir, os.path.basename(fname)) try: open(tmpfpath, 'w+b').write(data) except (IOError, OSError): logger.error("Failed to write file, {}".format(tmpfpath), exc_info=True) if zipfile.is_zipfile(tmpfpath): try: for x in zipwalk(tmpfpath): yield x except Exception: logger.error("Failed", exc_info=True) raise try: os.remove(tmpfpath) except: pass else: yield (info, cStringIO.StringIO(data)) except (RuntimeError, zipfile.error): raise
def _unzip_iter(filename, root, verbose=True): if verbose: sys.stdout.write('Unzipping %s' % os.path.split(filename)[1]) sys.stdout.flush() try: zf = zipfile.ZipFile(filename) except zipfile.error as e: yield ErrorMessage(filename, 'Error with downloaded zip file') return except Exception as e: yield ErrorMessage(filename, e) return # Get lists of directories & files namelist = zf.namelist() dirlist = set() for x in namelist: if x.endswith('/'): dirlist.add(x) else: dirlist.add(x.rsplit('/',1)[0] + '/') filelist = [x for x in namelist if not x.endswith('/')] # Create the target directory if it doesn't exist if not os.path.exists(root): os.mkdir(root) # Create the directory structure for dirname in sorted(dirlist): pieces = dirname[:-1].split('/') for i in range(len(pieces)): dirpath = os.path.join(root, *pieces[:i+1]) if not os.path.exists(dirpath): os.mkdir(dirpath) # Extract files. for i, filename in enumerate(filelist): filepath = os.path.join(root, *filename.split('/')) with open(filepath, 'wb') as outfile: try: contents = zf.read(filename) except Exception as e: yield ErrorMessage(filename, e) return outfile.write(contents) if verbose and (i*10/len(filelist) > (i-1)*10/len(filelist)): sys.stdout.write('.') sys.stdout.flush() if verbose: print() ###################################################################### # Index Builder ###################################################################### # This may move to a different file sometime.
def listdir(path): full_path = path path, rest = _locate(path) if not rest and not _os.path.isfile(path): return _os.listdir(path) else: try: zf = _zipfile.ZipFile(path, 'r') except _zipfile.error: raise IOError( _errno.ENOENT, full_path, "No such file or directory") result = set() seen = False try: for nm in zf.namelist(): if rest is None: seen = True value = nm.split('/')[0] if value: result.add(value) elif nm.startswith(rest): if nm == rest: seen = True value = '' pass elif nm[len(rest)] == '/': seen = True value = nm[len(rest)+1:].split('/')[0] else: value = None if value: result.add(value) except _zipfile.error: zf.close() raise IOError( _errno.ENOENT, full_path, "No such file or directory") zf.close() if not seen: raise IOError( _errno.ENOENT, full_path, "No such file or directory") return list(result)
def isdir(path): full_path = path path, rest = _locate(path) if not rest: ok = _os.path.isdir(path) if not ok: try: zf = _zipfile.ZipFile(path, 'r') except (_zipfile.error, IOError): return False return True return True zf = None try: try: zf = _zipfile.ZipFile(path) except _zipfile.error: raise IOError( _errno.ENOENT, full_path, "No such file or directory") try: info = zf.getinfo(rest) except KeyError: pass else: # File found return False rest = rest + '/' try: info = zf.getinfo(rest) except KeyError: pass else: # Directory entry found return True for nm in zf.namelist(): if nm.startswith(rest): return True raise IOError( _errno.ENOENT, full_path, "No such file or directory") finally: if zf is not None: zf.close()
def islink(path): full_path = path path, rest = _locate(path) if not rest: return _os.path.islink(path) try: zf = _zipfile.ZipFile(path) except _zipfile.error: raise IOError( _errno.ENOENT, full_path, "No such file or directory") try: try: info = zf.getinfo(rest) except KeyError: pass else: # File return False rest += '/' try: info = zf.getinfo(rest) except KeyError: pass else: # Directory return False for nm in zf.namelist(): if nm.startswith(rest): # Directory without listing return False raise IOError( _errno.ENOENT, full_path, "No such file or directory") finally: zf.close()
def _get_source_and_dest_files(archive_path, list_files, project_dir, parent_dir, errors): names = list_files(archive_path) if len(names) == 0: errors.append("A valid project archive must contain at least one file.") return None items = [(name, prefix, remainder) for (name, (prefix, remainder)) in zip(names, [_split_after_first(name) for name in names])] candidate_prefix = items[0][1] if candidate_prefix == "..": errors.append("Archive contains relative path '%s' which is not allowed." % (items[0][0])) return None if project_dir is None: project_dir = candidate_prefix if os.path.isabs(project_dir): assert parent_dir is None canonical_project_dir = os.path.realpath(os.path.abspath(project_dir)) canonical_parent_dir = os.path.dirname(canonical_project_dir) else: if parent_dir is None: parent_dir = os.getcwd() canonical_parent_dir = os.path.realpath(os.path.abspath(parent_dir)) canonical_project_dir = os.path.realpath(os.path.abspath(os.path.join(canonical_parent_dir, project_dir))) # candidate_prefix is untrusted and may try to send us outside of parent_dir. # this assertion is because of the check for candidate_prefix == ".." above. assert canonical_project_dir.startswith(canonical_parent_dir) if os.path.exists(canonical_project_dir): # This is an error to ensure we always do a "fresh" unpack # without worrying about overwriting stuff. errors.append("Directory '%s' already exists." % canonical_project_dir) return None src_and_dest = [] for (name, prefix, remainder) in items: if prefix != candidate_prefix: errors.append(("A valid project archive contains only one project directory " + "with all files inside that directory. '%s' is outside '%s'.") % (name, candidate_prefix)) return None if remainder is None: # this is an entry that's either the prefix dir itself, # or a file at the root not in any dir continue dest = os.path.realpath(os.path.abspath(os.path.join(canonical_project_dir, remainder))) # this check deals with ".." in the name for example if not dest.startswith(canonical_project_dir): errors.append("Archive entry '%s' would end up at '%s' which is outside '%s'." % (name, dest, canonical_project_dir)) return None src_and_dest.append((name, dest)) return (canonical_project_dir, src_and_dest)
def _unzip_iter(filename, root, verbose=True): if verbose: sys.stdout.write('Unzipping %s' % os.path.split(filename)[1]) sys.stdout.flush() try: zf = zipfile.ZipFile(filename) except zipfile.error as e: yield ErrorMessage(filename, 'Error with downloaded zip file') return except Exception as e: yield ErrorMessage(filename, e) return # Get lists of directories & files namelist = zf.namelist() dirlist = set() for x in namelist: if x.endswith('/'): dirlist.add(x) else: dirlist.add(x.rsplit('/',1)[0] + '/') filelist = [x for x in namelist if not x.endswith('/')] # Create the target directory if it doesn't exist if not os.path.exists(root): os.mkdir(root) # Create the directory structure for dirname in sorted(dirlist): pieces = dirname[:-1].split('/') for i in range(len(pieces)): dirpath = os.path.join(root, *pieces[:i+1]) if not os.path.exists(dirpath): os.mkdir(dirpath) # Extract files. for i, filename in enumerate(filelist): filepath = os.path.join(root, *filename.split('/')) try: with open(filepath, 'wb') as dstfile, zf.open(filename) as srcfile: shutil.copyfileobj(srcfile, dstfile) except Exception as e: yield ErrorMessage(filename, e) return if verbose and (i*10/len(filelist) > (i-1)*10/len(filelist)): sys.stdout.write('.') sys.stdout.flush() if verbose: print() ###################################################################### # Index Builder ###################################################################### # This may move to a different file sometime.
def _get_source_and_dest_files(archive_path, list_files, project_dir, parent_dir, frontend): names = list_files(archive_path) if len(names) == 0: frontend.error("A valid project archive must contain at least one file.") return None items = [(name, prefix, remainder) for (name, (prefix, remainder)) in zip(names, [_split_after_first(name) for name in names])] candidate_prefix = items[0][1] if candidate_prefix == "..": frontend.error("Archive contains relative path '%s' which is not allowed." % (items[0][0])) return None if project_dir is None: project_dir = candidate_prefix if os.path.isabs(project_dir): assert parent_dir is None canonical_project_dir = os.path.realpath(os.path.abspath(project_dir)) canonical_parent_dir = os.path.dirname(canonical_project_dir) else: if parent_dir is None: parent_dir = os.getcwd() canonical_parent_dir = os.path.realpath(os.path.abspath(parent_dir)) canonical_project_dir = os.path.realpath(os.path.abspath(os.path.join(canonical_parent_dir, project_dir))) # candidate_prefix is untrusted and may try to send us outside of parent_dir. # this assertion is because of the check for candidate_prefix == ".." above. assert canonical_project_dir.startswith(canonical_parent_dir) if os.path.exists(canonical_project_dir): # This is an error to ensure we always do a "fresh" unpack # without worrying about overwriting stuff. frontend.error("Directory '%s' already exists." % canonical_project_dir) return None src_and_dest = [] for (name, prefix, remainder) in items: if prefix != candidate_prefix: frontend.error(("A valid project archive contains only one project directory " + "with all files inside that directory. '%s' is outside '%s'.") % (name, candidate_prefix)) return None if remainder is None: # this is an entry that's either the prefix dir itself, # or a file at the root not in any dir continue dest = os.path.realpath(os.path.abspath(os.path.join(canonical_project_dir, remainder))) # this check deals with ".." in the name for example if not dest.startswith(canonical_project_dir): frontend.error("Archive entry '%s' would end up at '%s' which is outside '%s'." % (name, dest, canonical_project_dir)) return None src_and_dest.append((name, dest)) return (canonical_project_dir, src_and_dest)