我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.fchmod()。
def _AddDiversions(self, chroot_path):
    """Install the configured diversions and a policy-rc.d stub in the chroot.

    For every ``source -> dest`` pair in ``self._DIVERSIONS`` two commands are
    run through ``self._ExecChroot``: ``dpkg-divert`` to rename ``source``
    aside, then ``ln`` to put a symlink to ``dest`` in its place.

    Afterwards ``usr/sbin/policy-rc.d`` is written below ``chroot_path`` as a
    two-line shell script that exits with status 101, with mode 0o744.

    Args:
        chroot_path: Root directory of the chroot being prepared.
    """
    divert_cmd = ('dpkg-divert', '--local', '--rename', '--add')
    symlink_cmd = ('ln', '--symbolic', '--force')

    for source, dest in self._DIVERSIONS.items():
        self._ExecChroot(chroot_path, *(divert_cmd + (source,)))
        self._ExecChroot(chroot_path, *(symlink_cmd + (dest, source)))

    policy_path = os.path.join(chroot_path, 'usr', 'sbin', 'policy-rc.d')
    with open(policy_path, 'w') as policy:
        policy.write('#!/bin/sh\n')
        policy.write('exit 101\n')
        # Set the mode on the open descriptor rather than by path.
        os.fchmod(policy.fileno(), 0o744)
def write_file(path, content, owner='root', group='root', perms=0o444):
    """Create or overwrite a file with the contents of a byte string.

    The file is rewritten only when its current contents differ from
    ``content``. When the contents already match, only the ownership is
    corrected if needed.

    Args:
        path: File to create or update.
        content: Bytes to store in the file.
        owner: User name that should own the file.
        group: Group name that should own the file.
        perms: Permission bits applied when the file is (re)written.

    Raises:
        KeyError: If ``owner`` or ``group`` cannot be resolved.
        OSError: If the file cannot be written or its ownership changed.
    """
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid

    # Try to read the existing file so an unnecessary write can be avoided.
    existing_content = None
    existing_uid, existing_gid = None, None
    try:
        with open(path, 'rb') as target:
            existing_content = target.read()
        file_stat = os.stat(path)
        existing_uid, existing_gid = file_stat.st_uid, file_stat.st_gid
    except (IOError, OSError):
        # A missing or unreadable file just means it has to be written.
        # Only file errors are ignored; interrupts and programming errors
        # propagate.
        pass

    if content != existing_content:
        log("Writing file {} {}:{} {:o}".format(path, owner, group, perms),
            level=DEBUG)
        with open(path, 'wb') as target:
            os.fchown(target.fileno(), uid, gid)
            os.fchmod(target.fileno(), perms)
            target.write(content)
        return

    # The contents were the same, but the ownership may still need changing.
    # NOTE(review): ``perms`` is not re-applied on this path.
    if existing_uid != uid:
        log("Changing uid on already existing content: {} -> {}"
            .format(existing_uid, uid), level=DEBUG)
        os.chown(path, uid, -1)
    if existing_gid != gid:
        log("Changing gid on already existing content: {} -> {}"
            .format(existing_gid, gid), level=DEBUG)
        os.chown(path, -1, gid)
def writefile(path, data, owner=None, group=None, mode=None):
    """Write ``data`` to ``path``, optionally setting ownership and mode.

    Args:
        path: File to create or truncate.
        data: Payload; ``bytes`` is written in binary mode, anything else in
            text mode.
        owner: User name to own the file, or None to leave the uid unchanged.
        group: Group name to own the file, or None to leave the gid unchanged.
        mode: Permission bits to apply, or None to leave them unchanged.

    Returns:
        Whatever the file object's ``write`` call returns.
    """
    open_flags = 'wb' if isinstance(data, bytes) else 'w'
    # -1 tells fchown to leave that id untouched.
    uid = -1 if owner is None else pwd.getpwnam(owner).pw_uid
    gid = -1 if group is None else grp.getgrnam(group).gr_gid

    with open(path, open_flags) as handle:
        fd = handle.fileno()
        os.fchown(fd, uid, gid)
        if mode is not None:
            os.fchmod(fd, mode)
        written = handle.write(data)
    return written
