我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用socket.SOCK_SEQPACKET。
def _connect_hfp(address, control_chan=True, audio_chan=True): connection = None # Connect to RFCOMM control channel on HF (car kit) if control_chan: port = find_service("hf", address) print("HFP connecting to %s on port %i" % (address, port)) connection = common.Socket() time.sleep(0.5) connection.connect((address, port)) if audio_chan and hasattr(socket, "BTPROTO_SCO"): asock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) time.sleep(0.5) asock.connect(bytes(address, encoding="UTF-8")) print("HFP SCO audio socket established") return connection
def tncc_start(self):
    """Start the tncc host-checker Java process attached to a socket pair.

    tncc is the host checker app. It can check different security policies
    of the host and report back. We have to send it a preauth key (from the
    DSPREAUTH cookie) and it sends back a new cookie value we submit. After
    logging in, we send back another cookie to tncc. Subsequently, it
    contacts https://<vpn_host>:443 every 10 minutes.

    Side effects:
        Calls ``self.tncc_init()`` first when ``self.tncc_jar`` is falsy.
        Sets ``self.tncc_socket`` to the parent end of a new AF_UNIX
        SOCK_SEQPACKET pair and ``self.tncc_process`` to the spawned
        ``subprocess.Popen`` object. The child receives the other end of
        the pair as its stdin; its stdout is sent to ``os.devnull``.

    Raises:
        OSError: propagated from ``subprocess.Popen`` if the process cannot
            be started.
    """
    if not self.tncc_jar:
        self.tncc_init()

    self.tncc_socket, child_sock = socket.socketpair(socket.AF_UNIX,
                                                     socket.SOCK_SEQPACKET)
    command = [
        'java',
        '-classpath', self.tncc_jar + ':' + self.plugin_jar,
        self.class_name,
        'log_level', '3',
        'postRetries', '6',
        'ivehost', self.vpn_host,
        'home_dir', os.path.expanduser('~'),
        'Parameter0', '',
        'user_agent', self.user_agent,
    ]
    try:
        # The child gets its own duplicates of these descriptors, so the
        # parent's copies are released as soon as Popen returns (or fails)
        # instead of waiting for garbage collection.
        with open(os.devnull, 'w') as null:
            self.tncc_process = subprocess.Popen(
                command,
                env={'LD_PRELOAD': self.tncc_preload},
                stdin=child_sock,
                stdout=null)
    finally:
        child_sock.close()
def testCrucialConstants(self): # Testing for mission critical constants socket.AF_INET socket.SOCK_STREAM socket.SOCK_DGRAM socket.SOCK_RAW socket.SOCK_RDM socket.SOCK_SEQPACKET socket.SOL_SOCKET socket.SO_REUSEADDR
def _have_socket_rds(): """Check whether RDS sockets are supported on this host.""" try: s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) except (AttributeError, OSError): return False else: s.close() return True
def setUp(self):
    """Create the server-side RDS socket and bind it to a port.

    The socket's ``close`` is registered as a cleanup before binding. If
    binding raises ``OSError`` the test is skipped; otherwise the bound
    port is stored in ``self.port``.
    """
    server = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    self.serv = server
    self.addCleanup(server.close)
    try:
        bound_port = support.bind_port(server)
    except OSError:
        self.skipTest('unable to bind RDS socket')
    else:
        self.port = bound_port
def clientSetUp(self):
    """Create the client-side RDS socket and try to bind it.

    On success the bound address is stored in ``self.cli_addr``. An
    ``OSError`` while binding or querying the address is ignored here.
    """
    client = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    self.cli = client
    try:
        # RDS sockets must be bound explicitly to send or receive data
        client.bind((HOST, 0))
        self.cli_addr = client.getsockname()
    except OSError:
        # skipTest should not be called here, and will be called in the
        # server instead
        pass
def testCreateSocket(self):
    """Creating an RDS seqpacket socket and closing it must not raise."""
    family, kind = socket.PF_RDS, socket.SOCK_SEQPACKET
    # The context manager closes the socket on exit; nothing else is done.
    with socket.socket(family, kind, 0):
        pass
