Python socket 模块,getsockname() 实例源码

我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用socket.getsockname()

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def findFreePort(interface='127.0.0.1', family=socket.AF_INET,
                 type=socket.SOCK_STREAM):
    """
    Ask the platform to allocate a free port on the specified interface, then
    release the socket and return the address which was allocated.

    @param interface: The local address to try to bind the port on.
    @type interface: C{str}

    @param type: The socket type which will use the resulting port.

    @return: A two-tuple of address and port, like that returned by
        L{socket.getsockname}.
    """
    addr = socket.getaddrinfo(interface, 0)[0][4]
    probe = socket.socket(family, type)
    try:
        probe.bind(addr)
        return probe.getsockname()
    finally:
        probe.close()
项目:PyCIP    作者:cpchrispye    | 项目源码 | 文件源码
def __init__(self, socket, proto):
        self.host = socket.getsockname()
        self.peer = socket.getpeername()
        self.protocall = proto
        self.recevied_time = time.time()
项目:systemfixtures    作者:testing-cabal    | 项目源码 | 文件源码
def get_port(socket):
    """Return the port to which a socket is bound."""
    addr, port = socket.getsockname()
    return port
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def handle_request(self, socket, address):
        ip, port = socket.getsockname()

        plugin = self.message_mapping.get(port)
        if plugin:
            session = EasySession(socket)
            plugin.handle_request(session)
            session.close()

    # ??unix_socket??
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def handle_uds_request(self, socket, address):
        name = socket.getsockname()

        plugin = self.message_mapping.get(name)
        if plugin:
            session = EasySession(socket)
            plugin.handle_request(session)
            session.close()

    # ??????
项目:maas    作者:maas    | 项目源码 | 文件源码
def get_port(socket):
    """Return the port to which a socket is bound."""
    addr, port = socket.getsockname()
    return port
项目:P2PChat    作者:whcacademy    | 项目源码 | 文件源码
def do_Send():
    global currentState, user
    inputData = userentry.get()
    if len(inputData.strip(' ')) == 0:
        print('Detect nothing to send!\n')
        return
    CmdWin.insert(1.0, "\nPress Send")
    # check stage
    stateLock.acquire()
    checkFlag = currentState.isAfter(States['NAMED'])
    stateLock.release()
    if not checkFlag:
        CmdWin.insert(1.0, "\nSend Error: You are not in any chatroom, please join a chatroom first!")
        return

    # check for all back and forward link
    sendingList = []
    stateLock.acquire()
    forwardLinkTuple = currentState._getforwardlink()
    if forwardLinkTuple is not None:
        forwardLink = forwardLinkTuple[1] 
        sendingList.append(forwardLink)
    backwardLinkTupleList = currentState._getbackwardlinks()
    backwardLinks = [i[1] for i in backwardLinkTupleList]
    roomName = currentState._getroomname()
    stateLock.release()
    if len(backwardLinks) > 0 :
        sendingList = sendingList + backwardLinks
    # get all infos desired by sending Text message
    userInfoLock.acquire()
    userName = user._getname()
    userIp = user._getip()
    userPort = user._getport()
    # update msgID
    msgID = currentState.newMsgID()
    userInfoLock.release()

    # construct the protocal message
    originHID = sdbm_hash(userName+userIp+str(userPort))
    message = [roomName, str(originHID), userName, str(msgID), str(len(inputData)), inputData]
    requestMessage = 'T:' + ':'.join(message) + PROTOCAL_END
    print("\nMain Thread: perform the sending process, dispatch data to other peers\n")

    print('Message:', requestMessage)
    for socket in sendingList:
        output = socketOperation(socket, requestMessage, receive = False)
        if output == Exceptions['SOCKET_ERROR']:
            print('Send Error: cannot sent the message to', socket.getsockname())
    MsgWin.insert(1.0, '\n['+userName+']: '+inputData)

    # clear the entry if success
    userentry.delete(0, END)
项目:P2PChat    作者:whcacademy    | 项目源码 | 文件源码
def do_Send():
    global currentState, user
    print("\nPress Send")
    # check stage
    stateLock.acquire()
    checkFlag = currentState.isAfter(States['NAMED'])
    stateLock.release()
    if not checkFlag:
        print("\nSend Error: You are not in any chatroom, please join a chatroom first!")
        userentry.delete(0, END)
        return
    inputData = userentry.get()
    if len(inputData.strip(' ')) == 0:
        print("\nSend Error: Invalid message!")
        return
    # check for all back and forward link
    sendingList = []
    stateLock.acquire()
    forwardLinkTuple = currentState._getforwardlink()
    if forwardLinkTuple is not None:
        forwardLink = forwardLinkTuple[1] 
        sendingList.append(forwardLink)
    backwardLinkTupleList = currentState._getbackwardlinks()
    backwardLinks = [i[1] for i in backwardLinkTupleList]
    roomName = currentState._getroomname()
    stateLock.release()
    if len(backwardLinks) > 0 :
        sendingList = sendingList + backwardLinks
    # get all infos desired by sending Textmessage
    print("locked at 1067")
    userInfoLock.acquire()
    userName = user._getname()
    userIp = user._getip()
    userPort = user._getport()
    # update msgID
    msgID = currentState.newMsgID()
    userInfoLock.release()
    print("1067 released")

    # construct the protocal message
    originHID = sdbm_hash(userName+userIp+str(userPort))
    message = [roomName, str(originHID), userName, str(msgID), str(len(inputData)), inputData]
    requestMessage = 'T:' + ':'.join(message) + PROTOCAL_END
    print("\nMain Thread: perform the sending process, dispatch data to other peers\n")

    print('Message:', requestMessage)
    for socket in sendingList:
        output = socketOperation(socket, requestMessage, receive = False)
        if output == Exceptions['SOCKET_ERROR']:
            print('Send Error: cannot sent the message to', socket.getsockname())
    print('\n['+userName+']: '+inputData)

    # clear the entry if success
    userentry.delete(0, END)