Python socket 模块,SOCK_CLOEXEC 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用socket.SOCK_CLOEXEC

项目:mauzr    作者:eqrx    | 项目源码 | 文件源码
def __init__(self, core):
        """Create the notify socket and register the watchdog/start tasks.

        The target address is read from the ``NOTIFY_SOCKET`` environment
        variable.  A UNIX datagram socket with the close-on-exec flag is
        created, ``self._watchdog`` is registered as a repeating task and
        enabled with ``instant=True``, and ``self._start`` is registered as
        a single-shot task.

        :param core: Object exposing ``scheduler(callback, delay, single=...)``;
            it is also stored as ``self._core``.
        """
        # The variable may be unset (or empty); only inspect the first
        # character when there is one, instead of failing on the subscript.
        # NOTE(review): code using self._address must cope with None when
        # NOTIFY_SOCKET is not set - confirm against _watchdog/_start.
        self._address = os.environ.get('NOTIFY_SOCKET')
        if self._address and self._address[0] == "@":
            # A leading "@" is replaced by a NUL byte (presumably the
            # abstract-namespace spelling of the address - confirm).
            self._address = '\0' + self._address[1:]
        self._socket = socket.socket(socket.AF_UNIX,
                                     socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
        # 5000 is passed as the delay; its unit is defined by core.scheduler.
        self._watchdog_task = core.scheduler(self._watchdog, 5000,
                                             single=False)
        self._watchdog_task.enable(instant=True)
        self._start_task = core.scheduler(self._start, 0, single=True)
        self._core = core
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_SOCK_CLOEXEC(self):
        # Skip on kernels older than 2.6.28, then check that a socket created
        # with SOCK_CLOEXEC reports the flag both in its type and in its
        # descriptor flags.
        kernel = linux_version()
        if kernel < (2, 6, 28):
            self.skipTest("Linux kernel 2.6.28 or higher required, not %s"
                          % ".".join(map(str, kernel)))
        requested_type = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
        with socket.socket(socket.AF_INET, requested_type) as s:
            self.assertTrue(s.type & socket.SOCK_CLOEXEC)
            fd_flags = fcntl.fcntl(s, fcntl.F_GETFD)
            self.assertTrue(fd_flags & fcntl.FD_CLOEXEC)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_SOCK_CLOEXEC(self):
        # A socket created with SOCK_CLOEXEC must report the flag in its type
        # and have FD_CLOEXEC set on the underlying descriptor.
        requested_type = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
        with socket.socket(socket.AF_INET, requested_type) as s:
            self.assertTrue(s.type & socket.SOCK_CLOEXEC)
            fd_flags = fcntl.fcntl(s, fcntl.F_GETFD)
            self.assertTrue(fd_flags & fcntl.FD_CLOEXEC)
项目:rpi-rx5808-stream    作者:xythobuz    | 项目源码 | 文件源码
def notify_socket(clean_environment=True):
    """Look up ``NOTIFY_SOCKET`` and build a datagram socket for it.

    Returns an ``(address, socket)`` tuple, or ``(None, None)`` when the
    variable is missing, empty, a single character long, or does not start
    with ``"@"`` or ``"/"``.  A leading ``"@"`` is replaced by a NUL byte in
    the returned address.

    When ``clean_environment`` is true the variable is removed from
    ``os.environ`` so that child processes do not inherit it.
    """
    # pop() both reads and removes the variable; get() only reads it.
    lookup = os.environ.pop if clean_environment else os.environ.get
    address = lookup("NOTIFY_SOCKET", None)

    if not address or len(address) == 1 or address[0] not in ("@", "/"):
        return None, None

    if address[0] == "@":
        address = "\0" + address[1:]

    # SOCK_CLOEXEC is not available on every Python build/platform; fall
    # back to a plain datagram socket when the constant is missing.
    sock_type = socket.SOCK_DGRAM | getattr(socket, "SOCK_CLOEXEC", 0)
    return address, socket.socket(socket.AF_UNIX, sock_type)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testNewAttributes(self):
        # testing .family, .type and .protocol

        # Use the socket as a context manager so it is closed even when one
        # of the assertions below fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            self.assertEqual(sock.family, socket.AF_INET)
            if hasattr(socket, 'SOCK_CLOEXEC'):
                # The reported type is accepted with or without the
                # SOCK_CLOEXEC bit.
                self.assertIn(sock.type,
                              (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
                               socket.SOCK_STREAM))
            else:
                self.assertEqual(sock.type, socket.SOCK_STREAM)
            self.assertEqual(sock.proto, 0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_SOCK_CLOEXEC(self):
        # A socket created with SOCK_CLOEXEC must report the flag in its type
        # and must not be inheritable.
        requested_type = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
        with socket.socket(socket.AF_INET, requested_type) as s:
            self.assertTrue(s.type & socket.SOCK_CLOEXEC)
            self.assertFalse(s.get_inheritable())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_create_socket(self):
        # create_socket() must produce a socket of the requested family whose
        # type is SOCK_STREAM plus SOCK_NONBLOCK (where defined), optionally
        # with SOCK_CLOEXEC on top.
        dispatcher = asyncore.dispatcher()
        dispatcher.create_socket(self.family)
        self.assertEqual(dispatcher.socket.family, self.family)
        expected_type = socket.SOCK_STREAM | getattr(socket, 'SOCK_NONBLOCK', 0)
        if not hasattr(socket, 'SOCK_CLOEXEC'):
            self.assertEqual(dispatcher.socket.type, expected_type)
        else:
            accepted = (expected_type | socket.SOCK_CLOEXEC, expected_type)
            self.assertIn(dispatcher.socket.type, accepted)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_check_resolved_sock_type(self):
        # Ensure we ignore extra flags in sock.type: run the check once per
        # optional flag that this platform's socket module defines.
        for flag_name in ('SOCK_NONBLOCK', 'SOCK_CLOEXEC'):
            if not hasattr(socket, flag_name):
                continue
            flagged_type = socket.SOCK_STREAM | getattr(socket, flag_name)
            with socket.socket(type=flagged_type) as sock:
                base_events._check_resolved_address(sock, ('1.2.3.4', 1))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _check_resolved_address(sock, address):
    """Raise ValueError unless *address* is a numeric IP address.

    Guards against hanging the entire event loop on a DNS lookup.  Only
    AF_INET and AF_INET6 sockets are checked; for any other family the
    function returns without doing anything.

    :param sock: socket whose ``family``, ``type`` and ``proto`` are used.
    :param address: ``(host, port)`` for AF_INET; for AF_INET6 only the
        first two items are used.
    :raises ValueError: if ``getaddrinfo`` with ``AI_NUMERICHOST`` rejects
        the host.
    """
    is_ipv4 = sock.family == socket.AF_INET
    if not is_ipv4 and sock.family != socket.AF_INET6:
        return
    host, port = address if is_ipv4 else address[:2]

    # Flags that may be OR-ed into sock.type and must be stripped before the
    # type is handed to getaddrinfo().
    extra_flags = 0
    for flag_name in ('SOCK_NONBLOCK', 'SOCK_CLOEXEC'):
        extra_flags |= getattr(socket, flag_name, 0)
    base_type = sock.type & ~extra_flags

    # AI_NUMERICHOST makes getaddrinfo() fail instead of resolving a name.
    try:
        socket.getaddrinfo(host, port,
                           family=sock.family,
                           type=base_type,
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testNewAttributes(self):
        # testing .family, .type and .protocol

        # The with-block guarantees the socket is closed even if an
        # assertion fails part-way through.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            self.assertEqual(sock.family, socket.AF_INET)
            if hasattr(socket, 'SOCK_CLOEXEC'):
                # Either SOCK_STREAM alone or SOCK_STREAM with the
                # SOCK_CLOEXEC bit is accepted as the reported type.
                self.assertIn(sock.type,
                              (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
                               socket.SOCK_STREAM))
            else:
                self.assertEqual(sock.type, socket.SOCK_STREAM)
            self.assertEqual(sock.proto, 0)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_SOCK_CLOEXEC(self):
        # Requesting SOCK_CLOEXEC must show up in the socket's type and make
        # the socket non-inheritable.
        cloexec_stream = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
        with socket.socket(socket.AF_INET, cloexec_stream) as s:
            self.assertTrue(s.type & socket.SOCK_CLOEXEC)
            self.assertFalse(s.get_inheritable())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_create_socket(self):
        # The dispatcher's socket must have the requested family and a type
        # of SOCK_STREAM | SOCK_NONBLOCK (when defined), with SOCK_CLOEXEC
        # allowed but not required.
        disp = asyncore.dispatcher()
        disp.create_socket(self.family)
        self.assertEqual(disp.socket.family, self.family)
        base_type = socket.SOCK_STREAM | getattr(socket, 'SOCK_NONBLOCK', 0)
        if not hasattr(socket, 'SOCK_CLOEXEC'):
            self.assertEqual(disp.socket.type, base_type)
        else:
            allowed_types = (base_type | socket.SOCK_CLOEXEC, base_type)
            self.assertIn(disp.socket.type, allowed_types)
项目:python-ptrace    作者:vstinner    | 项目源码 | 文件源码
def formatSocketType(argument):
    """Return a textual form of a socket type value.

    When the socket module defines SOCK_CLOEXEC, that bit is stripped from
    the value before it is looked up in SOCKET_TYPE, and "|SOCK_CLOEXEC" is
    appended to the result if the bit was set.  Values missing from
    SOCKET_TYPE are rendered with ``str()``.

    :param argument: object whose ``value`` attribute holds the integer
        socket type.
    :return: the formatted text.
    """
    value = argument.value
    cloexec = 0
    if hasattr(socket, 'SOCK_CLOEXEC'):
        cloexec = value & socket.SOCK_CLOEXEC
        value &= ~socket.SOCK_CLOEXEC
    text = SOCKET_TYPE.get(value, str(value))
    if cloexec:
        text += '|SOCK_CLOEXEC'
    return text