def __init__(self, config, full_config):
    """Set up controller state and try to connect to a "Wireless Controller".

    Runs ``hcitool scan --flush`` up to five times. For the first scanned
    device whose name is exactly "Wireless Controller", the control socket
    is connected to ``(address, 0x11)`` and the interrupt socket to
    ``(address, 0x13)``, the interrupt socket is made non-blocking and
    ``self.ready`` is set to ``True``.

    Args:
        config: Passed to the parent initializer; ``self.config['deadzone']``
            is read afterwards.
        full_config: Passed to the parent initializer unchanged.

    NOTE(review): when no matching device is found after all attempts, this
    method returns without assigning ``self.ready`` -- confirm the parent
    class provides a default.
    """
    super(Dualshock4, self).__init__(config, full_config)

    # Bluetooth sockets: one for control, one for interrupt traffic.
    self.report_id = 0x11
    self.report_size = 79
    self.ctrl_socket = socket.socket(
        socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
    self.intr_socket = socket.socket(
        socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)

    # Pre-filled outgoing packet buffer.
    self.packet = bytearray(79)
    self.packet[0] = 0x52
    self.packet[1] = self.report_id
    self.packet[2] = 0x80
    self.packet[4] = 0xFF

    # The deadzone of the analog sticks to ignore them
    self.deadzone = self.config['deadzone']

    # All input-activity flags start cleared.
    self.left_analog_active = self.right_analog_active = False
    self.left_trigger_active = self.right_trigger_active = False
    self.up_active = self.down_active = False
    self.left_active = self.right_active = False

    n_attempts = 5
    scan_cmd = ["hcitool", "scan", "--flush"]
    for attempt_no in range(1, n_attempts + 1):
        print("Attempt %i of %i" % (attempt_no, n_attempts), end='\r')
        raw_output = subprocess.check_output(scan_cmd, stderr=subprocess.STDOUT)
        scan_lines = raw_output.decode("utf8").splitlines()
        # The first output line is skipped (presumably a header); every
        # other line must split into exactly three tab-separated fields.
        for line in scan_lines[1:]:
            _, address, name = line.split("\t")
            if name != "Wireless Controller":
                continue
            self.ctrl_socket.connect((address, 0x11))
            self.intr_socket.connect((address, 0x13))
            self.intr_socket.setblocking(False)
            self.ready = True
            return
def dwr_handler(fuzzed, f):
    """Relay messages between a ``fuzzed`` thread and peer ``f``.

    A SOCK_SEQPACKET socket pair is created; one end is handed to a
    ``WrappedThread`` running ``fuzzed``, the other is kept here. Data read
    from the thread is forwarded to ``f`` unchanged. Data read from ``f`` is
    decoded: a request with command code 280 (Device-Watchdog in RFC 6733)
    is answered directly on ``f``; anything else is forwarded to the thread.
    The loop ends when either side delivers an empty read or the thread-side
    ``recv`` fails with a socket error.

    Args:
        fuzzed: Callable run in the child thread; it receives the thread's
            end of the socket pair as its only argument.
        f: Connected socket-like object supporting ``recv``/``send`` and
            usable with ``select``.

    Returns:
        Whatever ``WrappedThread.join()`` returns for the child thread.
    """
    (own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

    child = WrappedThread(fuzzed_plug, target=fuzzed, args=[fuzzed_plug])
    child.start()

    while True:
        (readable, _, _) = sl.select([own_plug, f], [], [])

        if own_plug in readable:
            try:
                payload = own_plug.recv(dm.U24_MAX)
            except sk.error:
                # Only socket failures end the relay; KeyboardInterrupt and
                # SystemExit are no longer swallowed here.
                break
            if not payload:
                break
            f.send(payload)
        elif f in readable:
            payload = f.recv(dm.U24_MAX)
            if not payload:
                break

            m = dm.Msg.decode(payload)
            if m.code == 280 and m.R:
                # Answer the watchdog request locally, echoing its
                # end-to-end and hop-by-hop ids. AVP codes per RFC 6733:
                # 264 Origin-Host, 296 Origin-Realm, 268 Result-Code
                # (2001 = success), 278 Origin-State-Id.
                dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
                    dm.Avp(code=264, M=True, data=local_host),
                    dm.Avp(code=296, M=True, data=local_realm),
                    dm.Avp(code=268, M=True, u32=2001),
                    dm.Avp(code=278, M=True, u32=0xcafebabe)])
                f.send(dwa.encode())
            else:
                own_plug.send(payload)

    own_plug.close()
    return child.join()
