我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用stat.S_ISSOCK。
def __init__(self, loop, pipe, protocol, waiter=None, extra=None): super().__init__(extra) self._extra['pipe'] = pipe self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() mode = os.fstat(self._fileno).st_mode if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called self._loop.call_soon(self._loop.add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def _check_systemd(self): if self._user: systemd_socket = '/run/user/{}/systemd/private'.format(self._uid) else: systemd_socket = '/run/systemd/private' try: if not os.path.exists(systemd_socket) and stat.S_ISSOCK(os.stat(systemd_socket).st_mode): self._systemd_error = 'No systemd socket found' return except OSError as e: self._systemd_error = str(e) return self._is_systemd = self._check_systemd_reval([]) if not self._is_systemd: self._systemd_error = 'Systemd is not available or not working properly'
def is_special_file(path): """ This function checks to see if a special file. It checks if the file is a character special device, block special device, FIFO, or socket. """ mode = os.stat(path).st_mode # Character special device. if stat.S_ISCHR(mode): return True # Block special device if stat.S_ISBLK(mode): return True # FIFO. if stat.S_ISFIFO(mode): return True # Socket. if stat.S_ISSOCK(mode): return True return False
def __init__(self, addr, conf, log, fd=None): if fd is None: try: st = os.stat(addr) except OSError as e: if e.args[0] != errno.ENOENT: raise else: if stat.S_ISSOCK(st.st_mode): os.remove(addr) else: raise ValueError("%r is not a socket" % addr) self.parent = os.getpid() super(UnixSocket, self).__init__(addr, conf, log, fd=fd) # each arbiter grabs a shared lock on the unix socket. fcntl.lockf(self.sock, fcntl.LOCK_SH | fcntl.LOCK_NB)
def __init__(self, loop, pipe, protocol, waiter=None, extra=None): super().__init__(extra) self._extra['pipe'] = pipe self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() mode = os.fstat(self._fileno).st_mode if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called self._loop.call_soon(self._loop.add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None)
def test_pathServer(self): """ The I{--path} option to L{makeService} causes it to return a service which will listen on the server address given by the I{--port} option. """ path = FilePath(self.mktemp()) path.makedirs() port = self.mktemp() options = Options() options.parseOptions(['--port', 'unix:' + port, '--path', path.path]) service = makeService(options) service.startService() self.addCleanup(service.stopService) self.assertIsInstance(service.services[0].factory.resource, File) self.assertEqual(service.services[0].factory.resource.path, path.path) self.assertTrue(os.path.exists(port)) self.assertTrue(stat.S_ISSOCK(os.stat(port).st_mode))
def __init__(self, loop, pipe, protocol, waiter=None, extra=None): super().__init__(extra) self._extra['pipe'] = pipe self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() mode = os.fstat(self._fileno).st_mode if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False self._loop.add_reader(self._fileno, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) if waiter is not None: # wait until protocol.connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def _ftypelet(mode): if stat.S_ISREG(mode) or not stat.S_IFMT(mode): return '-' if stat.S_ISBLK(mode): return 'b' if stat.S_ISCHR(mode): return 'c' if stat.S_ISDIR(mode): return 'd' if stat.S_ISFIFO(mode): return 'p' if stat.S_ISLNK(mode): return 'l' if stat.S_ISSOCK(mode): return 's' return '?'
def bind_unix_socket(file, mode=0o600, backlog=_DEFAULT_BACKLOG): """Creates a listening unix socket. If a socket with the given name already exists, it will be deleted. If any other file with that name exists, an exception will be raised. Returns a socket object (not a list of socket objects like `bind_sockets`) """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) set_close_exec(sock.fileno()) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) try: st = os.stat(file) except OSError as err: if errno_from_exception(err) != errno.ENOENT: raise else: if stat.S_ISSOCK(st.st_mode): os.remove(file) else: raise ValueError("File %s exists and is not a socket", file) sock.bind(file) os.chmod(file, mode) sock.listen(backlog) return sock
def supervisor_is_running(): import stat try: return stat.S_ISSOCK(os.stat('/tmp/vpnsupervisor.sock').st_mode) except: return False
def __init__(self, loop, pipe, protocol, waiter=None, extra=None): super().__init__(extra, loop) self._extra['pipe'] = pipe self._pipe = pipe self._fileno = pipe.fileno() mode = os.fstat(self._fileno).st_mode is_socket = stat.S_ISSOCK(mode) if not (is_socket or stat.S_ISFIFO(mode) or stat.S_ISCHR(mode)): raise ValueError("Pipe transport is only for " "pipes, sockets and character devices") _set_nonblocking(self._fileno) self._protocol = protocol self._buffer = [] self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. self._loop.call_soon(self._protocol.connection_made, self) # On AIX, the reader trick (to be notified when the read end of the # socket is closed) only works for sockets. On other platforms it # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) if is_socket or not sys.platform.startswith("aix"): # only start reading when connection_made() has been called self._loop.call_soon(self._loop.add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def is_special_file(cls, filename): """Checks to see if a file is a special UNIX file. It checks if the file is a character special device, block special device, FIFO, or socket. :param filename: Name of the file :returns: True if the file is a special file. False, if is not. """ # If it does not exist, it must be a new file so it cannot be # a special file. if not os.path.exists(filename): return False mode = os.stat(filename).st_mode # Character special device. if stat.S_ISCHR(mode): return True # Block special device if stat.S_ISBLK(mode): return True # Named pipe / FIFO if stat.S_ISFIFO(mode): return True # Socket. if stat.S_ISSOCK(mode): return True return False
def interfaces(self): """Get the wifi interface lists.""" ifaces = [] for f in os.listdir(CTRL_IFACE_DIR): sock_file = '/'.join([CTRL_IFACE_DIR, f]) mode = os.stat(sock_file).st_mode if stat.S_ISSOCK(mode): iface = {} iface['name'] = f ifaces.append(iface) self._connect_to_wpa_s(f) return ifaces
def _remove_existed_sock(self, sock_file): if os.path.exists(sock_file): mode = os.stat(sock_file).st_mode if stat.S_ISSOCK(mode): os.remove(sock_file)
def authorization_proxy(options): auth_sock = start_agent() # Remove auth sock if is a socket try: stat_info = os.stat(options.socket) if stat.S_ISSOCK(stat_info.st_mode): os.remove(options.socket) else: LOG.error("auth socket %s exist", options.socket) except OSError: pass # Open socket with permissive permissions, socket permission enforcement # isn't consistant across operating systems, so administrator should # enforce permissions on enclosing directory saved_umask = os.umask(0) server = AuthorizationProxy(options.socket, options.database, auth_sock) os.umask(saved_umask) # Remove listening socket at exit atexit.register(lambda: os.remove(options.socket)) def sighup_handler(signo, frame): server.authorizer.reload() signal.signal(signal.SIGHUP, sighup_handler) server.serve_forever()
def __init__(self, addr, conf, log, fd=None): if fd is None: try: st = os.stat(addr) except OSError as e: if e.args[0] != errno.ENOENT: raise else: if stat.S_ISSOCK(st.st_mode): os.remove(addr) else: raise ValueError("%r is not a socket" % addr) self.parent = os.getpid() super(UnixSocket, self).__init__(addr, conf, log, fd=fd)
def createClient(): myclient = None # Find docker-machine environment variables docker_host = os.getenv("DOCKER_HOST") docker_cert_path = os.getenv("DOCKER_CERT_PATH") docker_machine_name = os.getenv("DOCKER_MACHINE_NAME") # Look for linux docker socket file path = "/var/run/docker.sock" isSocket = False if os.path.exists(path): mode = os.stat(path).st_mode isSocket = stat.S_ISSOCK(mode) if isSocket: docker_socket_file ="unix://"+path myclient = client.scClient(base_url=docker_socket_file, version="auto") return myclient elif (docker_host and docker_cert_path and docker_machine_name): tls_config = tls.TLSConfig( client_cert=(os.path.join(docker_cert_path, 'cert.pem'), os.path.join(docker_cert_path,'key.pem')), ca_cert=os.path.join(docker_cert_path, 'ca.pem'), verify=True, assert_hostname = False ) docker_host_https = docker_host.replace("tcp","https") myclient = client.scClient(base_url=docker_host_https, tls=tls_config, version="auto") return myclient # If we fall through, myclient is set to none and we should fail. return myclient
def _init_socket(self): sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_STREAM) if (not os.path.exists(self._socket_path) or not stat.S_ISSOCK(os.stat(self._socket_path).st_mode)): logger.error("File %s doesn't exist or isn't a socket. Is Parsec Core running?" % self._socket_path) sys.exit(1) sock.connect(self._socket_path) logger.debug('Init socket') self._socket = sock
def access_log_dest(self): """ Return the destination for access logs If we're running this as a systemd service, systemd makes /dev/stdout and friends be sockets that lead into journald, rather than files. This confuses nginx, which expects to `open()` them like normal files. nginx supports 'stderr' as a directive for error_log, but *not* access log. Since we still want to get them out into stderr, we hack this in. If /dev/stdout is a socket, we then write to syslog. JournalD also picks up syslog, so the end result is the same from an admin's pov. Just uglier and hackier I guess the 'right' thing here is for nginx to support `stderr` as a target in access_log just like for `error_log` """ path = '/dev/stdout' if stat.S_ISSOCK(os.stat(path).st_mode): return 'syslog:server=unix:/dev/log,nohostname' return path # performance tuning!
def statinfo(st): return { 'mode' : "%04o" % stat.S_IMODE(st.st_mode), 'isdir' : stat.S_ISDIR(st.st_mode), 'ischr' : stat.S_ISCHR(st.st_mode), 'isblk' : stat.S_ISBLK(st.st_mode), 'isreg' : stat.S_ISREG(st.st_mode), 'isfifo' : stat.S_ISFIFO(st.st_mode), 'islnk' : stat.S_ISLNK(st.st_mode), 'issock' : stat.S_ISSOCK(st.st_mode), 'uid' : st.st_uid, 'gid' : st.st_gid, 'size' : st.st_size, 'inode' : st.st_ino, 'dev' : st.st_dev, 'nlink' : st.st_nlink, 'atime' : st.st_atime, 'mtime' : st.st_mtime, 'ctime' : st.st_ctime, 'wusr' : bool(st.st_mode & stat.S_IWUSR), 'rusr' : bool(st.st_mode & stat.S_IRUSR), 'xusr' : bool(st.st_mode & stat.S_IXUSR), 'wgrp' : bool(st.st_mode & stat.S_IWGRP), 'rgrp' : bool(st.st_mode & stat.S_IRGRP), 'xgrp' : bool(st.st_mode & stat.S_IXGRP), 'woth' : bool(st.st_mode & stat.S_IWOTH), 'roth' : bool(st.st_mode & stat.S_IROTH), 'xoth' : bool(st.st_mode & stat.S_IXOTH), 'isuid' : bool(st.st_mode & stat.S_ISUID), 'isgid' : bool(st.st_mode & stat.S_ISGID), }
def __countFiles(self, which=None): # this will count links AND sockets #self.total_files = sum((len(f) for _, _, f in os.walk(self.cwd))) total_files = 0 i = 0 for path, _, filenames in os.walk(self.cwd): for filename in filenames: if i >= len(self.wait_symbols): i = 0 print 'Counting Files.... %s \r' % self.wait_symbols[i], i += 1 filename_path = os.path.join(path, filename) if os.path.islink(filename_path): # TODO: may be we should use 'and' instead of 'or' ?? if self.cwd not in os.path.realpath(filename_path) or not self.args.symlinks: continue if os.path.exists(filename_path) and os.access(filename_path, os.R_OK) and not S_ISSOCK(os.stat(filename_path).st_mode): total_files += 1 if which == 'dst': print 'Counting DST Files.....Done \r' elif which == 'src': print 'Counting SRC Files.....Done \r' else: print 'Counting Files.........Done \r' return total_files # ------------------------------------------------------------------
def __init__(self, addr, conf, log, fd=None): if fd is None: try: st = os.stat(addr) except OSError as e: if e.args[0] != errno.ENOENT: raise else: if stat.S_ISSOCK(st.st_mode): os.remove(addr) else: raise ValueError("%r is not a socket" % addr) super(UnixSocket, self).__init__(addr, conf, log, fd=fd)
def is_socket(self): """ Whether this path is a socket. """ try: return S_ISSOCK(self.stat().st_mode) except OSError as e: if e.errno != ENOENT: raise # Path doesn't exist or is a broken symlink # (see https://bitbucket.org/pitrou/pathlib/issue/12/) return False
def is_unix_socket(path): """ Check if path is a valid UNIX socket. Arguments: path (str): A file name path Returns: True if path is a valid UNIX socket otherwise False. """ mode = os.stat(path).st_mode return stat.S_ISSOCK(mode)
def bind_unix_socket(file, mode=0600, backlog=128): """Creates a listening unix socket. If a socket with the given name already exists, it will be deleted. If any other file with that name exists, an exception will be raised. Returns a socket object (not a list of socket objects like `bind_sockets`) """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) set_close_exec(sock.fileno()) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) try: st = os.stat(file) except OSError, err: if err.errno != errno.ENOENT: raise else: if stat.S_ISSOCK(st.st_mode): os.remove(file) else: raise ValueError("File %s exists and is not a socket", file) sock.bind(file) os.chmod(file, mode) sock.listen(backlog) return sock
def __init__(self, loop, pipe, protocol, waiter=None, extra=None): super().__init__(extra) self._extra['pipe'] = pipe self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() self._protocol = protocol self._closing = False mode = os.fstat(self._fileno).st_mode if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): self._pipe = None self._fileno = None self._protocol = None raise ValueError("Pipe transport is for pipes/sockets only.") _set_nonblocking(self._fileno) self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None)
def __init__(self, loop, pipe, protocol, waiter=None, extra=None): super().__init__(extra, loop) self._extra['pipe'] = pipe self._pipe = pipe self._fileno = pipe.fileno() self._protocol = protocol self._buffer = bytearray() self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. mode = os.fstat(self._fileno).st_mode is_char = stat.S_ISCHR(mode) is_fifo = stat.S_ISFIFO(mode) is_socket = stat.S_ISSOCK(mode) if not (is_char or is_fifo or is_socket): self._pipe = None self._fileno = None self._protocol = None raise ValueError("Pipe transport is only for " "pipes, sockets and character devices") _set_nonblocking(self._fileno) self._loop.call_soon(self._protocol.connection_made, self) # On AIX, the reader trick (to be notified when the read end of the # socket is closed) only works for sockets. On other platforms it # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) if is_socket or (is_fifo and not sys.platform.startswith("aix")): # only start reading when connection_made() has been called self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None)
def bind_unix_socket(file, mode=0o600, backlog=128): """Creates a listening unix socket. If a socket with the given name already exists, it will be deleted. If any other file with that name exists, an exception will be raised. Returns a socket object (not a list of socket objects like `bind_sockets`) """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) set_close_exec(sock.fileno()) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) try: st = os.stat(file) except OSError as err: if err.errno != errno.ENOENT: raise else: if stat.S_ISSOCK(st.st_mode): os.remove(file) else: raise ValueError("File %s exists and is not a socket", file) sock.bind(file) os.chmod(file, mode) sock.listen(backlog) return sock
def __init__(self, loop, pipe, protocol, waiter=None, extra=None): super().__init__(extra, loop) self._extra['pipe'] = pipe self._pipe = pipe self._fileno = pipe.fileno() mode = os.fstat(self._fileno).st_mode is_socket = stat.S_ISSOCK(mode) if not (is_socket or stat.S_ISFIFO(mode) or stat.S_ISCHR(mode)): raise ValueError("Pipe transport is only for " "pipes, sockets and character devices") _set_nonblocking(self._fileno) self._protocol = protocol self._buffer = [] self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. self._loop.call_soon(self._protocol.connection_made, self) # On AIX, the reader trick (to be notified when the read end of the # socket is closed) only works for sockets. On other platforms it # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) if is_socket or not sys.platform.startswith("aix"): # only start reading when connection_made() has been called self._loop.call_soon(self._loop.add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None)
def is_socket(self): """ Whether this path is a socket. """ try: return S_ISSOCK(self.stat().st_mode) except OSError as e: if e.errno not in (ENOENT, ENOTDIR): raise # Path doesn't exist or is a broken symlink # (see https://bitbucket.org/pitrou/pathlib/issue/12/) return False
def set_from_stat(self): """Set the value of self.type, self.mode from self.stat""" if not self.stat: self.type = None st_mode = self.stat.st_mode if stat.S_ISREG(st_mode): self.type = "reg" elif stat.S_ISDIR(st_mode): self.type = "dir" elif stat.S_ISLNK(st_mode): self.type = "sym" elif stat.S_ISFIFO(st_mode): self.type = "fifo" elif stat.S_ISSOCK(st_mode): raise PathException(util.ufn(self.get_relative_path()) + u"is a socket, unsupported by tar") self.type = "sock" elif stat.S_ISCHR(st_mode): self.type = "chr" elif stat.S_ISBLK(st_mode): self.type = "blk" else: raise PathException("Unknown type") self.mode = stat.S_IMODE(st_mode) if self.type in ("chr", "blk"): try: self.devnums = (os.major(self.stat.st_rdev), os.minor(self.stat.st_rdev)) except: log.Warn(_("Warning: %s invalid devnums (0x%X), treating as (0, 0).") % (util.ufn(self.get_relative_path()), self.stat.st_rdev)) self.devnums = (0, 0)