Python zmq 模块,LINGER 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.LINGER

项目:lustre_task_driven_monitoring_framework    作者:GSI-HPC    | 项目源码 | 文件源码
def disconnect(self):
        """Tear down the ZeroMQ socket and context if currently connected.

        Does nothing when ``self.is_connected`` is falsy. Otherwise the
        socket (if any) is closed with LINGER set to 0 after being removed
        from the poller (if any), the context (if any) is terminated, and
        ``self.is_connected`` is reset to ``False``.
        """
        if not self.is_connected:
            return

        if self.socket:
            # LINGER 0: do not wait for unsent messages when closing.
            self.socket.setsockopt(zmq.LINGER, 0)
            if self.poller:
                self.poller.unregister(self.socket)
            self.socket.close()

        if self.context:
            self.context.term()

        self.is_connected = False
项目:bqueryd    作者:visualfabriq    | 项目源码 | 文件源码
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
        """Initialise the worker: validate the data directory, create the
        ROUTER socket and poller, connect to Redis and install the SIGTERM
        handler.

        :param data_dir: directory that must already exist
        :param redis_url: URL handed to ``redis.from_url``
        :param loglevel: level applied to this worker's child logger
        :raises Exception: if ``data_dir`` does not exist or is not a directory
        """
        if not (os.path.exists(data_dir) and os.path.isdir(data_dir)):
            raise Exception("Datadir %s is not a valid directory" % data_dir)

        # Identity / location of this worker
        self.worker_id = binascii.hexlify(os.urandom(8))
        self.node_name = socket.gethostname()
        self.data_dir = data_dir
        self.data_files = set()

        # ZeroMQ plumbing: a ROUTER socket identified by the worker id
        zmq_context = zmq.Context()
        self.socket = zmq_context.socket(zmq.ROUTER)
        self.socket.setsockopt(zmq.LINGER, 500)
        self.socket.identity = self.worker_id
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)

        self.redis_server = redis.from_url(redis_url)
        # Maps controllers to the timestamp of the last contact with them
        self.controllers = {}
        self.check_controllers()
        self.last_wrm = 0
        self.start_time = time.time()

        worker_logger = bqueryd.logger.getChild('worker ' + self.worker_id)
        worker_logger.setLevel(loglevel)
        self.logger = worker_logger
        self.msg_count = 0

        # term_signal() is *called* here; whatever it returns is installed
        # as the SIGTERM handler.
        signal.signal(signal.SIGTERM, self.term_signal())
