Python socket 模块,IPV6_JOIN_GROUP 实例源码

我们从 Python 开源项目中提取了以下 4 个代码示例,用于说明如何使用 socket.IPV6_JOIN_GROUP。

项目:vagus    作者:privacore    | 项目源码 | 文件源码
def setup_ipv6_multicast_socket(ifaddrs, if_name, addr):
    """Create a non-blocking IPv6 UDP socket joined to multicast group *addr*.

    The socket is bound to ``Config.udp_multicast.port`` on the wildcard
    address, gets its multicast hop limit from ``Config.udp_multicast.ttl``,
    has multicast loopback disabled, and is finally appended to
    ``multicast_socket_ipv6`` as a ``(socket, addr)`` tuple.

    :param ifaddrs: Currently unused.
    :param if_name: Currently unused (see the todo below).
    :param addr: Textual IPv6 multicast group address to join.
    :return: Always ``True``; failures are reported as exceptions.
    :raises socket.error: If *addr* is not a valid IPv6 address or any
        socket operation fails. The socket is closed before re-raising.
    """
    #todo: if_name ignored
    s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("", Config.udp_multicast.port))

        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, Config.udp_multicast.ttl)
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 0)

        # struct ipv6_mreq is the 16-byte group address followed by an
        # unsigned int interface index; index 0 lets the OS pick the
        # interface. Packing integers (instead of a text string of NULs)
        # keeps this working on both Python 2 and Python 3.
        group_bin = socket.inet_pton(socket.AF_INET6, addr)
        mreq = struct.pack("@16sI", group_bin, 0)

        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

        s.setblocking(0)
    except Exception:
        # Do not leak the file descriptor when setup fails half-way.
        s.close()
        raise

    multicast_socket_ipv6.append((s, addr))

    return True
项目:maas    作者:maas    | 项目源码 | 文件源码
def join_ipv6_beacon_group(sock, ifindex):
    """Add the given UDP socket to the MAAS IPv6 beacon multicast group.

    The join is best-effort: an ``OSError`` raised by the kernel is
    swallowed, so callers get no indication of whether it succeeded.

    :param sock: An opened IPv6 UDP socket.
    :param ifindex: Index of the interface that should join the group.
    """
    # XXX mpontillo 2017-06-21: Twisted doesn't support IPv6 here yet.
    # It would be nice to do this:
    # transport.joinGroup(BEACON_IPV6_MULTICAST)
    group_address = socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST)
    interface_index = struct.pack("I", ifindex)
    # The option value is the packed group address immediately followed by
    # the interface index.
    membership_request = group_address + interface_index
    try:
        sock.setsockopt(
            socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, membership_request)
    except OSError:
        # Deliberately ignored. The kernel may report "Address already in
        # use" when the group was joined earlier, or the interface may not
        # be able to join a multicast group at all.
        pass
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def receiver(group):
    """Join multicast *group* and print every datagram received, forever.

    The group is resolved with ``getaddrinfo`` to discover whether it is an
    IPv4 or IPv6 address; a UDP socket of that family is bound to ``MYPORT``
    on the wildcard address and joined to the group. Each received datagram
    is printed as ``<sender>  <repr(payload)>`` with trailing NUL bytes
    removed from the payload.

    This function never returns normally; the socket is closed when the
    loop is left through an exception (e.g. ``KeyboardInterrupt``).

    :param group: Multicast group address (IPv4 or IPv6) as a string.
    """
    # Look up multicast group address in name server and find out IP version
    addrinfo = socket.getaddrinfo(group, None)[0]

    # Create a socket
    s = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
    try:
        # Allow multiple copies of this program on one machine
        # (not strictly needed)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Bind it to the port
        s.bind(('', MYPORT))

        group_bin = socket.inet_pton(addrinfo[0], addrinfo[4][0])
        # Join group
        if addrinfo[0] == socket.AF_INET: # IPv4
            mreq = group_bin + struct.pack('=I', socket.INADDR_ANY)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        else:
            mreq = group_bin + struct.pack('@I', 0)
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

        # Loop, printing any data we receive
        while True:
            data, sender = s.recvfrom(1500)
            # Strip trailing \0's. recvfrom() returns bytes, so the pattern
            # must be a bytes literal: comparing against the text string
            # '\0' never matches on Python 3.
            data = data.rstrip(b'\0')
            print (str(sender) + '  ' + repr(data))
    finally:
        # Release the descriptor when the loop is interrupted.
        s.close()
项目:multicastclient    作者:MickMack1983    | 项目源码 | 文件源码
def __init__(self, clientId, port, addr):
        """Set up the multicast UDP socket and the client's bookkeeping state.

        Resolves ``addr`` with ``getaddrinfo``, creates a UDP socket of the
        resolved address family, binds it to ``port`` on the wildcard
        address and joins the multicast group ``addr``. Afterwards the
        containers and locks used by the client are initialised.

        :param clientId: Identifier stored on the instance as ``clientId``.
        :param port: UDP port the socket is bound to.
        :param addr: Multicast group address to join.
        """
        self.PORT = port  # e.g. 26000
        self.ADDR = addr  # e.g. 'ff01::1' (IPv6 multicast address)
        self.clientId = clientId
        self.closing = False

        # Only the first getaddrinfo() result is used; it supplies both the
        # address family and the normalised group address.
        addrInfo = socket.getaddrinfo(self.ADDR, None)[0]
        self.socket = socket.socket(addrInfo[0], socket.SOCK_DGRAM)
        # NOTE(review): IPPROTO_IPV6 is referenced without the ``socket.``
        # prefix here and below, so it must be imported by name elsewhere
        # in the module - confirm.
        # Hop limit for outgoing multicast packets is set to 1.
        self.socket.setsockopt(IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(('', self.PORT))

        # Join the multicast group: the request is the packed group address
        # followed by an unsigned int interface index (0 here).
        group = socket.inet_pton(addrInfo[0], addrInfo[4][0])
        mreq = group + struct.pack('@I', 0)
        self.socket.setsockopt(IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

        self.__registeredBusInterfaces = dict()
        self.subPatterns = dict()
        self.subPatternLock = Lock()
        self.requestQueues = dict()
        self.inboxLock = Lock()
        self.inbox = []
        # On non-Windows platforms, create a pipe and wrap both ends as file
        # objects: sigKill is the write end, isKilled the read end.
        # Presumably used to signal shutdown - verify against the rest of
        # the class.
        if not os.name == 'nt':
            r, w = os.pipe()
            self.sigKill = os.fdopen(w, 'w')
            self.isKilled = os.fdopen(r, 'r')