def write_data(fpath, data, modified, raise_err=True, tmp_dir=None):
    """Safely write data to file path.

    The data is staged in a temporary file (mode 0o644, access/modification
    time set to ``modified``) which is then renamed over ``fpath``.

    :param fpath:
        Destination path.
    :param data:
        Bytes to write; a falsy value produces an empty file.
    :param modified:
        Timestamp applied as both atime and mtime of the file.
    :param raise_err:
        When true, re-raise the ``OSError`` from a failed rename.
    :param tmp_dir:
        Directory for the staging file; defaults to the directory of
        ``fpath``.
    """
    tmp_dir = tmp_dir or os.path.dirname(fpath)
    with tempfile.NamedTemporaryFile(dir=tmp_dir,
                                     delete=False,
                                     prefix='.tmp') as temp:
        if data:
            temp.write(data)
        os.fchmod(temp.fileno(), 0o644)
    os.utime(temp.name, (modified, modified))
    try:
        os.rename(temp.name, fpath)
    except OSError:
        # The staging file was created with delete=False, so nothing else
        # removes it; drop it here instead of leaving a stray '.tmp*' file.
        try:
            os.unlink(temp.name)
        except OSError:
            pass
        _LOGGER.error('Unable to rename: %s => %s',
                      temp.name, fpath, exc_info=True)
        if raise_err:
            raise
def create_script(filename, templatename, mode=EXEC_MODE, **kwargs):
    """Render a template into ``filename`` via a temporary file.

    The chunks yielded by ``generate_template`` are written to a temporary
    file in the destination directory, which is then renamed to ``filename``.

    :param ``str`` filename:
        Name of the file to generate.
    :param ``str`` templatename:
        The name of the template file.
    :param ``int`` mode:
        The mode for the file (defaults to ``EXEC_MODE``).
    :param ``dict`` kwargs:
        key/value passed into the template.
    """
    target_dir = os.path.dirname(filename)
    with tempfile.NamedTemporaryFile(dir=target_dir, delete=False) as staged:
        staged_name = staged.name
        for chunk in generate_template(templatename, **kwargs):
            staged.write(chunk)
        if os.name == 'posix':
            # The int() cast is required for the default EXEC_MODE to work.
            os.fchmod(staged.fileno(), int(mode))
    os.rename(staged_name, filename)
def set_list_write(filename, entries):
    """Write a collection of values to a file, one per line.

    Values are UTF-8 encoded (unencodable characters are replaced),
    de-duplicated and written in sorted order so the output is deterministic.
    Every value ends with exactly one newline in the file; a value that
    already ends with a newline is not given a second one.

    :param ``str`` filename:
        Name of the file to write.
    :param ``set`` entries:
        Set of values to write into ``filename``. Value can be unicode.
    """
    encoded = (
        entry.encode(encoding='utf8', errors='replace')
        for entry in entries
    )
    # writelines() adds no separators itself, so terminate each value here.
    lines = sorted({
        value if value.endswith(b'\n') else value + b'\n'
        for value in encoded
    })
    with io.open(filename, 'wb') as f:
        f.writelines(lines)
        if os.name == 'posix':
            os.fchmod(f.fileno(), 0o644)
def script_write(filename, script):
    """Write a script to a file and mark it executable (0o755 on POSIX).

    :param ``str`` filename:
        File to write to.
    :param ``script``:
        String or iterable returning strings. Can be unicode.
    """
    if isinstance(script, six.string_types):
        # A script passed as one string is wrapped in a StringIO so it can
        # be iterated like any other source of chunks.
        text = script.decode() if hasattr(script, 'decode') else script
        script = io.StringIO(text)

    with io.open(filename, 'wb') as out:
        for piece in script:
            # The file is binary, so every chunk is encoded before writing.
            out.write(piece.encode(encoding='utf8', errors='replace'))
        if os.name == 'posix':
            os.fchmod(out.fileno(), 0o755)
