Python stat 模块,S_IRGRP 实例源码

我们从 Python 开源项目中提取了以下 48 个代码示例,用于说明如何使用 stat.S_IRGRP。

项目:nitro    作者:KVM-VMI    | 项目源码 | 文件源码
def main(args):
    """Dump the RAM of the libvirt domain named in ``args`` to ``<vm_name>.raw``.

    The dump file is created in the current working directory and is made
    readable and writable by user, group and others before libvirt is asked
    to write the core dump into it.

    Args:
        args: Mapping that provides the domain name under ``'<vm_name>'``.
    """
    vm_name = args['<vm_name>']
    # Look the domain up through the system-wide QEMU connection.
    connection = libvirt.open('qemu:///system')
    domain = connection.lookupByName(vm_name)

    dump_path = os.path.join(os.getcwd(), '{}.raw'.format(vm_name))
    rw_everyone = (stat.S_IRUSR | stat.S_IWUSR
                   | stat.S_IRGRP | stat.S_IWGRP
                   | stat.S_IROTH | stat.S_IWOTH)
    # Opening in 'w' mode creates (or truncates) the file.  Nothing is written
    # through this handle; it simply stays open while the dump is taken.
    with open(dump_path, 'w'):
        os.chmod(dump_path, rw_everyone)
        # Memory-only dump in raw format.
        domain.coreDumpWithFormat(
            dump_path,
            libvirt.VIR_DOMAIN_CORE_DUMP_FORMAT_RAW,
            libvirt.VIR_DUMP_MEMORY_ONLY)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
    """A stale, read-only bytecode file must not make the import fail."""
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    with source_util.create_modules('_temp') as mapping:
        # Compile once, then overwrite the first four bytes so the cached
        # bytecode will need to be re-created on import.
        py_compile.compile(mapping['_temp'])
        bytecode_path = imp.cache_from_source(mapping['_temp'])
        with open(bytecode_path, 'r+b') as handle:
            handle.seek(0)
            handle.write(b'\x00\x00\x00\x00')
        os.chmod(bytecode_path, read_only)
        try:
            # Rewriting the bytecode cannot succeed; the import must still
            # complete without raising IOError.
            self.import_(mapping['_temp'], '_temp')
        finally:
            # Restore write permission so the fixture can be cleaned up.
            os.chmod(bytecode_path, stat.S_IWUSR)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
    """Issue 6070: cached bytecode must not inherit the source's execute bits.

    Under posix, .pyc files used to get their execute bit set when the .py
    file had it set, although they are not executable.
    """
    read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    with temp_umask(0o022):
        sys.path.insert(0, os.curdir)
        try:
            source_name = TESTFN + os.extsep + "py"
            # Create an empty source module and mark it executable.
            with open(source_name, 'w'):
                pass
            os.chmod(source_name, read_bits | exec_bits)
            cached = imp.cache_from_source(source_name)
            # Remove the cache path first so the import has to write it anew.
            unlink(cached)
            __import__(TESTFN)
            if not os.path.exists(cached):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            cached_mode = stat.S_IMODE(os.stat(cached).st_mode)
            self.assertEqual(cached_mode, read_bits)
        finally:
            del sys.path[0]
            remove_files(TESTFN)
            unload(TESTFN)
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def set_own_perm(usr, dir_list):
    """Recursively give ``usr`` ownership of every directory in ``dir_list``.

    Ownership is set to the user ``usr`` and to the group that has the same
    name.  Directories receive ``rwxrwx---``; the non-directory entries that
    ``os.walk`` reports receive ``rw-rw----``.

    Args:
        usr: Name looked up in both the passwd and the group database.
        dir_list: Iterable of directory paths to process.
    """
    owner_uid = pwd.getpwnam(usr).pw_uid
    owner_gid = grp.getgrnam(usr).gr_gid
    file_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
    dir_mode = file_mode | stat.S_IXUSR | stat.S_IXGRP

    def claim(path, mode):
        # Change the owner first, then the permission bits.
        os.chown(path, owner_uid, owner_gid)
        os.chmod(path, mode)

    for top in dir_list:
        claim(top, dir_mode)
        for parent, dir_names, file_names in os.walk(top):
            for name in dir_names:
                claim(os.path.join(parent, name), dir_mode)
            for name in file_names:
                claim(os.path.join(parent, name), file_mode)
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def set_own_perm(usr, dir_list):
    """Chown and chmod whole directory trees for the account ``usr``.

    The uid comes from the passwd entry of ``usr`` and the gid from the group
    entry with the same name.  Each listed directory and all sub-directories
    are set to user/group read-write-execute; every other entry found by
    ``os.walk`` is set to user/group read-write.

    Args:
        usr: Account name, also used as the group name.
        dir_list: Iterable of root directories to walk.
    """
    uid = pwd.getpwnam(usr).pw_uid
    gid = grp.getgrnam(usr).gr_gid
    rw_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
    rwx_mode = rw_mode | stat.S_IXUSR | stat.S_IXGRP

    for root_dir in dir_list:
        os.chown(root_dir, uid, gid)
        os.chmod(root_dir, rwx_mode)
        for current, sub_dirs, files in os.walk(root_dir):
            # Sub-directories are handled before files, each with its own mode.
            for names, mode in ((sub_dirs, rwx_mode), (files, rw_mode)):
                for entry in names:
                    target = os.path.join(current, entry)
                    os.chown(target, uid, gid)
                    os.chmod(target, mode)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
    """Issue 6070: compiled files must not inherit the source's execute bits.

    Under posix, .pyc files used to get their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    # ``0o22`` is the octal spelling accepted by Python 2.6+ and Python 3;
    # the legacy ``022`` form is a syntax error on Python 3.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # Create an empty source file; close() returns None, so there is
        # nothing worth binding to a name.
        open(fname, 'w').close()
        os.chmod(fname, read_bits | exec_bits)
        __import__(TESTFN)
        # The import may have produced either a .pyc or a .pyo file.
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode), read_bits)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
    """Issue 6070: the compiled file must carry read bits only.

    Under posix, .pyc files used to get their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    readable = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    executable = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    # Written as ``0o22`` (valid on Python 2.6+ and 3) rather than the
    # legacy ``022`` literal, which Python 3 rejects.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # Only the file's existence matters; close() returns None.
        open(fname, 'w').close()
        os.chmod(fname, readable | executable)
        __import__(TESTFN)
        # Accept either flavour of compiled file.
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode), readable)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
项目:polyloader    作者:elfsternberg    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
    """Issue 6070: compiled files must be readable but never executable.

    Under posix, .pyc files used to get their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # Create an empty source file; close() returns None, so the result
        # is not bound to a name.
        open(fname, 'w').close()
        os.chmod(fname, read_bits | exec_bits)
        __import__(TESTFN)
        # The import may have produced either a .pyc or a .pyo file.
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        # At least one read bit is set and no execute bit is set.  ``assert``
        # is a statement, so it is written without call-like parentheses.
        assert s.st_mode & read_bits
        assert not (s.st_mode & exec_bits)
    finally:
        os.umask(oldmask)
        clean_tmpfiles(fname)
        unload(TESTFN)
        del sys.path[0]
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
    """Importing must fail silently when outdated bytecode cannot be rewritten."""
    with source_util.create_modules('_temp') as mapping:
        py_compile.compile(mapping['_temp'])
        bytecode_path = imp.cache_from_source(mapping['_temp'])
        # Overwrite the first four bytes so the bytecode has to be re-created.
        with open(bytecode_path, 'r+b') as stale:
            stale.seek(0)
            stale.write(b'\x00\x00\x00\x00')
        # Leave only the read bits so the bytecode file is read-only.
        no_write = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        os.chmod(bytecode_path, no_write)
        try:
            # No IOError may escape from the import.
            self.import_(mapping['_temp'], '_temp')
        finally:
            # Give the owner write access again for the eventual clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
项目:find_circ2    作者:rajewsky-lab    | 项目源码 | 文件源码
def store_index(self,ipath):
    """Atomically write the chromosome index to ``ipath``.

    One tab-separated line is written per entry of ``self.chrom_stats``
    (sorted by chromosome name), holding the name followed by the five
    values ``ofs, ldata, skip, skipchar, size`` stored for it.  The data goes
    to a temporary file in the same directory as ``ipath``, which is then
    made read-only for user, group and others and renamed over ``ipath``.

    Args:
        ipath: Destination path of the index file.
    """
    self.logger.info("# indexed_fasta.store_index('%s')" % ipath)

    # write to tmp-file first and in the end rename in order to have this
    # atomic, otherwise parallel building of the same index may screw it up.
    import stat
    import tempfile

    tmp = tempfile.NamedTemporaryFile(mode="w", dir=os.path.dirname(ipath), delete=False)
    committed = False
    try:
        with tmp:
            for chrom in sorted(self.chrom_stats):
                ofs, ldata, skip, skipchar, size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom, ofs, ldata, skip, skipchar, size))

            # make sure everything is on disk: the Python-level buffer has to
            # be flushed first, otherwise fsync() has nothing to sync.
            tmp.flush()
            os.fsync(tmp.fileno())

        # make it accessible to everyone
        os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

        # this is atomic on POSIX as we have created tmp in the same
        # directory, therefore same filesystem
        os.rename(tmp.name, ipath)
        committed = True
    finally:
        # do not leave a half-written temp file behind when anything failed
        if not committed and os.path.exists(tmp.name):
            os.unlink(tmp.name)
项目:find_circ    作者:rajewsky-lab    | 项目源码 | 文件源码
def store_index(self,ipath):
    """Atomically write the chromosome index to ``ipath``.

    One tab-separated line is written per entry of ``self.chrom_stats``
    (sorted by chromosome name), holding the name followed by the five
    values ``ofs, ldata, skip, skipchar, size`` stored for it.  The data goes
    to a temporary file in the same directory as ``ipath``, which is then
    made read-only for user, group and others and renamed over ``ipath``.

    Args:
        ipath: Destination path of the index file.
    """
    debug("# indexed_fasta.store_index('%s')" % ipath)

    # write to tmp-file first and in the end rename in order to have this
    # atomic, otherwise parallel building of the same index may screw it up.
    import stat
    import tempfile

    tmp = tempfile.NamedTemporaryFile(mode="w", dir=os.path.dirname(ipath), delete=False)
    committed = False
    try:
        with tmp:
            for chrom in sorted(self.chrom_stats):
                ofs, ldata, skip, skipchar, size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom, ofs, ldata, skip, skipchar, size))

            # make sure everything is on disk: the Python-level buffer has to
            # be flushed first, otherwise fsync() has nothing to sync.
            tmp.flush()
            os.fsync(tmp.fileno())

        # make it accessible to everyone
        os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

        # this is atomic on POSIX as we have created tmp in the same
        # directory, therefore same filesystem
        os.rename(tmp.name, ipath)
        committed = True
    finally:
        # do not leave a half-written temp file behind when anything failed
        if not committed and os.path.exists(tmp.name):
            os.unlink(tmp.name)
项目:ClockworkVMs    作者:csd-dev-tools    | 项目源码 | 文件源码
def tearDown(self):
    '''
    Copy the build results from the temporary home back to the build
    directory, then hand over to self._exit.
    '''
    src_root = self.tmphome + "/src"
    self.mbl.chownR(self.keyuser, src_root)

    # Read bits for everyone plus group write, passed with the "append"
    # option (presumably added on top of existing bits -- verify in chmodR).
    shared_bits = (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                   stat.S_IWGRP)
    self.mbl.chmodR(shared_bits, src_root, "append")
    for _ in range(2):
        self.libc.sync()

    # Copy back to pseudo-build directory
    dmg_source = self.tmphome + "/src/MacBuild/dmgs"
    call([self.RSYNC, "-aqp", dmg_source, self.buildHome])
    call([self.RSYNC, "-aqp", self.tmphome + "/src/", self.buildHome + "/builtSrc"])
    for _ in range(2):
        self.libc.sync()

    os.chdir(self.buildHome)
    self._exit(self.ramdisk, self.luggage, 0)
项目:deepsleepnet    作者:akaraspt    | 项目源码 | 文件源码
def _create_eae_zipfile(self, zip_file_name, main_file_path, data_files=None):
    """Bundle the main script and optional data files into a zip under /tmp.

    Args:
        zip_file_name: File name of the archive; it is created as
            ``/tmp/<zip_file_name>``.
        main_file_path: Path of the main script, stored with deflate
            compression.
        data_files: Optional list of files or directories that are added
            afterwards through the external ``zip`` command (recursive,
            update mode, no compression).

    Returns:
        The path of the created zip file.
    """
    if data_files is None:
        data_files = []

    # Prepare the zip file with the main script.  The context manager closes
    # the archive even when write() raises.
    zip_path = "/tmp/" + zip_file_name
    with zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED,
                         allowZip64=True) as zipf:
        zipf.write(main_file_path)

    # Handle other files & dirs
    for f in data_files:
        # NOTE(review): the command line is built by string concatenation and
        # run through the shell, so paths containing spaces or shell
        # metacharacters are interpreted by the shell.  Left unchanged because
        # callers may rely on shell expansion of the entries -- confirm.
        zipCommand = "zip -r -u -0 " + zip_path + " " + f
        call([zipCommand], shell=True)

    # Chmod 666 the zip file so it can be accessed
    os.chmod(zip_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH)

    return zip_path
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def test_create_script_perms(self):
    """Check that create_script applies an explicitly requested file mode."""
    script_file = os.path.join(self.root, 'script')
    # Explicit mode: read-only for user, group and others.
    requested_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    s6_aliases = {
        's6_setuidgid': '/test/s6-setuidgid',
    }

    utils.create_script(script_file,
                        's6.run',
                        mode=requested_mode,
                        user='testproid',
                        home='home',
                        shell='shell',
                        _alias=s6_aliases)

    # 33060 == 0o100444: the regular-file type bit plus the requested bits.
    actual_mode = utils.os.stat(script_file).st_mode
    self.assertEqual(actual_mode, 33060)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def save_app(manifest, container_dir, app_json=STATE_JSON):
    """Write the manifest as JSON into the container dir and return it as an object.

    Args:
        manifest: The app manifest to serialise.
        container_dir: Directory that receives the state file.
        app_json: File name of the state file inside ``container_dir``.

    Returns:
        The result of ``utils.to_obj(manifest)``.
    """
    state_file = os.path.join(container_dir, app_json)

    def _dump_manifest(stream):
        # Serialise the manifest into the stream handed over by write_safe.
        return json.dump(manifest, stream)

    fs.write_safe(state_file, _dump_manifest)

    if os.name == 'posix':
        # Owner read/write, group and others read-only (world readable).
        world_readable = (stat.S_IWUSR | stat.S_IRUSR |
                          stat.S_IRGRP | stat.S_IROTH)
        os.chmod(state_file, world_readable)

    return utils.to_obj(manifest)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
    """Issue 6070: a compiled module must not pick up execute permission.

    Under posix, .pyc files used to get their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    expected_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    source_mode = expected_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    # ``0o22`` parses on Python 2.6+ and Python 3, unlike the legacy ``022``
    # literal, which Python 3 rejects.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # Create the empty source file; close() returns None, so the result
        # is not bound to a name.
        open(fname, 'w').close()
        os.chmod(fname, source_mode)
        __import__(TESTFN)
        # Look for the .pyc first and fall back to the .pyo.
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode), expected_mode)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
    """A read-only bytecode file that needs rewriting must not break the import."""
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    with source_util.create_modules('_temp') as mapping:
        # Produce bytecode, then overwrite its first four bytes so it will
        # need to be re-created.
        py_compile.compile(mapping['_temp'])
        bytecode_path = self.util.cache_from_source(mapping['_temp'])
        with open(bytecode_path, 'r+b') as handle:
            handle.seek(0)
            handle.write(b'\x00\x00\x00\x00')
        os.chmod(bytecode_path, read_only)
        try:
            # The import is expected to finish without raising OSError.
            self.import_(mapping['_temp'], '_temp')
        finally:
            # Restore write permission so the fixture can be removed.
            os.chmod(bytecode_path, stat.S_IWUSR)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_execute_bit_not_copied(self):
    """Issue 6070: bytecode files keep read bits only, never execute bits.

    Under posix, .pyc files used to get their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    r_all = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    x_all = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    # Portable octal spelling: ``0o22`` works on Python 2.6+ and Python 3,
    # while the legacy ``022`` form is rejected by Python 3.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # The file only has to exist; close() returns None, so the result
        # is not bound to a name.
        open(fname, 'w').close()
        os.chmod(fname, r_all | x_all)
        __import__(TESTFN)
        # Either a .pyc or a .pyo file is acceptable.
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode), r_all)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
项目:spark    作者:chenguolin    | 项目源码 | 文件源码
def _octal_to_perm(octal):
    """Render the nine permission bits of ``octal`` as an ``rwxrwxrwx`` string.

    Each position shows its letter when the matching bit is set and ``-``
    otherwise, in the order user, group, others.  Bits outside these nine
    are ignored.
    """
    bit_symbols = (
        (stat.S_IRUSR, "r"), (stat.S_IWUSR, "w"), (stat.S_IXUSR, "x"),
        (stat.S_IRGRP, "r"), (stat.S_IWGRP, "w"), (stat.S_IXGRP, "x"),
        (stat.S_IROTH, "r"), (stat.S_IWOTH, "w"), (stat.S_IXOTH, "x"),
    )
    return "".join(symbol if octal & bit else "-"
                   for bit, symbol in bit_symbols)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_read_only_bytecode(self):
    """Importing must fail silently when outdated bytecode is read-only."""
    with source_util.create_modules('_temp') as mapping:
        py_compile.compile(mapping['_temp'])
        bytecode_path = self.util.cache_from_source(mapping['_temp'])
        # Overwrite the first four bytes so the bytecode has to be re-created.
        with open(bytecode_path, 'r+b') as stale:
            stale.seek(0)
            stale.write(b'\x00\x00\x00\x00')
        # Leave only the read bits so the bytecode file is read-only.
        no_write = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        os.chmod(bytecode_path, no_write)
        try:
            # No OSError may escape from the import.
            self.import_(mapping['_temp'], '_temp')
        finally:
            # Give the owner write access again for the eventual clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
项目:tensor2tensor    作者:tensorflow    | 项目源码 | 文件源码
def gunzip_file(gz_path, new_path):
  """Unzips from gz_path into new_path.

  Does nothing when new_path already exists. Otherwise the permission bits
  of new_path's parent directory are set before the file is unpacked.

  Args:
    gz_path: path to the zipped file.
    new_path: path to where the file will be unzipped.
  """
  if tf.gfile.Exists(new_path):
    tf.logging.info("File %s already exists, skipping unpacking" % new_path)
    return
  tf.logging.info("Unpacking %s to %s" % (gz_path, new_path))
  # We may be unpacking into a newly created directory, add write mode.
  # The flags have to be combined with bitwise `|`: chaining them with `or`
  # evaluates to the first truthy operand (S_IRWXU) and drops all the others.
  mode = stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH
  os.chmod(os.path.dirname(new_path), mode)
  with gzip.open(gz_path, "rb") as gz_file:
    with tf.gfile.GFile(new_path, mode="wb") as new_file:
      for line in gz_file:
        new_file.write(line)
项目:OnlineSchemaChange    作者:facebookincubator    | 项目源码 | 文件源码
def is_file_readable(filepath):
    """
    Tell whether the effective user of this process may read ``filepath``.

    When the real and effective ids are identical the answer is delegated to
    os.access().  Otherwise the owner, group or others read bit of the file
    is checked, depending on how the effective ids relate to the file.
    """
    euid = os.geteuid()
    egid = os.getegid()

    # This is the common case, so let os.access() handle it and skip the
    # manual permission check below.
    if os.getuid() == euid and os.getgid() == egid:
        return os.access(filepath, os.R_OK)

    st = os.stat(filepath)
    if st.st_uid == euid:
        read_bit = stat.S_IRUSR
    elif st.st_gid == egid or st.st_gid in os.getgroups():
        read_bit = stat.S_IRGRP
    else:
        read_bit = stat.S_IROTH
    return bool(st.st_mode & read_bit)
项目:SoS    作者:vatlab    | 项目源码 | 文件源码
def kill_task(task):
    """Stop ``task`` and report the resulting status string.

    A pending task is reported as ``'cancelled'``.  Otherwise the task's job
    script is removed if present (best effort).  A task that is not running
    keeps its current status; for a running task the pulse file is made
    read-only and ``'killed'`` is returned.
    """
    status = check_task(task)
    if status == 'pending':
        return 'cancelled'

    task_dir = os.path.join(os.path.expanduser('~'), '.sos', 'tasks')

    # remove job file as well; failures are deliberately ignored
    job_file = os.path.join(task_dir, task + '.sh')
    if os.path.isfile(job_file):
        try:
            os.remove(job_file)
        except Exception:
            pass

    if status != 'running':
        return status

    # job is running: make its pulse file read-only (presumably the running
    # job notices this and stops -- verify against the task executor)
    from stat import S_IREAD, S_IRGRP, S_IROTH
    pulse_file = os.path.join(task_dir, task + '.pulse')
    os.chmod(pulse_file, S_IREAD | S_IRGRP | S_IROTH)
    return 'killed'
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def set_executable(top):
    """Normalise the execute bits of everything below ``top``.

    Files whose content starts with ``#!`` or whose bytes 1-3 are ``ELF`` get
    an execute bit for every class (user/group/others) that already has the
    read bit; all other files lose their execute bits.  Symbolic links among
    the files are skipped.  Sub-directories get execute bits the same way as
    scripts.  Every mode change is reported on stdout.

    Args:
        top: Root directory to walk.

    Returns:
        The number of files that could not be processed.
    """
    error_count = 0
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    # Pair each read bit with the execute bit of the same class.
    read_to_exec = (
        (stat.S_IRUSR, stat.S_IXUSR),
        (stat.S_IRGRP, stat.S_IXGRP),
        (stat.S_IROTH, stat.S_IXOTH),
    )

    def set_exec(name):
        mode = os.stat(name).st_mode
        new_mode = mode
        for read_bit, exec_bit in read_to_exec:
            if mode & read_bit:
                new_mode |= exec_bit
        if mode != new_mode:
            print("Setting exec for '%s' (mode %o => %o)" % (name, mode, new_mode))
            os.chmod(name, new_mode)

    def unset_exec(name):
        mode = os.stat(name).st_mode
        new_mode = mode & ~exec_bits
        if mode != new_mode:
            print("Unsetting exec for '%s' (mode %o => %o)" % (name, mode, new_mode))
            os.chmod(name, new_mode)

    for root, dirs, files in os.walk(top):
        for name in files:
            complete_name = os.path.join(root, name)
            if os.path.islink(complete_name):
                continue
            try:
                # Read the header as bytes: binaries are not valid text, and
                # the context manager closes the file even if read() fails.
                with open(complete_name, 'rb') as f:
                    header = f.read(4)
                if header[0:2] == b'#!' or header[1:4] == b'ELF':
                    set_exec(complete_name)
                else:
                    unset_exec(complete_name)
            except Exception as e:
                # Best effort: report the problem and carry on with the rest.
                print("%s: %s" % (complete_name, str(e)))
                error_count += 1
        for name in dirs:
            complete_name = os.path.join(root, name)
            set_exec(complete_name)
    return error_count
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def begin_handover(self, fdtx_dir='/tmp/ave'):
    """Prepare this broker for being replaced and return its handover data.

    Call this on the broker that is going to be replaced.  Steps:

    * stop listening for new connections from clients, stop the notifier,
      disconnect all shares and disable further allocation (a session that
      manages to allocate at precisely this moment receives a Restarting
      exception and is expected to try again once or twice),
    * create a UNIX domain socket used to transfer file descriptors (in a
      separate step),
    * serialize the state of the local allocator and all live sessions
      (PIDs and RPC keys).

    Returns a tuple of the serialized data, the configuration and the path
    to the UNIX domain socket.  Raises ``Exception`` when ``fdtx_dir`` is an
    existing directory that is not readable, writable and searchable.
    """
    # The socket cannot be created in a directory we cannot write to, so
    # check that up front; a missing directory is not checked here.
    dir_is_present = os.path.exists(fdtx_dir) and os.path.isdir(fdtx_dir)
    if dir_is_present and not os.access(fdtx_dir, os.R_OK | os.X_OK | os.W_OK):
        raise Exception('directory not writable: %s' % fdtx_dir)

    self.stop_listening()
    self.stop_sharing()
    self.drop_all_shares()
    self.stop_listers()
    self.allocating = False

    self.fdtx = FdTx(None)
    socket_name = 'handover-%s' % rand_authkey()
    uds_path = self.fdtx.listen(fdtx_dir, socket_name)

    # Make the new socket world readable and world writable so the caller
    # will be able to interact with it.
    world_rw = (stat.S_IRUSR | stat.S_IWUSR |
                stat.S_IRGRP | stat.S_IWGRP |
                stat.S_IROTH | stat.S_IWOTH)
    os.chmod(uds_path, world_rw)
    return self.serialize(), self.config, uds_path
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def begin_handover(self, fdtx_dir='/tmp/ave'):
    """Shut this broker down for replacement and hand out its state.

    To be called on the broker that is about to be replaced.  It stops
    listening for new client connections, stops the notifier, disconnects
    all shares and disables further allocation (a session that allocates at
    precisely this moment receives a Restarting exception and is expected to
    retry once or twice).  It then creates a UNIX domain socket for the later
    transfer of file descriptors and serializes the state of the local
    allocator and of all live sessions (PIDs and RPC keys).

    Returns ``(serialized_state, config, uds_path)``.  Raises ``Exception``
    if ``fdtx_dir`` exists as a directory but lacks read, write or search
    permission.
    """
    # Without write access the socket could not be created and everything
    # after this point would fail.
    needed_access = os.R_OK | os.X_OK | os.W_OK
    if os.path.exists(fdtx_dir) and os.path.isdir(fdtx_dir):
        writable = os.access(fdtx_dir, needed_access)
        if not writable:
            raise Exception('directory not writable: %s' % fdtx_dir)

    self.stop_listening()
    self.stop_sharing()
    self.drop_all_shares()
    self.stop_listers()
    self.allocating = False

    self.fdtx = FdTx(None)
    uds_path = self.fdtx.listen(fdtx_dir, 'handover-%s' % rand_authkey())

    # Read and write permission for owner, group and others, so that the
    # caller can interact with the new socket.
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    group_rw = stat.S_IRGRP | stat.S_IWGRP
    other_rw = stat.S_IROTH | stat.S_IWOTH
    os.chmod(uds_path, owner_rw | group_rw | other_rw)

    return self.serialize(), self.config, uds_path
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def assert_mode_644(mode):
    """Verify given mode is 644

    Only the bits that matter are checked: group and others must be able to
    read, the owner must be able to read and write but not execute.
    """
    group_other_read = stat.S_IROTH | stat.S_IRGRP
    assert (mode & group_other_read) == group_other_read
    owner_rw = stat.S_IWUSR | stat.S_IRUSR
    assert (mode & owner_rw) == owner_rw and not (mode & stat.S_IXUSR)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def assert_mode_755(mode):
    """Verify given mode is 755

    Checks that group and others can read and execute and that the owner
    can read, write and execute; other bits are not inspected.
    """
    group_other_rx = (stat.S_IROTH | stat.S_IRGRP |
                      stat.S_IXOTH | stat.S_IXGRP)
    assert (mode & group_other_rx) == group_other_rx
    owner_rwx = stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR
    assert (mode & owner_rwx) == owner_rwx
项目:argosd    作者:danielkoster    | 项目源码 | 文件源码
def run(self):
    """Run the default installer, then set owner and mode of the log directory."""
    install.run(self)

    # Make the argosd user (and its primary group) owner of the log directory
    account = getpwnam('argosd')
    os.chown(settings.LOG_PATH, account.pw_uid, account.pw_gid)

    # User has full access, group and others can read
    log_dir_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH
    os.chmod(settings.LOG_PATH, log_dir_mode)
项目:BotBot    作者:jackstanek    | 项目源码 | 文件源码
def file_groupreadable(path):
    """Report a problem code when ``path`` is not readable by its group.

    Returns ``'PROB_FILE_NOT_GRPRD'`` if the group read bit is missing from
    ``path.stat().mode`` and ``None`` otherwise.
    """
    if stat.S_IRGRP & path.stat().mode:
        return None
    return 'PROB_FILE_NOT_GRPRD'
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def statinfo(st):
    """Flatten a stat result into a plain dictionary.

    The result holds the permission bits as a zero-padded octal string
    (``mode``), the file-type predicates, ownership, size, inode/device/link
    numbers, the three timestamps and one boolean per permission, setuid and
    setgid bit.

    Args:
        st: Object with the ``st_*`` attributes of an ``os.stat`` result.
    """
    mode = st.st_mode
    info = {
        'mode': "%04o" % stat.S_IMODE(mode),
        'isdir': stat.S_ISDIR(mode),
        'ischr': stat.S_ISCHR(mode),
        'isblk': stat.S_ISBLK(mode),
        'isreg': stat.S_ISREG(mode),
        'isfifo': stat.S_ISFIFO(mode),
        'islnk': stat.S_ISLNK(mode),
        'issock': stat.S_ISSOCK(mode),
        'uid': st.st_uid,
        'gid': st.st_gid,
        'size': st.st_size,
        'inode': st.st_ino,
        'dev': st.st_dev,
        'nlink': st.st_nlink,
        'atime': st.st_atime,
        'mtime': st.st_mtime,
        'ctime': st.st_ctime,
    }
    # One boolean per individual mode bit, keyed by its short name.
    flag_bits = (
        ('wusr', stat.S_IWUSR), ('rusr', stat.S_IRUSR), ('xusr', stat.S_IXUSR),
        ('wgrp', stat.S_IWGRP), ('rgrp', stat.S_IRGRP), ('xgrp', stat.S_IXGRP),
        ('woth', stat.S_IWOTH), ('roth', stat.S_IROTH), ('xoth', stat.S_IXOTH),
        ('isuid', stat.S_ISUID), ('isgid', stat.S_ISGID),
    )
    for key, bit in flag_bits:
        info[key] = bool(mode & bit)
    return info
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def test_tmp_files():
    """Temp files appear per serialization, are read-only and vanish on close."""
    tmp_dir = tempfile.mkdtemp(dir="/tmp")
    assert not os.listdir(tmp_dir)
    csvw = CSVW(csv_path="./tests/books.csv",
                metadata_path="./tests/books.csv-metadata.json",
                temp_dir=tmp_dir)
    # Constructing the object alone must not create any temp file.
    assert not os.listdir(tmp_dir)

    csvw.to_rdf(fmt="nt")
    created_files = os.listdir(tmp_dir)
    assert len(created_files) == 1, "nt serialization should generate only 1 temp file"
    assert created_files[0].endswith(".nt")

    # Start the turtle run from an empty directory again.
    os.remove(os.path.join(tmp_dir, created_files[0]))
    assert not os.listdir(tmp_dir)

    csvw.to_rdf(fmt="turtle")
    created_files = os.listdir(tmp_dir)
    assert len(created_files) == 2, "ttl serialization should generate two temps file"
    assert any(name.endswith(".nt") for name in created_files)
    assert any(name.endswith(".ttl") for name in created_files)

    # Check permissions: every class has its read bit and lacks its write bit.
    read_write_pairs = (
        (stat.S_IRUSR, stat.S_IWUSR),
        (stat.S_IRGRP, stat.S_IWGRP),
        (stat.S_IROTH, stat.S_IWOTH),
    )
    for name in created_files:
        file_mode = os.stat(os.path.join(tmp_dir, name)).st_mode
        for read_bit, write_bit in read_write_pairs:
            assert bool(file_mode & read_bit)
            assert not bool(file_mode & write_bit)

    csvw.close()
    assert not os.listdir(tmp_dir)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def load_package_by_remote(self):
    """
    Clone the score package from its remote repository.

    If the repository deploy key exists it is restricted to mode 0600 when
    group or others can read it, and git is configured to use it over ssh.
    When the configured branch differs from the master branch, that branch
    is checked out after cloning.

    :return: True once cloning (and the optional checkout) has finished.
    """
    # score package clone from remote repository
    repository_url = self.__score_base + ':' + self.__score_package + '.git'

    logging.debug("git Path :"+self.__package_path)
    git = Git(os.getcwd())

    # check deploy key
    key_path = conf.DEFAULT_SCORE_REPOSITORY_KEY
    if os.path.exists(key_path):
        key_mode = os.stat(key_path).st_mode
        # the key must only be accessible to its owner
        if key_mode & (stat.S_IRGRP | stat.S_IROTH):
            os.chmod(key_path, 0o600)
        ssh_cmd = 'ssh -o StrictHostKeyChecking=no -i '+key_path
        logging.debug("SSH KEY COMMAND : "+ssh_cmd)
        git.custom_environment(GIT_SSH_COMMAND=ssh_cmd)

    logging.debug(f"load_package_by_remote repository_url({repository_url}) package_path({self.__package_path})")
    self.__package_repository = Repo._clone(git, repository_url, self.__package_path, GitCmdObjectDB, None)
    logging.debug(f"load_package_by_remote result({self.__package_repository})")

    wanted_branch = conf.DEFAULT_SCORE_BRANCH
    if wanted_branch != conf.DEFAULT_SCORE_BRANCH_MASTER:
        self.__package_repository.git.checkout(wanted_branch)

    return True
项目:sherlock.py    作者:Luavis    | 项目源码 | 文件源码
def save_code(path, code):
    """Write ``code`` to ``path`` as a bash script and set its mode to rwxr--r--.

    The file is (re)created with a ``#!/usr/bin/env bash`` shebang line
    followed by ``code`` verbatim.
    """
    script_mode = stat.S_IROTH | stat.S_IRGRP | stat.S_IRWXU
    with open(path, 'w') as script:
        script.writelines(("#!/usr/bin/env bash\n", code))
    chmod(path, script_mode)
项目:biocommons.seqrepo    作者:biocommons    | 项目源码 | 文件源码
def close(self):
        """Finish writing: compress, index and write-protect the FASTA file.

        Does nothing when no file handle is open. Otherwise the handle is
        closed, the base path is compressed with bgzip and renamed to
        ``self.filename``, the file is opened once with ``FastaFile`` so the
        index files get created, and the data file plus its ``.fai`` and
        ``.gzi`` companions are made read-only for user, group and other.
        """
        if not self._fh:
            return

        self._fh.close()
        self._fh = None
        subprocess.check_call([bgzip_exe, "--force", self._basepath])
        os.rename(self._basepath + ".gz", self.filename)

        # opening with FastaFile creates the indexes as a side effect
        indexer = FastaFile(self.filename)
        indexer.close()

        read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        for target in (self.filename, self.filename + ".fai", self.filename + ".gzi"):
            os.chmod(target, read_only)

        logger.info("{} written; added {} sequences".format(self.filename, len(self._added)))
项目:build    作者:fuchsia-mirror    | 项目源码 | 文件源码
def main():
  """Generate an executable shell script that invokes the Dart test runner.

  Command-line arguments (all required):
    --out            path of the script to generate
    --source-dir     path to the test sources
    --dot-packages   path to the .packages file
    --test-runner    path to the test runner
    --flutter-shell  path to the Flutter shell

  The parent directory of --out is created when missing. The generated file
  gets mode rwxrwxr--.
  """
  parser = argparse.ArgumentParser(
      description='Generate a script that invokes the Dart tester')
  parser.add_argument('--out',
                      help='Path to the invocation file to generate',
                      required=True)
  parser.add_argument('--source-dir',
                      help='Path to test sources',
                      required=True)
  parser.add_argument('--dot-packages',
                      help='Path to the .packages file',
                      required=True)
  parser.add_argument('--test-runner',
                      help='Path to the test runner',
                      required=True)
  parser.add_argument('--flutter-shell',
                      help='Path to the Flutter shell',
                      required=True)
  args = parser.parse_args()

  test_file = args.out
  test_path = os.path.dirname(test_file)
  # dirname is '' when --out is a bare file name; os.makedirs('') would raise,
  # so only create a directory when there is one to create.
  if test_path and not os.path.exists(test_path):
    os.makedirs(test_path)

  script_template = string.Template('''#!/bin/sh

$test_runner \\
  --packages=$dot_packages \\
  --shell=$flutter_shell \\
  --test-directory=$source_dir
''')
  # The parsed argument names (dashes turned into underscores by argparse)
  # match the template placeholders.
  with open(test_file, 'w') as file:
    file.write(script_template.substitute(args.__dict__))
  permissions = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                 stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP |
                 stat.S_IROTH)
  os.chmod(test_file, permissions)
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def test_path_no_perms(self):
        """A directory with only read bits set must not be reported as writeable."""
        read_only = stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH
        with utils.NamedTemporaryDirectory() as tmp_d:
            os.chmod(tmp_d, read_only)
            writeable = utils.is_writeable(tmp_d)
            self.assertFalse(writeable)
项目:mock    作者:rpm-software-management    | 项目源码 | 文件源码
def _selinuxCreateFauxFilesystems():
        (fd, path) = tempfile.mkstemp(prefix="mock-selinux-plugin.")
        with os.fdopen(fd, 'w') as out:
            with open("/proc/filesystems") as host:
                for line in host:
                    if "selinuxfs" not in line:
                        out.write(line)

        os.chmod(path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)

        return path
项目:GenomicsSampleAPIs    作者:Intel-HLS    | 项目源码 | 文件源码
def setup_uwsgi_log(self):
        """Ensure the uWSGI log file for this array exists and is group-writable.

        The file is ``/var/log/uwsgi/uwsgi_<array_name>.log``. It is created
        when missing (existing content is kept) and its mode is set to
        read/write for user and group.
        """
        output_file = os.path.join('/var/log/uwsgi',
                                   'uwsgi_{}.log'.format(self.array_name))
        # Append mode creates the file if needed without truncating it.
        with open(output_file, 'a'):
            pass
        os.chmod(output_file, stat.S_IRUSR | stat.S_IWUSR
                 | stat.S_IRGRP | stat.S_IWGRP)
        # Parenthesised single-argument form prints identically on Python 2
        # and is valid syntax on Python 3 (the bare print statement is not).
        print('[INFO] Updated: {}'.format(output_file))
项目:lshapy    作者:kuking    | 项目源码 | 文件源码
def stats_to_str(s):
    """Format the mode of stat result ``s`` as one string.

    The result is the output of ``dbpcs`` for the mode followed by the output
    of ``rwx`` for the user, group and other permission triads, in that order.
    """
    mode = s.st_mode
    triads = (
        (stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR),
        (stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP),
        (stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH),
    )
    pieces = [dbpcs(mode)]
    for read_bit, write_bit, exec_bit in triads:
        pieces.append(rwx(mode, read_bit, write_bit, exec_bit))
    return "%s%s%s%s" % tuple(pieces)
项目:scorep_binding_python    作者:score-p    | 项目源码 | 文件源码
def add_exec(file_path):
    """Set the mode of ``file_path`` to rwxr-xr-x.

    The mode is replaced, not merged: user gets read/write/execute, group and
    others get read/execute. A short progress message is printed first.
    """
    print("change permissions")
    # build the mode from the stat constants: full access for the owner ...
    owner_bits = stat.S_IRWXU
    # ... and read + execute for group and others
    shared_bits = stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
    os.chmod(file_path, owner_bits | shared_bits)

# global vars are very bad but I think the only solution for this
项目:syntaxnet_wrapper    作者:livingbio    | 项目源码 | 文件源码
def make_pidfile(self):
        """Write this object's process id to ``<PIDFILE_PATH>/<name>.pid``.

        The pid directory is created when missing. Before writing, every
        existing file named like ``<pid>_<model>_<ClassName>.pid`` whose model
        and class match this object is passed to ``self.kill_process``. The new
        pid file is made readable and writable by user, group and other.
        """
        if not os.path.isdir(PIDFILE_PATH):
            os.mkdir(PIDFILE_PATH)
        pidfilename = os.path.join(PIDFILE_PATH, "{}.pid".format(self.name))

        own_suffix = self.__class__.__name__ + '.pid'
        for entry in os.listdir(PIDFILE_PATH):
            # only names with exactly two underscores and a .pid suffix qualify
            if entry.endswith('.pid') and entry.count('_') == 2:
                _, model, clsname = entry.split('_')
                if clsname == own_suffix and model == self.model_name:
                    self.kill_process(entry)

        with open(pidfilename, 'w+') as handle:
            handle.write(str(self.process.pid))
        everyone_rw = (stat.S_IRUSR | stat.S_IWUSR
                       | stat.S_IRGRP | stat.S_IWGRP
                       | stat.S_IROTH | stat.S_IWOTH)
        os.chmod(pidfilename, everyone_rw)
项目:anarel-manage    作者:slaclab    | 项目源码 | 文件源码
def checkFixPermissionsForPath(pth):
    """Make ``pth`` readable by group and other if it is not already.

    Missing read bits are added to the current mode; when anything had to be
    added, a message naming the missing permissions is printed and the mode
    is applied with ``os.chmod``. Nothing happens when both bits are set.
    """
    mode = os.stat(pth).st_mode
    required = (
        (stat.S_IRGRP, ' not group readable'),
        (stat.S_IROTH, ' not other readable'),
    )
    problems = []
    for bit, complaint in required:
        if not mode & bit:
            problems.append(complaint)
            mode |= bit
    if problems:
        print("%s, fixing permissions for %s" % (''.join(problems), pth))
        os.chmod(pth, mode)
项目:labgrid    作者:labgrid-project    | 项目源码 | 文件源码
def test_consoleloggingreporter_dir_not_writeable(consolelogger, serial_driver, tmpdir):
    """Read from the serial driver while ``tmpdir`` has only read bits set.

    There is no explicit assertion: the test passes when the read completes
    without raising.
    """
    def return_test(self, size=1, timeout=0.0):
        return b"test"

    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(str(tmpdir), read_only)

    # stub the underlying serial port so four bytes appear to be waiting
    serial_driver.serial.in_waiting = 4
    serial_driver.serial.read = return_test
    serial_driver.read()
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def is_executable_file(path):
    """Checks that path is an executable regular file (or a symlink to a file).

    This is roughly ``os.path.isfile(path) and os.access(path, os.X_OK)``, but
    on some platforms :func:`os.access` gives us the wrong answer, so this
    checks permission bits directly.

    Returns True when the resolved file is readable and executable by
    `other', or by `group' when its group is one of the current process's
    groups, or by `user' when its owner is the effective user id; False
    otherwise (including for non-files and missing paths).
    """
    # follow symlinks,
    fpath = os.path.realpath(path)

    # return False for non-files (directories, fifo, etc.)
    if not os.path.isfile(fpath):
        return False

    # On Solaris, etc., "If the process has appropriate privileges, an
    # implementation may indicate success for X_OK even if none of the
    # execute file permission bits are set."
    #
    # For this reason, it is necessary to explicitly check st_mode

    # stat once so mode, group and owner all come from the same snapshot
    file_stat = os.stat(fpath)
    mode = file_stat.st_mode

    # check if `other', that is anybody, may read and execute.
    if mode & stat.S_IROTH and mode & stat.S_IXOTH:
        return True

    # get current user's group ids, and check if `group',
    # when matching ours, may read and execute.
    user_gids = os.getgroups() + [os.getgid()]
    if (file_stat.st_gid in user_gids and
            mode & stat.S_IRGRP and mode & stat.S_IXGRP):
        return True

    # finally, if file owner matches our effective userid,
    # check if `user', may read and execute.
    if (file_stat.st_uid == os.geteuid() and
            mode & stat.S_IRUSR and mode & stat.S_IXUSR):
        return True

    return False
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def test_set_config_value_file_permissions(self):
        """The config file written by set_global_config_value is owner read/write only.

        Checks each of the nine user/group/other permission bits individually.
        """
        granted = (stat.S_IRUSR, stat.S_IWUSR)
        denied = (stat.S_IXUSR,
                  stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
                  stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH)
        with mock.patch('azure.cli.core._config.GLOBAL_CONFIG_DIR', self.config_dir), \
                mock.patch('azure.cli.core._config.GLOBAL_CONFIG_PATH', self.config_path):
            set_global_config_value('test_section', 'test_option', 'a_value')
            file_mode = os.stat(self.config_path).st_mode
            for flag in granted:
                self.assertTrue(bool(file_mode & flag))
            for flag in denied:
                self.assertFalse(bool(file_mode & flag))
