Python os 模块,umask() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.umask()

项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def generate_or_read_from_file(key_file='.secret_key', key_length=64):
    """Multiprocess-safe secret key file generator.

    Useful to replace the default (and thus unsafe) SECRET_KEY in settings.py
    upon first start. Save to use, i.e. when multiple Python interpreters
    serve the dashboard Django application (e.g. in a mod_wsgi + daemonized
    environment).  Also checks if file permissions are set correctly and
    throws an exception if not.
    """
    abspath = os.path.abspath(key_file)
    lock = lockutils.external_lock(key_file + ".lock",
                                   lock_path=os.path.dirname(abspath))
    with lock:
        if not os.path.exists(key_file):
            key = generate_key(key_length)
            old_umask = os.umask(0o177)  # Use '0600' file permissions
            with open(key_file, 'w') as f:
                f.write(key)
            os.umask(old_umask)
        else:
            if (os.stat(key_file).st_mode & 0o777) != 0o600:
                raise FilePermissionError("Insecure key file permissions!")
            with open(key_file, 'r') as f:
                key = f.readline()
        return key
项目:zoocore    作者:dsparrow27    | 项目源码 | 文件源码
def clearUnMasked(func):
    """Decorator which clears the umask for a method.
    The umask is a permissions mask that gets applied
    whenever new files or folders are created. For I/O methods
    that have a permissions parameter, it is important that the
    umask is cleared prior to execution, otherwise the default
    umask may alter the resulting permissions

    :type func: function
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # set umask to zero, store old umask
        oldMask = os.umask(0)
        try:
            # execute method payload
            return func(*args, **kwargs)
        finally:
            # set mask back to previous value
            os.umask(oldMask)

    return wrapper
项目:Static-UPnP    作者:nigelb    | 项目源码 | 文件源码
def drop_privileges(self, uid_name, gid_name):
    if os.getuid() != 0:
        # We're not root so, like, whatever dude
        self.logger.info("Not running as root. Cannot drop permissions.")
        return

    # Get the uid/gid from the name
    running_uid = pwd.getpwnam(uid_name).pw_uid
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Try setting the new uid/gid
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask
    old_umask = os.umask(0o077)
    self.logger.info("Changed permissions to: %s: %i, %s, %i"%(uid_name, running_uid, gid_name, running_gid))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def daemonize():
    # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent
    os.setsid()
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent again.
    os.umask(077)
    null=os.open('/dev/null', os.O_RDWR)
    for i in range(3):
        try:
            os.dup2(null, i)
        except OSError, e:
            if e.errno != errno.EBADF:
                raise
    os.close(null)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def generate(self):
        return textwrap.dedent("""
        import pupy, os

        if os.name == 'posix':
            pupy.infos['daemonize']=True
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent
            os.setsid()
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent again.
            os.umask(022)   # Don't allow others to write
            null=os.open('/dev/null', os.O_RDWR)
            for i in range(3):
                try:
                    os.dup2(null, i)
                except OSError, e:
                    if e.errno != errno.EBADF:
                        raise
            os.close(null)
        """)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _perform_single(self, achall):
        response, validation = achall.response_and_validation()

        root_path = self.full_roots[achall.domain]
        validation_path = self._get_validation_path(root_path, achall)
        logger.debug("Attempting to save validation to %s", validation_path)

        # Change permissions to be world-readable, owner-writable (GH #1795)
        old_umask = os.umask(0o022)

        try:
            with open(validation_path, "wb") as validation_file:
                validation_file.write(validation.encode())
        finally:
            os.umask(old_umask)

        self.performed[root_path].add(achall)

        return response
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def _create_default_file():
    """Creates the default profile file, returning its contents."""
    import json

    profiles_default_data = {
        'active_profile': None,
        'profiles': {}
    }

    os.makedirs(profiles_path, exist_ok=True)

    # Populate the file, ensuring that its permissions are restrictive enough.
    old_umask = os.umask(0o077)
    try:
        with open(profiles_file, 'w', encoding='utf8') as outfile:
            json.dump(profiles_default_data, outfile)
    finally:
        os.umask(old_umask)

    return profiles_default_data
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def become_daemon(self, root_dir='/'):
        if os.fork() != 0:  # launch child and ...
            os._exit(0)  # kill off parent
        os.setsid()
        os.chdir(root_dir)
        os.umask(0)
        if os.fork() != 0: # fork again so we are not a session leader
            os._exit(0)
        sys.stdin.close()
        sys.__stdin__ = sys.stdin
        sys.stdout.close()
        sys.stdout = sys.__stdout__ = _NullDevice()
        sys.stderr.close()
        sys.stderr = sys.__stderr__ = _NullDevice()
        for fd in range(1024):
            try:
                os.close(fd)
            except OSError:
                pass
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __init__(self, cfg):
        old_umask = os.umask(cfg.umask)
        fdir = cfg.worker_tmp_dir
        if fdir and not os.path.isdir(fdir):
            raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
        fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)

        # allows the process to write to the file
        util.chown(name, cfg.uid, cfg.gid)
        os.umask(old_umask)

        # unlink the file so we don't leak tempory files
        try:
            if not IS_CYGWIN:
                util.unlink(name)
            self._tmp = os.fdopen(fd, 'w+b', 1)
        except:
            os.close(fd)
            raise

        self.spinner = 0
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def generate(self):
        return textwrap.dedent("""
        import pupy, os

        if os.name == 'posix':
            pupy.infos['daemonize']=True
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent
            os.setsid()
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent again.
            os.umask(022)   # Don't allow others to write
            null=os.open('/dev/null', os.O_RDWR)
            for i in range(3):
                try:
                    os.dup2(null, i)
                except OSError, e:
                    if e.errno != errno.EBADF:
                        raise
            os.close(null)
        """)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def current_umask():
    """Get the current umask which involves having to set it temporarily."""
    mask = os.umask(0)
    os.umask(mask)
    return mask
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def current_umask():
    tmp = os.umask(0o022)
    os.umask(tmp)
    return tmp
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def daemonize(self):
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError, e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        os.chdir("/")
        os.setsid()
        os.umask(0)

        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError, e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        sys.stdout.flush()
        sys.stderr.flush()
        si = file(self.stdin, 'r')
        so = file(self.stdout, 'a+')
        se = file(self.stderr, 'a+', 0)
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

        atexit.register(self.delpid)
        pid = str(os.getpid())
        file(self.pidfile,'w+').write("%s\n" % pid)
项目:amadash    作者:ipartola    | 项目源码 | 文件源码
def daemonize(double_fork=True):
    '''Puts process in the background using usual UNIX best practices.'''

    try:
        os.umask(0o22)
    except Exception as e:
        raise Exception("Unable to change file creation mask: %s" % e)

    os.chdir('/')

    # First fork
    if double_fork:
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            raise Exception("Error on first fork: [%d] %s" % (e.errno, e.strerr,))

    os.setsid()

    # Second fork
    try:
        pid = os.fork()
        if pid > 0:
            os._exit(0)
    except OSError as e:
        raise Exception("Error on second fork: [%d] %s" % (e.errno, e.strerr,))

    close_open_files()
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def current_umask():
    """Get the current umask which involves having to set it temporarily."""
    mask = os.umask(0)
    os.umask(mask)
    return mask
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def current_umask():
    tmp = os.umask(0o022)
    os.umask(tmp)
    return tmp
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def open(file, flag=None, mode=0666):
    """Open the database file, filename, and return corresponding object.

    The flag argument, used to control how the database is opened in the
    other DBM implementations, is ignored in the dumbdbm module; the
    database is always opened for update, and will be created if it does
    not exist.

    The optional mode argument is the UNIX mode of the file, used only when
    the database has to be created.  It defaults to octal code 0666 (and
    will be modified by the prevailing umask).

    """
    # flag argument is currently ignored

    # Modify mode depending on the umask
    try:
        um = _os.umask(0)
        _os.umask(um)
    except AttributeError:
        pass
    else:
        # Turn off any bits that are set in the umask
        mode = mode & (~um)

    return _Database(file, mode)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _create_file_if_needed(self):
        """Create an empty file if necessary.

        This method will not initialize the file. Instead it implements a
        simple version of "touch" to ensure the file has been created.
        """
        if not os.path.exists(self._file.filename()):
            old_umask = os.umask(0o177)
            try:
                open(self._file.filename(), 'a+b').close()
            finally:
                os.umask(old_umask)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _create_file_if_needed(self):
        """Create an empty file if necessary.

        This method will not initialize the file. Instead it implements a
        simple version of "touch" to ensure the file has been created.
        """
        if not os.path.exists(self._filename):
            old_umask = os.umask(0o177)
            try:
                open(self._filename, 'a+b').close()
            finally:
                os.umask(old_umask)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def setUp(self):
        self.umask = os.umask(0o022)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def tearDown(self):
        os.umask(self.umask)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _setupSocket(self):
        """Creates and binds the socket for communication with the server."""
        oldUmask = None
        if type(self._bindAddress) is str:
            # Unix socket
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                os.unlink(self._bindAddress)
            except OSError:
                pass
            if self._umask is not None:
                oldUmask = os.umask(self._umask)
        else:
            # INET socket
            assert type(self._bindAddress) is tuple
            assert len(self._bindAddress) == 2
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        sock.bind(self._bindAddress)
        sock.listen(socket.SOMAXCONN)

        if oldUmask is not None:
            os.umask(oldUmask)

        return sock
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def current_umask():
    """Get the current umask which involves having to set it temporarily."""
    mask = os.umask(0)
    os.umask(mask)
    return mask
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def current_umask():
    tmp = os.umask(0x12)    # 022
    os.umask(tmp)
    return tmp
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def current_umask():
    """Get the current umask which involves having to set it temporarily."""
    mask = os.umask(0)
    os.umask(mask)
    return mask
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
项目:CaptureBaits    作者:WhosMyName    | 项目源码 | 文件源码
def main():
    os.umask(0)
    create_wishlist()
    create_baitlist()
    create_oneclick()
    names_from_wishlist()
    while True:
        names_from_wishlist()
        time.sleep(60)
        print_modellist()
项目:CaptureBaits    作者:WhosMyName    | 项目源码 | 文件源码
def main():
    os.umask(0)
    create_wgetrc()
    create_wishlist()
    names_from_wishlist()
    while True:
        names_from_wishlist()
        print("Currently following Models are being baited:")
        print_modellist()
        time.sleep(60)
项目:CaptureBaits    作者:WhosMyName    | 项目源码 | 文件源码
def main():
    os.umask(0)
    create_wishlist()
    create_baitlist()
    create_oneclick()
    names_from_wishlist()
    while True:
        names_from_wishlist()
        time.sleep(60)
        print_modellist()
项目:salt-toaster    作者:openSUSE    | 项目源码 | 文件源码
def daemonize(stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
    '''
    Daemonize current script
    '''
    try: 
        pid = os.fork() 
        if pid > 0:
            sys.exit(0)
    except OSError, e: 
        sys.stderr.write ("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror) )
        sys.exit(1)

    os.chdir("/") 
    os.umask(0) 
    os.setsid() 

    try: 
        pid = os.fork() 
        if pid > 0:
            sys.exit(0)
    except OSError, e: 
        sys.stderr.write ("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror) )
        sys.exit(1)

    stdin_par = os.path.dirname(stdin)
    stdout_par = os.path.dirname(stdout)
    stderr_par = os.path.dirname(stderr)
    if not stdin_par:
        os.path.makedirs(stdin_par)
    if not stdout_par:
        os.path.makedirs(stdout_par)
    if not stderr_par:
        os.path.makedirs(stderr_par)

    si = open(stdin, 'r')
    so = open(stdout, 'a+')
    se = open(stderr, 'a+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def current_umask():
    """Get the current umask which involves having to set it temporarily."""
    mask = os.umask(0)
    os.umask(mask)
    return mask
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def current_umask():
    tmp = os.umask(0o022)
    os.umask(tmp)
    return tmp
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _create_file_if_needed(self):
    """Create an empty file if necessary.

    This method will not initialize the file. Instead it implements a
    simple version of "touch" to ensure the file has been created.
    """
    if not os.path.exists(self._filename):
      old_umask = os.umask(0177)
      try:
        open(self._filename, 'a+b').close()
      finally:
        os.umask(old_umask)
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _create_file_if_needed(self):
    """Create an empty file if necessary.

    This method will not initialize the file. Instead it implements a
    simple version of "touch" to ensure the file has been created.
    """
    if not os.path.exists(self._file.filename()):
      old_umask = os.umask(0177)
      try:
        open(self._file.filename(), 'a+b').close()
      finally:
        os.umask(old_umask)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def current_umask():
    """Get the current umask which involves having to set it temporarily."""
    mask = os.umask(0)
    os.umask(mask)
    return mask
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def current_umask():
    tmp = os.umask(0o022)
    os.umask(tmp)
    return tmp
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                    mode = info.external_attr >> 16
                    # if mode and regular file and any execute permissions for
                    # user/group/world?
                    if mode and stat.S_ISREG(mode) and mode & 0o111:
                        # make dest file have execute for user/group/world
                        # (chmod +x) no-op on windows per python docs
                        os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def current_umask():
    tmp = os.umask(0o022)
    os.umask(tmp)
    return tmp
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def get_process_umask():
    result = os.umask(0o22)
    os.umask(result)
    return result