def safe_replacement(path, *open_args, mode=None, **open_kws):
    """Yield a temporary file that replaces ``path`` once the caller is done.

    This is a generator: it yields a ``tempfile.NamedTemporaryFile`` created
    next to ``path`` (same directory, name prefixed with the target's
    basename). When resumed normally, the temp file is flushed and renamed
    over ``path``. If an exception is thrown in instead, the temp file is
    removed and ``path`` is left untouched.

    Args:
        path: Destination to replace; converted with ``str()``.
        *open_args: Positional arguments for ``NamedTemporaryFile``. When
            none are given the file is opened in text mode ``'w'``.
        mode: Permission bits for the new file. Defaults to the current
            permission bits of ``path`` when it can be lstat'ed.
        **open_kws: Extra keyword arguments for ``NamedTemporaryFile``;
            ``delete``, ``dir`` and ``prefix`` are always overridden.
    """
    path = str(path)
    if mode is None:
        try:
            mode = stat.S_IMODE(os.lstat(path).st_mode)
        except OSError:
            # Nothing to inherit from; keep the temp file's own mode.
            pass

    parent, leaf = os.path.split(path)
    open_kws['delete'] = False
    open_kws['dir'] = parent
    open_kws['prefix'] = leaf + '.'
    if not open_args:
        open_kws['mode'] = 'w'

    with tempfile.NamedTemporaryFile(*open_args, **open_kws) as tmp:
        try:
            if mode is not None:
                os.fchmod(tmp.fileno(), mode)
            yield tmp
            if not tmp.closed:
                tmp.flush()
            os.rename(tmp.name, path)
        finally:
            # After a successful rename the temp name no longer exists and
            # the unlink fails harmlessly; on any error it cleans up.
            try:
                os.unlink(tmp.name)
            except OSError:
                pass
def submit(config, user, run_id, pids):
    """
    Submit the pipeline defined by 'config' as user 'user'.

    The config is dumped as JSON into a temporary file that is removed after
    successful completion; on failure the file is left in place.

    Args:
        config: JSON-serialisable pipeline configuration.
        user: User the submission command is run as.
        run_id: Key under which this process' pid is recorded in ``pids``.
        pids: Mapping updated with ``pids[run_id] = <current pid>``.

    Returns:
        The ``(err, out)`` pair obtained from ``run_as``.

    Raises:
        Exception: If the submission command exits with a non-zero code.
    """
    pids[run_id] = mp.current_process().pid
    (fd, tmp_cfg) = tempfile.mkstemp(prefix='pypers_', suffix='.cfg', text=True)
    # mkstemp creates the file owner-only; widen it to 0o644.
    # NOTE(review): presumably so that 'user' can read it - confirm.
    # The literal is spelled 0o644 because the bare 0644 form is a
    # SyntaxError on Python 3.
    os.fchmod(fd, 0o644)
    with os.fdopen(fd, 'w') as fh:
        json.dump(config, fh)
    cmd = [which('np_submit.py'), '-i', tmp_cfg]
    (ec, err, out) = run_as(cmd=cmd, user=user)
    if ec == 0:
        os.unlink(tmp_cfg)
        return (err, out)
    else:
        raise Exception('Unable to execute cmd %s:\n%s\n%s' % (cmd, err, out))
def write_homepage(self, status_info):
    """
    Re-writes the site homepage using the provided statistics in the homepage
    template (which is effectively a simple Python format string).

    The page is rendered into a temporary file inside ``self.output_path``
    and then atomically moved over ``index.html`` with mode 0o664. If
    rendering or writing fails, the temporary file is removed and the
    existing homepage is left untouched.

    :param status_info:
        The statistics to render; unpacked with ``**`` into
        ``self.homepage_template.format``, so it must be a mapping.
    """
    self.logger.info('writing homepage')
    with tempfile.NamedTemporaryFile(mode='w', dir=str(self.output_path),
                                     delete=False) as index:
        try:
            index.file.write(self.homepage_template.format(**status_info))
        except BaseException:
            # Remove the partial file explicitly. Flipping ``index.delete``
            # after construction is not reliable: on Python 3.4+ the close
            # logic consults a separate closer object, not this attribute.
            os.unlink(index.name)
            raise
        else:
            os.fchmod(index.file.fileno(), 0o664)
            os.replace(index.name, str(self.output_path / 'index.html'))
