Python socket 模块,SOCK_SEQPACKET 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用socket.SOCK_SEQPACKET

项目:nOBEX    作者:nccgroup    | 项目源码 | 文件源码
def _connect_hfp(address, control_chan=True, audio_chan=True):
        connection = None

        # Connect to RFCOMM control channel on HF (car kit)
        if control_chan:
            port = find_service("hf", address)
            print("HFP connecting to %s on port %i" % (address, port))
            connection = common.Socket()
            time.sleep(0.5)
            connection.connect((address, port))

        if audio_chan and hasattr(socket, "BTPROTO_SCO"):
            asock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO)
            time.sleep(0.5)
            asock.connect(bytes(address, encoding="UTF-8"))
            print("HFP SCO audio socket established")

        return connection
项目:service-juniper-vpn    作者:docksal    | 项目源码 | 文件源码
def tncc_start(self):
        # tncc is the host checker app. It can check different
        # security policies of the host and report back. We have
        # to send it a preauth key (from the DSPREAUTH cookie)
        # and it sends back a new cookie value we submit.
        # After logging in, we send back another cookie to tncc.
        # Subsequently, it contacts https://<vpn_host:443 every
        # 10 minutes.

        if not self.tncc_jar:
            self.tncc_init()

        self.tncc_socket, sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        null = open(os.devnull, 'w')

        self.tncc_process = subprocess.Popen(['java',
            '-classpath', self.tncc_jar + ':' + self.plugin_jar,
            self.class_name,
            'log_level', '3',
            'postRetries', '6',
            'ivehost', self.vpn_host,
            'home_dir', os.path.expanduser('~'),
            'Parameter0', '',
            'user_agent', self.user_agent,
            ], env={'LD_PRELOAD': self.tncc_preload}, stdin=sock, stdout=null)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.addCleanup(self.serv.close)
        try:
            self.port = support.bind_port(self.serv)
        except OSError:
            self.skipTest('unable to bind RDS socket')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def clientSetUp(self):
        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        try:
            # RDS sockets must be bound explicitly to send or receive data
            self.cli.bind((HOST, 0))
            self.cli_addr = self.cli.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCreateSocket(self):
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
            pass
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.addCleanup(self.serv.close)
        try:
            self.port = support.bind_port(self.serv)
        except OSError:
            self.skipTest('unable to bind RDS socket')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def clientSetUp(self):
        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        try:
            # RDS sockets must be bound explicitly to send or receive data
            self.cli.bind((HOST, 0))
            self.cli_addr = self.cli.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCreateSocket(self):
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
            pass
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.addCleanup(self.serv.close)
        try:
            self.port = support.bind_port(self.serv)
        except OSError:
            self.skipTest('unable to bind RDS socket')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def clientSetUp(self):
        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        try:
            # RDS sockets must be bound explicitly to send or receive data
            self.cli.bind((HOST, 0))
            self.cli_addr = self.cli.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCreateSocket(self):
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
            pass
项目:derplearning    作者:John-Ellis    | 项目源码 | 文件源码
def __init__(self, config, full_config):
        super(Dualshock4, self).__init__(config, full_config)

        # bluetooth control socket
        self.report_id = 0x11
        self.report_size = 79
        self.ctrl_socket = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET,
                                         socket.BTPROTO_L2CAP)
        self.intr_socket = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET,
                                         socket.BTPROTO_L2CAP)

        # Prepare packet
        self.packet = bytearray(79)
        self.packet[0] = 0x52
        self.packet[1] = self.report_id
        self.packet[2] = 0x80
        self.packet[4] = 0xFF

        # The deadzone of the analog sticks to ignore them
        self.deadzone = self.config['deadzone']        
        self.left_analog_active = False
        self.right_analog_active = False
        self.left_trigger_active = False
        self.right_trigger_active = False
        self.up_active = False
        self.down_active = False
        self.left_active = False
        self.right_active = False

        n_attemps = 5
        for attempt in range(n_attemps):
            print("Attempt %i of %i" % (attempt + 1, n_attemps), end='\r')
            cmd = ["hcitool", "scan", "--flush"]
            res = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf8")
            for _, address, name in [l.split("\t") for l in res.splitlines()[1:]]:
                if name == "Wireless Controller":
                    self.ctrl_socket.connect((address, 0x11))
                    self.intr_socket.connect((address, 0x13))
                    self.intr_socket.setblocking(False)
                    self.ready = True
                    return