项目:l3overlay    作者:catalyst    | 项目源码 | 文件源码
def __init__(self, log, log_level, logger_name, logger_section=None):
        '''
        Set up the logger runtime state.

        Stores the arguments, derives the logger's display name and log line
        format (both include the section when one is given), and creates the
        handler: a file handler on ``log`` (file mode set to rw-r--r--) when
        ``log`` is truthy, otherwise a null handler.
        '''

        super().__init__()

        self.log_file = log
        self.log_level = log_level
        self.logger_name = logger_name
        self.logger_section = logger_section

        if logger_section:
            self.name = "%s-%s" % (logger_name, logger_section)
            logf = "%(asctime)s {0}: <{1}> [%(levelname)s] %(message)s".format(
                logger_name,
                logger_section,
            )
        else:
            self.name = logger_name
            logf = "%(asctime)s %(name)s: [%(levelname)s] %(message)s"
        self.description = "logger '%s'" % self.name

        self.logger_formatter = logging.Formatter(logf)

        if not self.log_file:
            self.logger_handler = logging.NullHandler()
        else:
            util.directory_create(os.path.dirname(self.log_file))
            self.logger_handler = logging.FileHandler(self.log_file)
            log_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
            os.chmod(self.log_file, log_mode)

        self.logger_handler.setFormatter(self.logger_formatter)

        # Initialised in start().
        self.logger = None
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def _rename_file(src, dst):
    """Rename ``src`` to ``dst`` and make the result readable by everyone.

    On Windows (``os.name == 'nt'``) ``dst`` is removed first. After the
    rename, read bits for user, group and other are added to the file's mode,
    plus the matching execute bits when ``_is_executable(dst)`` is true.
    """
    on_windows = os.name == 'nt'
    if on_windows:
        # TODO: check that fs.rm_safe works on windows, and if not, fix
        #       fs.rm_safe.
        fs.rm_safe(dst)
    os.rename(src, dst)

    read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    mode = os.stat(dst).st_mode | read_bits
    if _is_executable(dst):
        mode = mode | exec_bits
    os.chmod(dst, mode)