def _create_tempfile(self, pathname):  # pragma: no cover
    """Create an empty temporary file in the directory of ``pathname``.

    The directory of ``self.join(pathname)`` is created first if it does not
    exist. The temporary file gets mode ``obnamlib.NEW_FILE_MODE`` and is
    then opened and closed once through ``self.open``.

    Returns:
        The path of the temporary file.
    """
    target_dir = os.path.dirname(self.join(pathname))

    if not os.path.exists(target_dir):
        tracing.trace('os.makedirs(%s)' % target_dir)
        try:
            os.makedirs(target_dir, mode=obnamlib.NEW_DIR_MODE)
        except OSError as err:  # pragma: no cover
            # Another Obnam process may have created the directory between
            # the existence check and this call. That is fine as long as
            # the directory exists, so only EEXIST is ignored.
            if err.errno != errno.EEXIST:
                raise

    handle, tempname = tempfile.mkstemp(dir=target_dir)
    os.fchmod(handle, obnamlib.NEW_FILE_MODE)
    os.close(handle)

    self.open(tempname, 'wb').close()
    return tempname
def handle_begin_put(req_id, path, mode):
    """Open the file that will receive an upload and register it.

    With ``path`` set to None a named temporary file is created and serves as
    both the write path and the final path. Otherwise data is written to
    ``path + '~chopsticks-tmp'``. The file is created under umask 0o077 and
    then given ``mode``. The open file, write path, final path and a fresh
    ``sha1()`` object are stored in ``active_puts[req_id]``.

    Raises:
        IOError: If ``path`` names an existing directory.
    """
    saved_umask = os.umask(0o077)
    try:
        if path is not None:
            path = force_str(path)
            if os.path.isdir(path):
                raise IOError('%s is a directory' % path)
            wpath = path + '~chopsticks-tmp'
            f = open(wpath, 'wb')
        else:
            f = tempfile.NamedTemporaryFile(delete=False)
            wpath = f.name
            path = wpath
    finally:
        # Always restore the process umask, even when opening failed.
        os.umask(saved_umask)

    os.fchmod(f.fileno(), mode)
    active_puts[req_id] = (f, wpath, path, sha1())
def write_file(path, content, owner='root', group='root', perms=0o444):
    """Create or overwrite a file with the contents of a byte string.

    Args:
        path: File to create or truncate.
        content: Bytes to write.
        owner: User name that should own the file.
        group: Group name that should own the file.
        perms: Permission bits for the file.
    """
    message = "Writing file {} {}:{} {:o}".format(path, owner, group, perms)
    log(message)

    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid

    with open(path, 'wb') as handle:
        fd = handle.fileno()
        # Ownership and mode are set on the open descriptor before writing.
        os.fchown(fd, owner_id, group_id)
        os.fchmod(fd, perms)
        handle.write(content)
def store_context(self, file_name, config_data):
    """Dump ``config_data`` as YAML into ``file_name`` with mode 0o600.

    Args:
        file_name: Target file; a relative name is resolved against
            ``hookenv.charm_dir()``.
        config_data: Object handed to ``yaml.dump``.
    """
    if os.path.isabs(file_name):
        target = file_name
    else:
        target = os.path.join(hookenv.charm_dir(), file_name)

    with open(target, 'w') as stream:
        # Restrict the file to its owner before any data is written.
        os.fchmod(stream.fileno(), 0o600)
        yaml.dump(config_data, stream)