项目:diafuzzer    作者:phil777    | 项目源码 | 文件源码
def dwr_handler(fuzzed, f):
  (own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

  child = WrappedThread(fuzzed_plug, target=fuzzed, args=[fuzzed_plug])
  child.start()

  while True:
    (readable, _, _) = sl.select([own_plug, f], [], [])

    if own_plug in readable:
      try:
        b = own_plug.recv(dm.U24_MAX)
        if len(b) == 0:
          break
      except:
        break
      f.send(b)
    elif f in readable:
      b = f.recv(dm.U24_MAX)
      if len(b) == 0:
        break

      m = dm.Msg.decode(b)
      if m.code == 280 and m.R:
        dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
          dm.Avp(code=264, M=True, data=local_host),
          dm.Avp(code=296, M=True, data=local_realm),
          dm.Avp(code=268, M=True, u32=2001),
          dm.Avp(code=278, M=True, u32=0xcafebabe)])
        f.send(dwa.encode())
      else:
        own_plug.send(b)

  own_plug.close()
  return child.join()
项目:diafuzzer    作者:phil777    | 项目源码 | 文件源码
def dwr_handler(scenario, f):
  msgs = []

  (own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

  child = Thread(target=scenario, args=[fuzzed_plug])
  child.start()

  while True:
    (readable, _, _) = sl.select([own_plug, f], [], [])

    if own_plug in readable:
      b = own_plug.recv(dm.U24_MAX)
      if len(b) == 0:
        break
      m = dm.Msg.decode(b)
      msgs.append((m, True))
      f.send(b)
    elif f in readable:
      b = f.recv(dm.U24_MAX)
      if len(b) == 0:
        break

      m = dm.Msg.decode(b)
      if m.code == 280 and m.R:
        dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
          dm.Avp(code=264, M=True, data=local_host),
          dm.Avp(code=296, M=True, data=local_realm),
          dm.Avp(code=268, M=True, u32=2001),
          dm.Avp(code=278, M=True, u32=0xcafebabe)])
        f.send(dwa.encode())
      else:
        msgs.append((m, False))
        own_plug.send(b)

  own_plug.close()
  exc_info = child.join()

  return (exc_info, msgs)
项目:diafuzzer    作者:phil777    | 项目源码 | 文件源码
def fuzz_handler(scenario, f, fuzz):
  assert(isinstance(fuzz, FuzzScenario))

  msgs = []

  (own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

  fuzz.bind(f)

  child = WrappedThread(fuzzed_plug, target=scenario, args=[fuzzed_plug])
  child.start()

  while True:
    (readable, _, _) = sl.select([own_plug, f], [], [])

    if own_plug in readable:
      b = own_plug.recv(dm.U24_MAX)
      if len(b) == 0:
        break
      m = dm.Msg.decode(b)
      msgs.append((m, True))
      assert(isinstance(m, dm.Msg))
      fuzz.send(m)
    elif f in readable:
      b = f.recv(dm.U24_MAX)
      if len(b) == 0:
        break

      m = dm.Msg.decode(b)
      if m.code == 280 and m.R:
        dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
          dm.Avp(code=264, M=True, data=local_host),
          dm.Avp(code=296, M=True, data=local_realm),
          dm.Avp(code=268, M=True, u32=2001),
          dm.Avp(code=278, M=True, u32=0xcafebabe)])
        f.send(dwa.encode())
      else:
        msgs.append((m, False))
        own_plug.send(b)

  own_plug.close()
  exc_info = child.join()

  return (exc_info, msgs)