我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.umask()。
def generate_or_read_from_file(key_file='.secret_key', key_length=64): """Multiprocess-safe secret key file generator. Useful to replace the default (and thus unsafe) SECRET_KEY in settings.py upon first start. Save to use, i.e. when multiple Python interpreters serve the dashboard Django application (e.g. in a mod_wsgi + daemonized environment). Also checks if file permissions are set correctly and throws an exception if not. """ abspath = os.path.abspath(key_file) lock = lockutils.external_lock(key_file + ".lock", lock_path=os.path.dirname(abspath)) with lock: if not os.path.exists(key_file): key = generate_key(key_length) old_umask = os.umask(0o177) # Use '0600' file permissions with open(key_file, 'w') as f: f.write(key) os.umask(old_umask) else: if (os.stat(key_file).st_mode & 0o777) != 0o600: raise FilePermissionError("Insecure key file permissions!") with open(key_file, 'r') as f: key = f.readline() return key
def clearUnMasked(func): """Decorator which clears the umask for a method. The umask is a permissions mask that gets applied whenever new files or folders are created. For I/O methods that have a permissions parameter, it is important that the umask is cleared prior to execution, otherwise the default umask may alter the resulting permissions :type func: function """ @functools.wraps(func) def wrapper(*args, **kwargs): # set umask to zero, store old umask oldMask = os.umask(0) try: # execute method payload return func(*args, **kwargs) finally: # set mask back to previous value os.umask(oldMask) return wrapper
def drop_privileges(self, uid_name, gid_name): if os.getuid() != 0: # We're not root so, like, whatever dude self.logger.info("Not running as root. Cannot drop permissions.") return # Get the uid/gid from the name running_uid = pwd.getpwnam(uid_name).pw_uid running_gid = grp.getgrnam(gid_name).gr_gid # Remove group privileges os.setgroups([]) # Try setting the new uid/gid os.setgid(running_gid) os.setuid(running_uid) # Ensure a very conservative umask old_umask = os.umask(0o077) self.logger.info("Changed permissions to: %s: %i, %s, %i"%(uid_name, running_uid, gid_name, running_gid))
def daemonize(): # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 if os.fork(): # launch child and... os._exit(0) # kill off parent os.setsid() if os.fork(): # launch child and... os._exit(0) # kill off parent again. os.umask(077) null=os.open('/dev/null', os.O_RDWR) for i in range(3): try: os.dup2(null, i) except OSError, e: if e.errno != errno.EBADF: raise os.close(null)
def generate(self): return textwrap.dedent(""" import pupy, os if os.name == 'posix': pupy.infos['daemonize']=True if os.fork(): # launch child and... os._exit(0) # kill off parent os.setsid() if os.fork(): # launch child and... os._exit(0) # kill off parent again. os.umask(022) # Don't allow others to write null=os.open('/dev/null', os.O_RDWR) for i in range(3): try: os.dup2(null, i) except OSError, e: if e.errno != errno.EBADF: raise os.close(null) """)
def _perform_single(self, achall): response, validation = achall.response_and_validation() root_path = self.full_roots[achall.domain] validation_path = self._get_validation_path(root_path, achall) logger.debug("Attempting to save validation to %s", validation_path) # Change permissions to be world-readable, owner-writable (GH #1795) old_umask = os.umask(0o022) try: with open(validation_path, "wb") as validation_file: validation_file.write(validation.encode()) finally: os.umask(old_umask) self.performed[root_path].add(achall) return response
def _create_default_file(): """Creates the default profile file, returning its contents.""" import json profiles_default_data = { 'active_profile': None, 'profiles': {} } os.makedirs(profiles_path, exist_ok=True) # Populate the file, ensuring that its permissions are restrictive enough. old_umask = os.umask(0o077) try: with open(profiles_file, 'w', encoding='utf8') as outfile: json.dump(profiles_default_data, outfile) finally: os.umask(old_umask) return profiles_default_data
def become_daemon(self, root_dir='/'): if os.fork() != 0: # launch child and ... os._exit(0) # kill off parent os.setsid() os.chdir(root_dir) os.umask(0) if os.fork() != 0: # fork again so we are not a session leader os._exit(0) sys.stdin.close() sys.__stdin__ = sys.stdin sys.stdout.close() sys.stdout = sys.__stdout__ = _NullDevice() sys.stderr.close() sys.stderr = sys.__stderr__ = _NullDevice() for fd in range(1024): try: os.close(fd) except OSError: pass
def __init__(self, cfg): old_umask = os.umask(cfg.umask) fdir = cfg.worker_tmp_dir if fdir and not os.path.isdir(fdir): raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir) fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir) # allows the process to write to the file util.chown(name, cfg.uid, cfg.gid) os.umask(old_umask) # unlink the file so we don't leak tempory files try: if not IS_CYGWIN: util.unlink(name) self._tmp = os.fdopen(fd, 'w+b', 1) except: os.close(fd) raise self.spinner = 0
def get_process_umask(): result = os.umask(0o22) os.umask(result) return result
def current_umask(): """Get the current umask which involves having to set it temporarily.""" mask = os.umask(0) os.umask(mask) return mask
def unzip_file(filename, location, flatten=True): """ Unzip the file (with path `filename`) to the destination `location`. All files are written based on system defaults and umask (i.e. permissions are not preserved), except that regular file members with any execute permissions (user, group, or world) have "chmod +x" applied after being written. Note that for windows, any execute changes using os.chmod are no-ops per the python docs. """ ensure_dir(location) zipfp = open(filename, 'rb') try: zip = zipfile.ZipFile(zipfp, allowZip64=True) leading = has_leading_dir(zip.namelist()) and flatten for info in zip.infolist(): name = info.filename data = zip.read(name) fn = name if leading: fn = split_leading_dir(name)[1] fn = os.path.join(location, fn) dir = os.path.dirname(fn) if fn.endswith('/') or fn.endswith('\\'): # A directory ensure_dir(fn) else: ensure_dir(dir) fp = open(fn, 'wb') try: fp.write(data) finally: fp.close() mode = info.external_attr >> 16 # if mode and regular file and any execute permissions for # user/group/world? if mode and stat.S_ISREG(mode) and mode & 0o111: # make dest file have execute for user/group/world # (chmod +x) no-op on windows per python docs os.chmod(fn, (0o777 - current_umask() | 0o111)) finally: zipfp.close()
def current_umask(): tmp = os.umask(0o022) os.umask(tmp) return tmp
def daemonize(self): try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) os.chdir("/") os.setsid() os.umask(0) try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) sys.stdout.flush() sys.stderr.flush() si = file(self.stdin, 'r') so = file(self.stdout, 'a+') se = file(self.stderr, 'a+', 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) atexit.register(self.delpid) pid = str(os.getpid()) file(self.pidfile,'w+').write("%s\n" % pid)
def daemonize(double_fork=True): '''Puts process in the background using usual UNIX best practices.''' try: os.umask(0o22) except Exception as e: raise Exception("Unable to change file creation mask: %s" % e) os.chdir('/') # First fork if double_fork: try: pid = os.fork() if pid > 0: os._exit(0) except OSError as e: raise Exception("Error on first fork: [%d] %s" % (e.errno, e.strerr,)) os.setsid() # Second fork try: pid = os.fork() if pid > 0: os._exit(0) except OSError as e: raise Exception("Error on second fork: [%d] %s" % (e.errno, e.strerr,)) close_open_files() os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno()) os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno()) os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
def open(file, flag=None, mode=0666): """Open the database file, filename, and return corresponding object. The flag argument, used to control how the database is opened in the other DBM implementations, is ignored in the dumbdbm module; the database is always opened for update, and will be created if it does not exist. The optional mode argument is the UNIX mode of the file, used only when the database has to be created. It defaults to octal code 0666 (and will be modified by the prevailing umask). """ # flag argument is currently ignored # Modify mode depending on the umask try: um = _os.umask(0) _os.umask(um) except AttributeError: pass else: # Turn off any bits that are set in the umask mode = mode & (~um) return _Database(file, mode)
def _create_file_if_needed(self): """Create an empty file if necessary. This method will not initialize the file. Instead it implements a simple version of "touch" to ensure the file has been created. """ if not os.path.exists(self._file.filename()): old_umask = os.umask(0o177) try: open(self._file.filename(), 'a+b').close() finally: os.umask(old_umask)
def _create_file_if_needed(self): """Create an empty file if necessary. This method will not initialize the file. Instead it implements a simple version of "touch" to ensure the file has been created. """ if not os.path.exists(self._filename): old_umask = os.umask(0o177) try: open(self._filename, 'a+b').close() finally: os.umask(old_umask)
def setUp(self): self.umask = os.umask(0o022)
def tearDown(self): os.umask(self.umask)
def _setupSocket(self): """Creates and binds the socket for communication with the server.""" oldUmask = None if type(self._bindAddress) is str: # Unix socket sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: os.unlink(self._bindAddress) except OSError: pass if self._umask is not None: oldUmask = os.umask(self._umask) else: # INET socket assert type(self._bindAddress) is tuple assert len(self._bindAddress) == 2 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(self._bindAddress) sock.listen(socket.SOMAXCONN) if oldUmask is not None: os.umask(oldUmask) return sock
def current_umask(): tmp = os.umask(0x12) # 022 os.umask(tmp) return tmp
def main(): os.umask(0) create_wishlist() create_baitlist() create_oneclick() names_from_wishlist() while True: names_from_wishlist() time.sleep(60) print_modellist()
def main(): os.umask(0) create_wgetrc() create_wishlist() names_from_wishlist() while True: names_from_wishlist() print("Currently following Models are being baited:") print_modellist() time.sleep(60)
def daemonize(stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"): ''' Daemonize current script ''' try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: sys.stderr.write ("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror) ) sys.exit(1) os.chdir("/") os.umask(0) os.setsid() try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: sys.stderr.write ("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror) ) sys.exit(1) stdin_par = os.path.dirname(stdin) stdout_par = os.path.dirname(stdout) stderr_par = os.path.dirname(stderr) if not stdin_par: os.path.makedirs(stdin_par) if not stdout_par: os.path.makedirs(stdout_par) if not stderr_par: os.path.makedirs(stderr_par) si = open(stdin, 'r') so = open(stdout, 'a+') se = open(stderr, 'a+', 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno())
def _create_file_if_needed(self): """Create an empty file if necessary. This method will not initialize the file. Instead it implements a simple version of "touch" to ensure the file has been created. """ if not os.path.exists(self._filename): old_umask = os.umask(0177) try: open(self._filename, 'a+b').close() finally: os.umask(old_umask)
def _create_file_if_needed(self): """Create an empty file if necessary. This method will not initialize the file. Instead it implements a simple version of "touch" to ensure the file has been created. """ if not os.path.exists(self._file.filename()): old_umask = os.umask(0177) try: open(self._file.filename(), 'a+b').close() finally: os.umask(old_umask)