def write_file(path, content, owner='root', group='root', perms=0o444):
    """Create or overwrite a file with the contents of a byte string.

    On Python 3 a text ``content`` is encoded as UTF-8 first. The file is
    rewritten only when its current contents differ from the encoded
    ``content``. When the contents already match, only the ownership is
    corrected if needed.

    Args:
        path: File to create or update.
        content: Bytes (or, on Python 3, text) to store in the file.
        owner: User name that should own the file.
        group: Group name that should own the file.
        perms: Permission bits applied when the file is (re)written.

    Raises:
        KeyError: If ``owner`` or ``group`` cannot be resolved.
        OSError: If the file cannot be written or its ownership changed.
    """
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid

    # Encode before comparing. The existing file is read as bytes, so a
    # str would never compare equal and the file would always be rewritten.
    if six.PY3 and isinstance(content, six.string_types):
        content = content.encode('UTF-8')

    # Try to read the existing file so an unnecessary write can be avoided.
    existing_content = None
    existing_uid, existing_gid = None, None
    try:
        with open(path, 'rb') as target:
            existing_content = target.read()
        file_stat = os.stat(path)
        existing_uid, existing_gid = file_stat.st_uid, file_stat.st_gid
    except (IOError, OSError):
        # A missing or unreadable file just means it has to be written.
        # Only file errors are ignored; interrupts and programming errors
        # propagate.
        pass

    if content != existing_content:
        log("Writing file {} {}:{} {:o}".format(path, owner, group, perms),
            level=DEBUG)
        with open(path, 'wb') as target:
            os.fchown(target.fileno(), uid, gid)
            os.fchmod(target.fileno(), perms)
            target.write(content)
        return

    # The contents were the same, but the ownership may still need changing.
    # NOTE(review): ``perms`` is not re-applied on this path.
    if existing_uid != uid:
        log("Changing uid on already existing content: {} -> {}"
            .format(existing_uid, uid), level=DEBUG)
        os.chown(path, uid, -1)
    if existing_gid != gid:
        log("Changing gid on already existing content: {} -> {}"
            .format(existing_gid, gid), level=DEBUG)
        os.chown(path, -1, gid)
def create_shell_script(filename, shell_script):
    """Write an executable bash script that announces its entry and exit.

    The generated file consists of a ``#!/bin/bash`` line, an
    ``echo Entered <filename>`` line, the given ``shell_script`` text and an
    ``echo Exited <filename>`` line. Its mode is set to 0o755.

    Args:
        filename: Path of the script to create. It is embedded unquoted in
            the echo lines.
        shell_script: Body of the script. It is expected to end with a
            newline, since the exit line is appended directly after it.
    """
    contents = (
        '#!/bin/bash\n'
        + 'echo Entered ' + filename + '\n'
        + shell_script
        + 'echo Exited ' + filename + '\n'
    )
    with open(filename, 'w') as file_:
        file_.write(contents)
        # chmod the descriptor already open instead of reopening the file;
        # 0o755 is the octal spelling valid on both Python 2.6+ and 3.
        os.fchmod(file_.fileno(), 0o755)
def write_pid_file(self):
    """Write ``self.pid`` to the file at ``PID_PATH``.

    The file is truncated or created, made readable and writable by everyone
    (mode 0o666), and flushed after the pid has been written.
    """
    with open(PID_PATH, 'w+') as f:
        # 0o666 is the octal spelling accepted by Python 2.6+ and 3; the
        # bare 0666 form is a SyntaxError on Python 3.
        # NOTE(review): a world-writable pid file lets any local user
        # change it - confirm this is intended.
        os.fchmod(f.fileno(), 0o666)
        f.write(str(self.pid))
        f.flush()
def set_pid(self, pid):
    """Write ``pid`` to the file at ``self.pid_path``.

    The file is truncated or created, made readable and writable by everyone
    (mode 0o666), and flushed after the pid has been written.

    Args:
        pid: Process id to record; stored as ``str(pid)``.
    """
    with open(self.pid_path, 'w+') as f:
        # 0o666 is the octal spelling accepted by Python 2.6+ and 3; the
        # bare 0666 form is a SyntaxError on Python 3.
        # NOTE(review): a world-writable pid file lets any local user
        # change it - confirm this is intended.
        os.fchmod(f.fileno(), 0o666)
        f.write(str(pid))
        f.flush()  # both contexts
def mkstemp(*args, **kwargs):
    """Wrap tempfile.mkstemp and apply the configured file permissions.

    The created temporary file gets the mode given by
    ``settings.POOTLE_SYNC_FILE_MODE`` (see bug 1983).

    Returns:
        The ``(fd, name)`` pair from ``tempfile.mkstemp``.
    """
    fd, name = tempfile.mkstemp(*args, **kwargs)
    file_mode = settings.POOTLE_SYNC_FILE_MODE

    # Use fchmod where the platform offers it, else chmod by name.
    fchmod = getattr(os, 'fchmod', None)
    if fchmod is None:
        os.chmod(name, file_mode)
    else:
        fchmod(fd, file_mode)
    return fd, name
