我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用socket.getsockname()。
def findFreePort(interface='127.0.0.1', family=socket.AF_INET, type=socket.SOCK_STREAM): """ Ask the platform to allocate a free port on the specified interface, then release the socket and return the address which was allocated. @param interface: The local address to try to bind the port on. @type interface: C{str} @param type: The socket type which will use the resulting port. @return: A two-tuple of address and port, like that returned by L{socket.getsockname}. """ addr = socket.getaddrinfo(interface, 0)[0][4] probe = socket.socket(family, type) try: probe.bind(addr) return probe.getsockname() finally: probe.close()
def __init__(self, socket, proto): self.host = socket.getsockname() self.peer = socket.getpeername() self.protocall = proto self.recevied_time = time.time()
def get_port(socket): """Return the port to which a socket is bound.""" addr, port = socket.getsockname() return port
def handle_request(self, socket, address): ip, port = socket.getsockname() plugin = self.message_mapping.get(port) if plugin: session = EasySession(socket) plugin.handle_request(session) session.close() # ??unix_socket??
def handle_uds_request(self, socket, address): name = socket.getsockname() plugin = self.message_mapping.get(name) if plugin: session = EasySession(socket) plugin.handle_request(session) session.close() # ??????
def do_Send(): global currentState, user inputData = userentry.get() if len(inputData.strip(' ')) == 0: print('Detect nothing to send!\n') return CmdWin.insert(1.0, "\nPress Send") # check stage stateLock.acquire() checkFlag = currentState.isAfter(States['NAMED']) stateLock.release() if not checkFlag: CmdWin.insert(1.0, "\nSend Error: You are not in any chatroom, please join a chatroom first!") return # check for all back and forward link sendingList = [] stateLock.acquire() forwardLinkTuple = currentState._getforwardlink() if forwardLinkTuple is not None: forwardLink = forwardLinkTuple[1] sendingList.append(forwardLink) backwardLinkTupleList = currentState._getbackwardlinks() backwardLinks = [i[1] for i in backwardLinkTupleList] roomName = currentState._getroomname() stateLock.release() if len(backwardLinks) > 0 : sendingList = sendingList + backwardLinks # get all infos desired by sending Text message userInfoLock.acquire() userName = user._getname() userIp = user._getip() userPort = user._getport() # update msgID msgID = currentState.newMsgID() userInfoLock.release() # construct the protocal message originHID = sdbm_hash(userName+userIp+str(userPort)) message = [roomName, str(originHID), userName, str(msgID), str(len(inputData)), inputData] requestMessage = 'T:' + ':'.join(message) + PROTOCAL_END print("\nMain Thread: perform the sending process, dispatch data to other peers\n") print('Message:', requestMessage) for socket in sendingList: output = socketOperation(socket, requestMessage, receive = False) if output == Exceptions['SOCKET_ERROR']: print('Send Error: cannot sent the message to', socket.getsockname()) MsgWin.insert(1.0, '\n['+userName+']: '+inputData) # clear the entry if success userentry.delete(0, END)
def do_Send(): global currentState, user print("\nPress Send") # check stage stateLock.acquire() checkFlag = currentState.isAfter(States['NAMED']) stateLock.release() if not checkFlag: print("\nSend Error: You are not in any chatroom, please join a chatroom first!") userentry.delete(0, END) return inputData = userentry.get() if len(inputData.strip(' ')) == 0: print("\nSend Error: Invalid message!") return # check for all back and forward link sendingList = [] stateLock.acquire() forwardLinkTuple = currentState._getforwardlink() if forwardLinkTuple is not None: forwardLink = forwardLinkTuple[1] sendingList.append(forwardLink) backwardLinkTupleList = currentState._getbackwardlinks() backwardLinks = [i[1] for i in backwardLinkTupleList] roomName = currentState._getroomname() stateLock.release() if len(backwardLinks) > 0 : sendingList = sendingList + backwardLinks # get all infos desired by sending Textmessage print("locked at 1067") userInfoLock.acquire() userName = user._getname() userIp = user._getip() userPort = user._getport() # update msgID msgID = currentState.newMsgID() userInfoLock.release() print("1067 released") # construct the protocal message originHID = sdbm_hash(userName+userIp+str(userPort)) message = [roomName, str(originHID), userName, str(msgID), str(len(inputData)), inputData] requestMessage = 'T:' + ':'.join(message) + PROTOCAL_END print("\nMain Thread: perform the sending process, dispatch data to other peers\n") print('Message:', requestMessage) for socket in sendingList: output = socketOperation(socket, requestMessage, receive = False) if output == Exceptions['SOCKET_ERROR']: print('Send Error: cannot sent the message to', socket.getsockname()) print('\n['+userName+']: '+inputData) # clear the entry if success userentry.delete(0, END)