我们从 Python 开源项目中提取了以下 6 个代码示例,用于说明如何使用 gevent.socket.SOL_SOCKET。
def _get_local_ip():
    """Return the local IPv4 address the OS would use for multicast traffic.

    A UDP socket is "connected" to the SSDP multicast group; connecting a
    datagram socket only selects a route and a source address, so
    ``getsockname()`` then reports the local address of the chosen interface.

    Returns:
        The local IP address as a string.

    Raises:
        socket.error: if the socket cannot be created or connected
            (for example when no route is available).
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # not using <broadcast> because gevents getaddrinfo doesn't like that
        # using port 1 as per hobbldygoop's comment about port 0 not working on osx:
        # https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
        s.connect(('239.255.255.250', 1))
        return s.getsockname()[0]
    finally:
        # Always release the descriptor; the original leaked one per call.
        s.close()
def _listen(self):
    """Start accepting slave connections and return a round-robin dispatcher.

    Binds a TCP listening socket on ``(self.host, self.port)`` and spawns a
    greenlet that accepts connections forever. Every accepted connection
    gets its own greenlet that feeds objects read with ``_recv_obj`` into
    ``self.event_queue``.

    Returns:
        A callable ``dispatch_command(cmd)`` that sends ``cmd`` with
        ``_send_obj`` to the next connected slave in round-robin order.
        It raises ``IndexError`` when no slave is connected.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((self.host, self.port))
    sock.listen(256)

    self.slave_index = 0
    slaves = []

    def dispatch_command(cmd):
        # Send to the current slave, then advance and wrap the cursor.
        _send_obj(slaves[self.slave_index], cmd)
        self.slave_index += 1
        if self.slave_index == len(slaves):
            self.slave_index = 0

    def handle_slave(slave_sock):
        # The receive loop only ends through an exception, which is treated
        # as the slave having disconnected.
        try:
            while True:
                self.event_queue.put_nowait(_recv_obj(slave_sock))
        except Exception:
            logger.info("Slave disconnected")
            slaves.remove(slave_sock)
            # Keep the round-robin cursor inside the shrunken list.
            if self.slave_index == len(slaves) and len(slaves) > 0:
                self.slave_index -= 1
            try:
                slave_sock.close()
            except Exception:
                # Best effort: the connection is already considered gone.
                pass

    def listener():
        while True:
            _socket, _addr = sock.accept()
            logger.info("Slave connected")
            slaves.append(_socket)
            # Pass the socket as an argument instead of closing over the
            # loop variable: a lambda would read `_socket` when the greenlet
            # starts, which may be after the next accept() has rebound it.
            gevent.spawn(handle_slave, _socket)

    gevent.spawn(listener)
    return dispatch_command
def _get_local_ip(self):
    """Return the local IPv4 address the OS would use for multicast traffic.

    A UDP socket is "connected" to the SSDP multicast group; connecting a
    datagram socket only selects a route and a source address, so
    ``getsockname()`` then reports the local address of the chosen interface.

    Returns:
        The local IP address as a string.

    Raises:
        socket.error: if the socket cannot be created or connected
            (for example when no route is available).
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # not using <broadcast> because gevents getaddrinfo doesn't like that
        # using port 1 as per hobbldygoop's comment about port 0 not working on osx:
        # https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
        s.connect(('239.255.255.250', 1))
        return s.getsockname()[0]
    finally:
        # Always release the descriptor; the original leaked one per call.
        s.close()
def __init__(self, p_socket=None):
    """Initialise the connection state around an optional socket.

    Args:
        p_socket: an already created socket, or ``None`` to start without
            one. When given, ``SO_KEEPALIVE`` is switched on for it.
    """
    self.socket = p_socket
    if p_socket is not None:
        p_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    # NOTE(review): the attribute name is misspelled ("sequnce"); it is kept
    # as-is because code outside this block may read it under this name.
    self.request_sequnce = 0
    # Starts out as an empty string.
    self.buffer = ""
def __init__(self, host, port, app):
    """Create the listening socket and remember the application.

    Args:
        host: address to bind the listening TCP socket to.
        port: port to bind the listening TCP socket to.
        app: application object, stored unchanged on ``self.app``.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    # No explicit backlog: Python picks a default value.
    listener.listen()

    self.app = app
    # Snapshot of the process environment taken at construction time.
    self.base_env = dict(os.environ)
def _probe_udp_local_ip(address, broadcast=False):
    """Return the local IP the OS picks for a UDP socket connected to *address*."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        if broadcast:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.connect(address)
        return s.getsockname()[0]
    finally:
        # Release the descriptor on success and on failure alike.
        s.close()


def _get_local_ips():
    """Collect the local IP addresses of this host using several probes.

    Three independent, best-effort methods are tried and their results
    merged: a UDP socket connected to a multicast address, a UDP socket
    connected to a public address, and a lookup of the empty hostname.
    A failing method is logged at debug level and skipped.

    Returns:
        A list of distinct IP address strings in discovery order; it may be
        empty when every probe fails.
    """
    local_ips = []

    udp_probes = (
        # Not using <broadcast> because gevents getaddrinfo doesn't like that
        # using port 1 as per hobbldygoop's comment about port 0 not working on osx:
        # https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
        (('239.255.255.250', 1), True),
        # A normal address (google dns ip).
        (('8.8.8.8', 0), False),
    )
    for address, broadcast in udp_probes:
        try:
            local_ips.append(_probe_udp_local_ip(address, broadcast))
        except Exception as err:
            # Exception rather than BaseException, so KeyboardInterrupt,
            # SystemExit and greenlet kills are no longer swallowed.
            logging.debug("Local ip probe via %s failed: %s", address, err)

    # Get ip by '' hostname. Not supported on all platforms.
    try:
        local_ips += socket.gethostbyname_ex('')[2]
    except Exception as err:
        logging.debug("Local ip lookup by hostname failed: %s", err)

    # Delete duplicates while keeping the order in which they were found.
    unique_ips = []
    for ip in local_ips:
        if ip not in unique_ips:
            unique_ips.append(ip)

    logging.debug("Found local ips: %s", unique_ips)
    return unique_ips