def main():
    """Install the systemid startup script and service into the chroot.

    Creates ``<chroot>/systemid`` and ``<chroot>/icon/systemid``, writes an
    executable ``startup.sh`` into the latter, writes ``systemid.service``
    through the icon module, and enables that service.

    Both directories are created with ``exist_ok=True`` so the step can be
    re-run against a chroot that already contains them.
    """
    module = icon_lib.IconModule(FLAGS.chroot_path)

    # Target directory of the ``mount`` command in startup.sh.
    os.makedirs(os.path.join(FLAGS.chroot_path, 'systemid'), exist_ok=True)

    tool_path = os.path.join(FLAGS.chroot_path, 'icon', 'systemid')
    os.makedirs(tool_path, exist_ok=True)

    # NOTE(review): the line layout of the two literals below is
    # reconstructed (one shell command / unit directive per line) - verify
    # against the upstream file.
    script = os.path.join(tool_path, 'startup.sh')
    with open(script, 'w') as fh:
        os.fchmod(fh.fileno(), 0o755)
        fh.write("""\
#!/bin/bash
e2fsck -p LABEL=SYSTEMID
mount -o data=journal,noatime,sync LABEL=SYSTEMID /systemid
. /systemid/systemid
echo ${SYSTEMID} > /etc/hostname
hostname --file /etc/hostname
grep ${SYSTEMID} /etc/hosts >/dev/null || echo "127.0.2.1 ${SYSTEMID}" >> /etc/hosts
""")

    with module.ServiceFile('systemid.service') as fh:
        fh.write("""
[Unit]
Description=Mount /systemid and configure from it
DefaultDependencies=no
Conflicts=shutdown.target
After=systemd-remount-fs.service
Before=sysinit.target

[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/icon/systemid/startup.sh

[Install]
WantedBy=sysinit.target
""")
    module.EnableService('systemid.service')
def main():
    """Regenerate the unsigned manifest and then produce the signed one.

    Step 1 runs ``build_manifest.py`` (located next to this script) with its
    stdout captured in a temporary file inside ``FLAGS.image_dir``, then
    renames that file to ``manifest.json.unsigned``.

    Step 2 pipes the unsigned manifest through ``wrap_file.py`` into a second
    temporary file, sets its mode to 0o644 and renames it to
    ``manifest.json``.

    In both steps the temporary file is removed if anything fails before the
    rename, and the error is re-raised.
    """
    base = os.path.dirname(sys.argv[0])
    image_dir = FLAGS.image_dir
    unsigned_manifest = os.path.join(image_dir, 'manifest.json.unsigned')
    signed_manifest = os.path.join(image_dir, 'manifest.json')

    with tempfile.NamedTemporaryFile(dir=image_dir, delete=False) as built:
        try:
            Exec(
                os.path.join(base, 'build_manifest.py'),
                '--default-rollout', str(FLAGS.default_rollout),
                '--image-dir', image_dir,
                '--old-manifest', unsigned_manifest,
                '--max-images', str(FLAGS.max_images),
                stdout=built)
            os.rename(built.name, unsigned_manifest)
        except BaseException:
            # Clean up on any failure, including interrupts.
            os.unlink(built.name)
            raise

    with tempfile.NamedTemporaryFile(dir=image_dir, delete=False) as signed, \
            open(unsigned_manifest, 'r') as unsigned_in:
        try:
            sign_cmd = [
                os.path.join(base, 'wrap_file.py'),
                '--cert', FLAGS.cert,
                '--key', FLAGS.key,
            ]
            for other_cert in FLAGS.other_certs or []:
                sign_cmd += ['--other-cert', other_cert]
            Exec(*sign_cmd, stdin=unsigned_in, stdout=signed)
            os.fchmod(signed.fileno(), 0o644)
            os.rename(signed.name, signed_manifest)
        except BaseException:
            # Clean up on any failure, including interrupts.
            os.unlink(signed.name)
            raise