项目:enteletaor    作者:cr0hn    | 项目源码 | 文件源码
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """Check whether an open ZeroMQ publisher answers at ``host:port``.

    A SUB socket subscribed to every topic is connected to the endpoint
    and given one second to receive a message.

    :param host: host name or address to connect to
    :param port: TCP port to connect to
    :param user: unused by this function
    :param password: unused by this function
    :param db: unused by this function
    :return: True if a message was received within the timeout, else False
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not block on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect (errors here still propagate to the caller, as before)
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive
        try:
            socket.recv()
        except Exception:
            return False
        return True
    finally:
        # Release both the socket and the context created for this probe;
        # previously the context was never terminated.
        socket.close()
        context.term()
项目:enteletaor    作者:cr0hn    | 项目源码 | 文件源码
def handle_zmq(host, port=5555, extra_config=None):
    """Check whether an open ZeroMQ publisher answers at ``host:port``.

    A SUB socket subscribed to every topic is connected to the endpoint
    and given one second to receive a message.

    :param host: host name or address to connect to
    :param port: TCP port to connect to
    :param extra_config: unused by this function
    :return: True if a message was received within the timeout, else False
    """
    # log.debug("      * Connection to ZeroMQ: %s : %s" % (host, port))

    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not block on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect (errors here still propagate to the caller, as before)
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive
        try:
            socket.recv()
        except Exception:
            return False
        return True
    finally:
        # Release both the socket and the context created for this probe;
        # previously the context was never terminated.
        socket.close()
        context.term()
项目:mercury    作者:jr0d    | 项目源码 | 文件源码
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None):
        """Create an asyncio ROUTER socket, register it for input polling
        and bind it to ``bind_address``.

        :param bind_address: endpoint passed to ``socket.bind``
        :param linger: value applied to the socket's ``zmq.LINGER`` option
        :param poll_timeout: stored on the instance as ``self.poll_timeout``
        :param loop: stored on the instance as ``self.loop``
        """
        self.bind_address = bind_address
        self.loop = loop
        self.context = zmq.asyncio.Context()
        self.poll_timeout = poll_timeout
        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.setsockopt(zmq.LINGER, linger)

        self.in_poller = zmq.asyncio.Poller()
        self.in_poller.register(self.socket, zmq.POLLIN)

        self.socket.bind(self.bind_address)

        # Log only after bind() has succeeded, so the message is never
        # emitted for an address that failed to bind.
        log.info('Bound to: %s', self.bind_address)

        self._kill = False
项目:pymoku    作者:liquidinstruments    | 项目源码 | 文件源码
def _frame_worker(self):
        """Receive packets from the device's frame stream and queue
        completed frames.

        Does nothing unless the instance has a truthy ``_frame_class``.
        While ``self._running`` is true, packets read from a SUB socket
        connected to port 27185 of ``self._moku._ip`` are fed to the
        current frame object; each frame that reports ``_complete`` is put
        on ``self._queue`` and a fresh frame is started. The socket is
        always closed on exit.
        """
        if not getattr(self, '_frame_class', None):
            return

        ctx = zmq.Context.instance()
        skt = ctx.socket(zmq.SUB)
        try:
            # Set options *before* connecting: per the libzmq docs, most
            # options (RCVHWM included) only affect later bind/connect
            # calls, so the original post-connect RCVHWM could be ignored.
            skt.setsockopt(zmq.RCVHWM, 8)
            skt.setsockopt(zmq.LINGER, 5000)
            skt.setsockopt_string(zmq.SUBSCRIBE, u'')
            skt.connect("tcp://%s:27185" % self._moku._ip)

            # Built inside the try so a constructor failure cannot leak
            # the socket.
            fr = self._frame_class(**self._frame_kwargs)

            while self._running:
                # Wait up to one second so self._running is re-checked
                # regularly even when no data arrives.
                if skt in zmq.select([skt], [], [], 1.0)[0]:
                    d = skt.recv()
                    fr.add_packet(d)

                    if fr._complete:
                        self._queue.put_nowait(fr)
                        fr = self._frame_class(**self._frame_kwargs)
        finally:
            skt.close()
项目:pynutmeg    作者:kitizz    | 项目源码 | 文件源码
def reset_socket(self):
        """Recreate the PUB socket and (re)start the subscriber side.

        Any existing publisher socket is closed first. A new PUB socket is
        connected to ``self.pub_address``. If the subscriber is not yet
        running it is started via ``self._subscribe()``; otherwise
        ``self.reset_sub`` is raised so it can be reset. Finally the
        instance is marked running and the server is poked.
        """
        previous = self.pubsock
        if previous is not None:
            previous.close()

        print("Nutmeg connecting")
        print("\tPublishing to:", self.pub_address)

        publisher = self.context.socket(zmq.PUB)
        self.pubsock = publisher
        publisher.connect(self.pub_address)

        if self.sub_running:
            # Subscriber already active: ask for a reset instead of a restart.
            self.reset_sub = True
        else:
            self.sub_running = True
            self._subscribe()

        self.running = True
        self._poke_server()
项目:bearded-avenger-sdk-py    作者:csirtgadgets    | 项目源码 | 文件源码
def zcreate_pipe(ctx, hwm=1000):
    """Create a connected pair of PAIR sockets over a random inproc endpoint.

    :param ctx: context handed to ``zsocket.ZSocket``
    :param hwm: high-water mark applied to both sockets
    :return: ``(frontend, backend)`` where frontend is bound and backend
        is connected to the same endpoint
    :raises zmq.ZMQError: if binding fails for any reason other than the
        randomly chosen endpoint already being in use
    """
    backend = zsocket.ZSocket(ctx, zmq.PAIR)
    frontend = zsocket.ZSocket(ctx, zmq.PAIR)
    backend.set_hwm(hwm)
    frontend.set_hwm(hwm)
    # close immediately on shutdown
    backend.setsockopt(zmq.LINGER, 0)
    frontend.setsockopt(zmq.LINGER, 0)

    def _random_endpoint():
        # The trailing "\n" is kept exactly as in the original endpoint
        # format; bind and connect use the same string.
        return "inproc://zactor-%04x-%04x\n" \
            % (random.randint(0, 0x10000), random.randint(0, 0x10000))

    while True:
        endpoint = _random_endpoint()
        try:
            frontend.bind(endpoint)
        except zmq.ZMQError as exc:
            # Retry only on a name collision. The former bare ``except:``
            # also swallowed KeyboardInterrupt and looped forever on
            # errors that a new random name cannot fix.
            if exc.errno != zmq.EADDRINUSE:
                raise
        else:
            break
    backend.connect(endpoint)
    return (frontend, backend)
项目:ternarynet    作者:czhu95    | 项目源码 | 文件源码
def serve_data(ds, addr):
    """Serve datapoints from ``ds`` forever over a PUSH socket bound to ``addr``.

    ``ds`` is wrapped in ``RepeatedData(ds, -1)`` and each datapoint is
    serialised with ``dumps`` before being sent. When the loop is left
    (through an exception), the socket is closed with LINGER 0 and the
    context is destroyed if it is not already closed.
    """
    context = zmq.Context()
    push_sock = context.socket(zmq.PUSH)
    push_sock.set_hwm(10)
    push_sock.bind(addr)
    repeated = RepeatedData(ds, -1)
    try:
        repeated.reset_state()
        logger.info("Serving data at {}".format(addr))
        while True:
            for datapoint in repeated.get_data():
                push_sock.send(dumps(datapoint), copy=False)
    finally:
        # Drop anything still queued rather than waiting for a consumer.
        push_sock.setsockopt(zmq.LINGER, 0)
        push_sock.close()
        if not context.closed:
            context.destroy(0)
项目:Pyslvs-PyQt5    作者:KmolYuan    | 项目源码 | 文件源码
def socket_fitness(self, chrom):
        """Evaluate ``chrom`` through the REQ socket, falling back to a
        local evaluation.

        The request is a ';'-joined string of the function's descriptors,
        the target path and the chromosome values. If a reply is readable
        within 100 ms it is decoded as UTF-8 and returned as a float;
        otherwise the socket is closed (LINGER 0), removed from the poller
        and ``self.func(chrom)`` is returned instead. A closed socket is
        recreated, bound and registered before sending.
        """
        if self.socket.closed:
            self.socket = self.context.socket(zmq.REQ)
            self.socket.bind(self.socket_port)
            self.poll.register(self.socket, zmq.POLLIN)

        request_fields = [
            self.func.get_Driving(),
            self.func.get_Follower(),
            self.func.get_Link(),
            self.func.get_Target(),
            self.func.get_ExpressionName(),
            self.func.get_Expression(),
            ','.join("{}:{}".format(e[0], e[1]) for e in self.targetPath),
            ','.join(str(e) for e in chrom),
        ]
        self.socket.send_string(';'.join(request_fields))

        ready = dict(self.poll.poll(100))
        if ready.get(self.socket) == zmq.POLLIN:
            return float(self.socket.recv().decode('utf-8'))

        # No reply in time: discard the socket and compute locally.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.close()
        self.poll.unregister(self.socket)
        return self.func(chrom)
项目:Pyslvs-PyQt5    作者:KmolYuan    | 项目源码 | 文件源码
def socket_fitness(self, chrom):
        """Evaluate ``chrom`` through the REQ socket, falling back to a
        local evaluation.

        The request is a ';'-joined string of the function's descriptors,
        the target path and the chromosome values. If a reply is readable
        within 100 ms it is decoded as UTF-8 and returned as a float;
        otherwise the socket is closed (LINGER 0), removed from the poller
        and ``self.func(chrom)`` is returned instead. A closed socket is
        recreated, bound and registered before sending.
        """
        if self.socket.closed:
            self.socket = self.context.socket(zmq.REQ)
            self.socket.bind(self.socket_port)
            self.poll.register(self.socket, zmq.POLLIN)

        request_fields = [
            self.func.get_Driving(),
            self.func.get_Follower(),
            self.func.get_Link(),
            self.func.get_Target(),
            self.func.get_ExpressionName(),
            self.func.get_Expression(),
            ','.join("{}:{}".format(e[0], e[1]) for e in self.targetPath),
            ','.join(str(e) for e in chrom),
        ]
        self.socket.send_string(';'.join(request_fields))

        ready = dict(self.poll.poll(100))
        if ready.get(self.socket) == zmq.POLLIN:
            return float(self.socket.recv().decode('utf-8'))

        # No reply in time: discard the socket and compute locally.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.close()
        self.poll.unregister(self.socket)
        return self.func(chrom)
项目:Pyslvs-PyQt5    作者:KmolYuan    | 项目源码 | 文件源码
def socket_fitness(self, chrom):
        """Evaluate ``chrom`` through the REQ socket, falling back to a
        local evaluation.

        The request is a ';'-joined string of the function's descriptors,
        the target path and the chromosome values. If a reply is readable
        within 100 ms it is decoded as UTF-8 and returned as a float;
        otherwise the socket is closed (LINGER 0), removed from the poller
        and ``self.func(chrom)`` is returned instead. A closed socket is
        recreated, bound and registered before sending.
        """
        if self.socket.closed:
            self.socket = self.context.socket(zmq.REQ)
            self.socket.bind(self.socket_port)
            self.poll.register(self.socket, zmq.POLLIN)

        request_fields = [
            self.func.get_Driving(),
            self.func.get_Follower(),
            self.func.get_Link(),
            self.func.get_Target(),
            self.func.get_ExpressionName(),
            self.func.get_Expression(),
            ','.join("{}:{}".format(e[0], e[1]) for e in self.targetPath),
            ','.join(str(e) for e in chrom),
        ]
        self.socket.send_string(';'.join(request_fields))

        ready = dict(self.poll.poll(100))
        if ready.get(self.socket) == zmq.POLLIN:
            return float(self.socket.recv().decode('utf-8'))

        # No reply in time: discard the socket and compute locally.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.close()
        self.poll.unregister(self.socket)
        return self.func(chrom)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_bad_sockopts(self):
        """Invalid socket options must raise the appropriate errors."""
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, 0)
        unknown_opt = 9999
        # An unrecognised integer option is passed through to libzmq,
        # which is expected to reject it with EINVAL.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
        self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
        # Only integer values pass through that way; bytes give a TypeError.
        self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
        # A generally valid option can still be invalid for this socket type.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_sockopt_roundtrip(self):
        """A LINGER value written with setsockopt is read back by getsockopt."""
        linger_ms = 11
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, linger_ms)
        self.assertEqual(pub.getsockopt(zmq.LINGER), linger_ms)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_attr(self):
        """Socket options can be written and read as attributes."""
        dealer = self.context.socket(zmq.DEALER)
        self.sockets.append(dealer)
        expected_linger = 10
        dealer.linger = expected_linger
        # Attribute access and getsockopt must agree with each other.
        self.assertEqual(expected_linger, dealer.linger)
        self.assertEqual(expected_linger, dealer.getsockopt(zmq.LINGER))
        self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_term_hang(self):
        """Terminate the context after sending on a zero-LINGER DEALER and
        closing both ends of the pair."""
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)
        for sock in (dealer, router):
            sock.close()
        self.context.term()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_default_mq_args(self):
        """Start a ThreadMonitoredQueue built with default arguments, then
        tear it down."""
        queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = queue
        for set_option in (queue.setsockopt_in, queue.setsockopt_out, queue.setsockopt_mon):
            set_option(zmq.LINGER, 0)
        # start() raises if the default args are wrong
        queue.start()
        self.teardown_device()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
        """Build two connected sockets on a random port of ``interface``.

        The first socket (``type1``) is bound, the second (``type2``)
        connects to it. Both get LINGER 0 and are recorded in
        ``self.sockets``. Returns ``(bound, connected)``.
        """
        bound = self.context.socket(type1)
        bound.setsockopt(zmq.LINGER, 0)
        port = bound.bind_to_random_port(interface)

        connected = self.context.socket(type2)
        connected.setsockopt(zmq.LINGER, 0)
        connected.connect('%s:%s' % (interface, port))

        self.sockets.extend([bound, connected])
        return bound, connected
项目:ParlAI    作者:facebookresearch    | 项目源码 | 文件源码
def connect(self):
        """Create the ZMQ socket and attach it to ``tcp://address:port``.

        A REP socket binds to the endpoint; any other socket type connects
        to it. Requires the ``zmq`` package.
        """
        ctx = zmq.Context()
        sock = ctx.socket(self.socket_type)
        self.socket = sock
        sock.setsockopt(zmq.LINGER, 1)

        host = 'tcp://{}:{}'.format(self.address, self.port)
        attach = sock.bind if self.socket_type == zmq.REP else sock.connect
        attach(host)
        print('python thread connected to ' + host)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def setup_socket(self):
    """Create the REQ socket and configure its LINGER and receive timeout.

    The socket is not connected here: the component inheriting from
    BaseComponent is expected to call ``self.socket.connect`` with the
    appropriate address.
    """
    ctx = zmq.Context()
    req_socket = ctx.socket(zmq.REQ)
    self.socket = req_socket
    socket_options = (
        # LINGER 0: unsent messages are dropped as soon as the socket closes.
        (zmq.LINGER, 0),
        # RCVTIMEO: socket.recv gives up after this many milliseconds.
        (zmq.RCVTIMEO, 500),
    )
    for option, value in socket_options:
        req_socket.setsockopt(option, value)
项目:zerolog    作者:TheGhouls    | 项目源码 | 文件源码
def test_forwarder(forwarder, tcp_sender, context):
    """Monitor should correctly send data"""

    def _subscriber(endpoint):
        # SUB socket receiving every topic, closing without lingering.
        sub = context.socket(zmq.SUB)
        sub.setsockopt_string(zmq.SUBSCRIBE, "")
        sub.setsockopt(zmq.LINGER, 0)
        sub.connect(endpoint)
        return sub

    monitor_sub = _subscriber("tcp://localhost:6500")
    output_sub = _subscriber("tcp://localhost:6002")

    tcp_sender.connect(('localhost', 6001))

    # waiting for warmup
    time.sleep(1)

    # Send one payload per subscriber and check each of them gets data:
    # first the monitor endpoint, then the output endpoint.
    for subscriber in (monitor_sub, output_sub):
        tcp_sender.sendall(b"test test")
        data = subscriber.recv()
        assert data is not None

    tcp_sender.close()
    forwarder.terminate()
项目:zerolog    作者:TheGhouls    | 项目源码 | 文件源码
def sender(context):
    """Return a PUSH socket bound to tcp port 6800 on all interfaces.

    LINGER is set to 0 and the function sleeps one second after binding
    before handing the socket back.
    """
    push_socket = context.socket(zmq.PUSH)
    push_socket.setsockopt(zmq.LINGER, 0)
    push_socket.bind("tcp://*:6800")
    time.sleep(1)
    return push_socket
项目:bqueryd    作者:visualfabriq    | 项目源码 | 文件源码
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO):
        """Initialise the controller.

        Connects to Redis, creates a ROUTER socket bound to a random port
        in 14300-14399 on this machine's IP, writes the bound address and
        the process id to run files under ``RUNFILES_LOCATION``, and sets
        up the logger and in-memory bookkeeping.

        :param redis_url: URL handed to ``redis.from_url``
        :param loglevel: level applied to this controller's logger
        """
        self.redis_url = redis_url
        self.redis_server = redis.from_url(redis_url)

        self.context = zmq.Context()
        router = self.context.socket(zmq.ROUTER)
        self.socket = router
        router.setsockopt(zmq.LINGER, 500)
        # Paranoid for debugging purposes
        router.setsockopt(zmq.ROUTER_MANDATORY, 1)
        # Short timeout
        router.setsockopt(zmq.SNDTIMEO, 1000)
        self.poller = zmq.Poller()
        self.poller.register(router, zmq.POLLIN | zmq.POLLOUT)

        self.node_name = socket.gethostname()
        self.address = bind_to_random_port(
            router, 'tcp://' + get_my_ip(),
            min_port=14300, max_port=14399, max_tries=100)

        # Record where this controller listens and which process owns it.
        run_files = (
            ('bqueryd_controller.address', self.address),
            ('bqueryd_controller.pid', str(os.getpid())),
        )
        for file_name, content in run_files:
            with open(os.path.join(RUNFILES_LOCATION, file_name), 'w') as handle:
                handle.write(content)

        controller_logger = bqueryd.logger.getChild('controller').getChild(self.address)
        controller_logger.setLevel(loglevel)
        self.logger = controller_logger

        self.msg_count_in = 0
        # buffer of results that are ready to be returned to callers
        self.rpc_results = []
        # Certain RPC calls get split and divided over workers; this dict
        # tracks the original RPCs
        self.rpc_segments = {}
        # connected workers; TODO get rid of unresponsive ones...
        self.worker_map = {}
        # shows on which workers a file is available
        self.files_map = {}
        # A dict of buffers, used to round-robin based on message affinity
        self.worker_out_messages = {None: []}
        # used to round-robin the outgoing messages
        self.worker_out_messages_sequence = [None]
        self.is_running = True
        self.last_heartbeat = 0
        # Other Controllers running on other DQE nodes
        self.others = {}
        self.start_time = time.time()
项目:bqueryd    作者:visualfabriq    | 项目源码 | 文件源码
def connect_socket(self):
        """Connect to the first controller that answers a ping.

        Each address in ``self.controllers`` is tried in turn with a
        temporary REQ socket (2 s receive timeout, LINGER 0). The first
        one that replies becomes ``self.controller`` and its address is
        stored in ``self.address``; its receive timeout is then raised to
        ``self.timeout`` seconds. Sockets for controllers that do not
        answer are closed before moving on.

        :raises Exception: if no controller produced a reply
        """
        reply = None
        for c in self.controllers:
            self.logger.debug('Establishing socket connection to %s' % c)
            tmp_sock = self.context.socket(zmq.REQ)
            tmp_sock.setsockopt(zmq.RCVTIMEO, 2000)
            tmp_sock.setsockopt(zmq.LINGER, 0)
            tmp_sock.identity = self.identity
            tmp_sock.connect(c)
            # first ping the controller to see if it responds at all
            msg = RPCMessage({'payload': 'ping'})
            tmp_sock.send_json(msg)
            try:
                reply = msg_factory(tmp_sock.recv_json())
                self.address = c
                break
            except Exception:
                # Was a bare ``except:``, which also swallowed
                # KeyboardInterrupt/SystemExit.
                traceback.print_exc()
                # Close the failed socket: an open socket is leaked and
                # keeps the context from terminating.
                tmp_sock.close()
                continue
        if reply:
            # Now set the timeout to the actual requested
            self.logger.debug("Connection OK, setting network timeout to %s milliseconds", self.timeout*1000)
            self.controller = tmp_sock
            self.controller.setsockopt(zmq.RCVTIMEO, self.timeout*1000)
        else:
            raise Exception('No controller connection')
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_bad_sockopts(self):
        """Invalid socket options must raise the appropriate errors."""
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, 0)
        unknown_opt = 9999
        # An unrecognised integer option is passed through to libzmq,
        # which is expected to reject it with EINVAL.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
        self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
        # Only integer values pass through that way; bytes give a TypeError.
        self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
        # A generally valid option can still be invalid for this socket type.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_sockopt_roundtrip(self):
        """A LINGER value written with setsockopt is read back by getsockopt."""
        linger_ms = 11
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, linger_ms)
        self.assertEqual(pub.getsockopt(zmq.LINGER), linger_ms)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_attr(self):
        """Socket options can be written and read as attributes."""
        dealer = self.context.socket(zmq.DEALER)
        self.sockets.append(dealer)
        expected_linger = 10
        dealer.linger = expected_linger
        # Attribute access and getsockopt must agree with each other.
        self.assertEqual(expected_linger, dealer.linger)
        self.assertEqual(expected_linger, dealer.getsockopt(zmq.LINGER))
        self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_term_hang(self):
        """Terminate the context after sending on a zero-LINGER DEALER and
        closing both ends of the pair."""
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)
        for sock in (dealer, router):
            sock.close()
        self.context.term()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_default_mq_args(self):
        """Start a ThreadMonitoredQueue built with default arguments, then
        tear it down."""
        queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = queue
        for set_option in (queue.setsockopt_in, queue.setsockopt_out, queue.setsockopt_mon):
            set_option(zmq.LINGER, 0)
        # start() raises if the default args are wrong
        queue.start()
        self.teardown_device()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
        """Build two connected sockets on a random port of ``interface``.

        The first socket (``type1``) is bound, the second (``type2``)
        connects to it. Both get LINGER 0 and are recorded in
        ``self.sockets``. Returns ``(bound, connected)``.
        """
        bound = self.context.socket(type1)
        bound.setsockopt(zmq.LINGER, 0)
        port = bound.bind_to_random_port(interface)

        connected = self.context.socket(type2)
        connected.setsockopt(zmq.LINGER, 0)
        connected.connect('%s:%s' % (interface, port))

        self.sockets.extend([bound, connected])
        return bound, connected
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_bad_sockopts(self):
        """Invalid socket options must raise the appropriate errors."""
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, 0)
        unknown_opt = 9999
        # An unrecognised integer option is passed through to libzmq,
        # which is expected to reject it with EINVAL.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
        self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
        # Only integer values pass through that way; bytes give a TypeError.
        self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
        # A generally valid option can still be invalid for this socket type.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_attr(self):
        """Socket options can be written and read as attributes."""
        dealer = self.context.socket(zmq.DEALER)
        self.sockets.append(dealer)
        expected_linger = 10
        dealer.linger = expected_linger
        # Attribute access and getsockopt must agree with each other.
        self.assertEqual(expected_linger, dealer.linger)
        self.assertEqual(expected_linger, dealer.getsockopt(zmq.LINGER))
        self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_term_hang(self):
        """Terminate the context after sending on a zero-LINGER DEALER and
        closing both ends of the pair."""
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)
        for sock in (dealer, router):
            sock.close()
        self.context.term()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_sockopts(self):
        """Options set as context attributes are applied to new sockets."""
        context = self.Context()
        expected_linger = 5
        context.linger = expected_linger
        self.assertEqual(context.linger, expected_linger)

        req = context.socket(zmq.REQ)
        self.assertEqual(req.linger, expected_linger)
        self.assertEqual(req.getsockopt(zmq.LINGER), expected_linger)
        req.close()

        # A context-level subscribe setting must not get applied to
        # sockets that don't subscribe: creating a REQ socket still works.
        context.subscribe = b''
        context.socket(zmq.REQ).close()

        context.term()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_default_mq_args(self):
        """Start a ThreadMonitoredQueue built with default arguments, then
        tear it down."""
        queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = queue
        for set_option in (queue.setsockopt_in, queue.setsockopt_out, queue.setsockopt_mon):
            set_option(zmq.LINGER, 0)
        # start() raises if the default args are wrong
        queue.start()
        self.teardown_device()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_bad_sockopts(self):
        """Invalid socket options must raise the appropriate errors."""
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, 0)
        unknown_opt = 9999
        # An unrecognised integer option is passed through to libzmq,
        # which is expected to reject it with EINVAL.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
        self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
        # Only integer values pass through that way; bytes give a TypeError.
        self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
        # A generally valid option can still be invalid for this socket type.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_sockopt_roundtrip(self):
        """A LINGER value written with setsockopt is read back by getsockopt."""
        linger_ms = 11
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, linger_ms)
        self.assertEqual(pub.getsockopt(zmq.LINGER), linger_ms)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_attr(self):
        """Socket options can be written and read as attributes."""
        dealer = self.context.socket(zmq.DEALER)
        self.sockets.append(dealer)
        expected_linger = 10
        dealer.linger = expected_linger
        # Attribute access and getsockopt must agree with each other.
        self.assertEqual(expected_linger, dealer.linger)
        self.assertEqual(expected_linger, dealer.getsockopt(zmq.LINGER))
        self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_term_hang(self):
        """Terminate the context after sending on a zero-LINGER DEALER and
        closing both ends of the pair."""
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)
        for sock in (dealer, router):
            sock.close()
        self.context.term()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_default_mq_args(self):
        """Start a ThreadMonitoredQueue built with default arguments, then
        tear it down."""
        queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = queue
        for set_option in (queue.setsockopt_in, queue.setsockopt_out, queue.setsockopt_mon):
            set_option(zmq.LINGER, 0)
        # start() raises if the default args are wrong
        queue.start()
        self.teardown_device()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
        """Build two connected sockets on a random port of ``interface``.

        The first socket (``type1``) is bound, the second (``type2``)
        connects to it. Both get LINGER 0 and are recorded in
        ``self.sockets``. Returns ``(bound, connected)``.
        """
        bound = self.context.socket(type1)
        bound.setsockopt(zmq.LINGER, 0)
        port = bound.bind_to_random_port(interface)

        connected = self.context.socket(type2)
        connected.setsockopt(zmq.LINGER, 0)
        connected.connect('%s:%s' % (interface, port))

        self.sockets.extend([bound, connected])
        return bound, connected
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_bad_sockopts(self):
        """Invalid socket options must raise the appropriate errors."""
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, 0)
        unknown_opt = 9999
        # An unrecognised integer option is passed through to libzmq,
        # which is expected to reject it with EINVAL.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
        self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
        # Only integer values pass through that way; bytes give a TypeError.
        self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
        # A generally valid option can still be invalid for this socket type.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_attr(self):
        """Socket options can be written and read as attributes."""
        dealer = self.context.socket(zmq.DEALER)
        self.sockets.append(dealer)
        expected_linger = 10
        dealer.linger = expected_linger
        # Attribute access and getsockopt must agree with each other.
        self.assertEqual(expected_linger, dealer.linger)
        self.assertEqual(expected_linger, dealer.getsockopt(zmq.LINGER))
        self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_term_hang(self):
        """Terminate the context after sending on a zero-LINGER DEALER and
        closing both ends of the pair."""
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)
        for sock in (dealer, router):
            sock.close()
        self.context.term()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_sockopts(self):
        """Options set as context attributes are applied to new sockets."""
        context = self.Context()
        expected_linger = 5
        context.linger = expected_linger
        self.assertEqual(context.linger, expected_linger)

        req = context.socket(zmq.REQ)
        self.assertEqual(req.linger, expected_linger)
        self.assertEqual(req.getsockopt(zmq.LINGER), expected_linger)
        req.close()

        # A context-level subscribe setting must not get applied to
        # sockets that don't subscribe: creating a REQ socket still works.
        context.subscribe = b''
        context.socket(zmq.REQ).close()

        context.term()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_default_mq_args(self):
        """Start a ThreadMonitoredQueue built with default arguments, then
        tear it down."""
        queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
        self.device = queue
        for set_option in (queue.setsockopt_in, queue.setsockopt_out, queue.setsockopt_mon):
            set_option(zmq.LINGER, 0)
        # start() raises if the default args are wrong
        queue.start()
        self.teardown_device()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_bad_sockopts(self):
        """Invalid socket options must raise the appropriate errors."""
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, 0)
        unknown_opt = 9999
        # An unrecognised integer option is passed through to libzmq,
        # which is expected to reject it with EINVAL.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
        self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
        # Only integer values pass through that way; bytes give a TypeError.
        self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
        # A generally valid option can still be invalid for this socket type.
        self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_sockopt_roundtrip(self):
        """A LINGER value written with setsockopt is read back by getsockopt."""
        linger_ms = 11
        pub = self.context.socket(zmq.PUB)
        self.sockets.append(pub)
        pub.setsockopt(zmq.LINGER, linger_ms)
        self.assertEqual(pub.getsockopt(zmq.LINGER), linger_ms)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_attr(self):
        """Socket options can be written and read as attributes."""
        dealer = self.context.socket(zmq.DEALER)
        self.sockets.append(dealer)
        expected_linger = 10
        dealer.linger = expected_linger
        # Attribute access and getsockopt must agree with each other.
        self.assertEqual(expected_linger, dealer.linger)
        self.assertEqual(expected_linger, dealer.getsockopt(zmq.LINGER))
        self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_term_hang(self):
        """Terminate the context after sending on a zero-LINGER DEALER and
        closing both ends of the pair."""
        router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
        dealer.setsockopt(zmq.LINGER, 0)
        dealer.send(b'hello', copy=False)
        for sock in (dealer, router):
            sock.close()
        self.context.term()