def dwr_handler(scenario, f):
    """Relay and record messages between a ``scenario`` thread and ``f``.

    One end of a SOCK_SEQPACKET socket pair is given to a thread running
    ``scenario``; the other end is serviced here. Every message is decoded
    and appended to a log as ``(msg, True)`` when it came from the thread
    and ``(msg, False)`` when it came from ``f`` and was forwarded to the
    thread. Requests with command code 280 received from ``f`` are answered
    directly and are not logged. The loop stops on the first empty read.

    Args:
        scenario: Callable run in the child thread with the thread's end of
            the socket pair as its only argument.
        f: Connected socket-like object supporting ``recv``/``send`` and
            usable with ``select``.

    Returns:
        ``(exc_info, msgs)`` where ``exc_info`` is the value returned by
        ``child.join()`` and ``msgs`` is the recorded message log.
    """
    msgs = []
    own_plug, fuzzed_plug = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

    child = Thread(target=scenario, args=[fuzzed_plug])
    child.start()

    while True:
        ready, _, _ = sl.select([own_plug, f], [], [])

        if own_plug in ready:
            # Scenario -> peer: log the decoded message, forward raw bytes.
            outgoing = own_plug.recv(dm.U24_MAX)
            if not outgoing:
                break
            msgs.append((dm.Msg.decode(outgoing), True))
            f.send(outgoing)
            continue

        if f not in ready:
            continue

        # Peer -> scenario, except watchdog requests which are answered here.
        incoming = f.recv(dm.U24_MAX)
        if not incoming:
            break
        msg = dm.Msg.decode(incoming)

        if not (msg.code == 280 and msg.R):
            msgs.append((msg, False))
            own_plug.send(incoming)
            continue

        answer_avps = [
            dm.Avp(code=264, M=True, data=local_host),
            dm.Avp(code=296, M=True, data=local_realm),
            dm.Avp(code=268, M=True, u32=2001),
            dm.Avp(code=278, M=True, u32=0xcafebabe),
        ]
        answer = dm.Msg(code=280, R=False, e2e_id=msg.e2e_id,
                        h2h_id=msg.h2h_id, avps=answer_avps)
        f.send(answer.encode())

    own_plug.close()
    exc_info = child.join()
    return (exc_info, msgs)
def fuzz_handler(scenario, f, fuzz):
    """Run ``scenario`` in a thread, sending its messages to ``f`` via ``fuzz``.

    ``fuzz`` is bound to ``f`` before the thread starts. Messages read from
    the scenario thread are decoded, logged as ``(msg, True)`` and handed to
    ``fuzz.send``. Messages read from ``f`` are decoded; requests with
    command code 280 are answered directly on ``f``, everything else is
    logged as ``(msg, False)`` and forwarded to the thread. The loop stops
    on the first empty read.

    Args:
        scenario: Callable run in the child thread with the thread's end of
            the socket pair as its only argument.
        f: Connected socket-like object supporting ``recv``/``send`` and
            usable with ``select``.
        fuzz: A ``FuzzScenario`` instance (checked with ``assert``).

    Returns:
        ``(exc_info, msgs)`` where ``exc_info`` is the value returned by
        ``child.join()`` and ``msgs`` is the recorded message log.
    """
    assert isinstance(fuzz, FuzzScenario)

    msgs = []
    own_plug, fuzzed_plug = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

    fuzz.bind(f)

    child = WrappedThread(fuzzed_plug, target=scenario, args=[fuzzed_plug])
    child.start()

    while True:
        ready, _, _ = sl.select([own_plug, f], [], [])

        if own_plug in ready:
            # Scenario -> peer: the decoded message goes out through fuzz.
            from_scenario = own_plug.recv(dm.U24_MAX)
            if not from_scenario:
                break
            msg = dm.Msg.decode(from_scenario)
            msgs.append((msg, True))
            assert isinstance(msg, dm.Msg)
            fuzz.send(msg)
            continue

        if f not in ready:
            continue

        # Peer -> scenario, except watchdog requests which are answered here.
        from_peer = f.recv(dm.U24_MAX)
        if not from_peer:
            break
        msg = dm.Msg.decode(from_peer)

        is_watchdog_request = msg.code == 280 and msg.R
        if is_watchdog_request:
            answer_avps = [
                dm.Avp(code=264, M=True, data=local_host),
                dm.Avp(code=296, M=True, data=local_realm),
                dm.Avp(code=268, M=True, u32=2001),
                dm.Avp(code=278, M=True, u32=0xcafebabe),
            ]
            answer = dm.Msg(code=280, R=False, e2e_id=msg.e2e_id,
                            h2h_id=msg.h2h_id, avps=answer_avps)
            f.send(answer.encode())
        else:
            msgs.append((msg, False))
            own_plug.send(from_peer)

    own_plug.close()
    exc_info = child.join()
    return (exc_info, msgs)