Python gevent 模块,wait() 实例源码

我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用gevent.wait()

项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def test_run(self, chat):
        client = Mock()

        chat.pubsub = Mock()
        chat.pubsub.channels = ['quorum']
        chat.pubsub.listen.return_value = [{
            'type': 'message',
            'channel': 'quorum',
            'data': 'Calloo! Callay!',
        }]
        chat.greenlet = Mock()

        chat.subscribe(client, 'quorum')
        chat.run()

        gevent.wait()  # wait for event loop
        client.send.assert_called_once_with('quorum:Calloo! Callay!')
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def stop_and_wait(self):
        # Stop handling incoming packets, but don't close the socket. The
        # socket can only be safely closed after all outgoing tasks are stopped
        self.transport.stop_accepting()

        # Stop processing the outgoing queues
        self.event_stop.set()
        gevent.wait(self.greenlets)

        # All outgoing tasks are stopped. Now it's safe to close the socket. At
        # this point there might be some incoming message being processed,
        # keeping the socket open is not useful for these.
        self.transport.stop()

        # Set all the pending results to False
        for waitack in self.senthashes_to_states.itervalues():
            waitack.async_result.set(False)
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def send_and_wait(self, recipient, message, timeout):
        """ Send `message` to `recipient` and wait for the response or `timeout`.

        Args:
            recipient (address): The address of the node that will receive the
                message.
            message: The transfer message.
            timeout (float): How long should we wait for a response from `recipient`.

        Returns:
            None: If the wait timed out
            object: The result from the event
        """
        if not isaddress(recipient):
            raise ValueError('recipient is not a valid address.')

        self.protocol.send_and_wait(recipient, message, timeout)
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def test_leaving(raiden_network, token_addresses):
    token_address = token_addresses[0]
    connection_managers = [
        app.raiden.connection_manager_for_token(token_address) for app in raiden_network
    ]

    all_channels = list(
        itertools.chain.from_iterable(
            connection_manager.receiving_channels for connection_manager in connection_managers
        )
    )

    leaving_async = [
        app.raiden.leave_all_token_networks_async() for app in raiden_network[1:]
    ]

    gevent.wait(leaving_async, timeout=50)

    assert not connection_managers[0].receiving_channels
    assert all(
        channel.state == CHANNEL_STATE_SETTLED
        for channel in all_channels
    )
项目:easyblogger    作者:raghur    | 项目源码 | 文件源码
def runner(args):
    if args.command == "file":
        jobs = []
        files = frozenset()
        for fn in args.file:
            files = files.union(glob.iglob(fn))
        logger.info("Processing files: %s", files)
        for f in files:
            argsCopy = copy.deepcopy(args)
            with open(f, "r", newline="\n") as fh:
                argsCopy.file = fh
                contentArgs = ContentArgParser(fh)
                contentArgs.updateArgs(argsCopy)
                logger.debug("Updated args: %s", argsCopy)
                jobs.append(gevent.spawn(processItem, argsCopy, contentArgs))
        gevent.wait(jobs)
        return 0
    else:
        return processItem(args)
项目:herald    作者:helpshift    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser(description="Haproxy agent check service")
    parser.add_argument("-c", "--config",
                        default="/etc/herald/config.yml",
                        type=str,
                        help="path to yaml configuraion file")
    parser.add_argument("-b", "--bind",
                        default='0.0.0.0',
                        type=str,
                        help="listen address")
    parser.add_argument("-p", "--port",
                        default=5555,
                        type=int,
                        help="listen port")
    parser.add_argument("-l", "--loglevel",
                        default='info',
                        choices=['info', 'warn', 'debug', 'critical'],
                        type=str,
                        help="set logging level")

    args = parser.parse_args()
    setup_logging(args)

    config = load_configuration(args.config)
    all_plugins = load_all_plugins(config['plugins_dir'])
    plugin = load_plugin(all_plugins, config['plugins'])
    start_plugin(plugin)

    server = start_server(args, config, plugin)
    setup_handlers(server, plugin)
    gevent.wait()
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def amqp_consume(self):
        """Block on AMQP channel messages until an exception raises"""
        log.info("%s: Starting AMQP consumer.", self.lbl)
        try:
            while True:
                self.chan.wait()
        except BaseException as exc:
            log.error("%s: AMQP consumer exception %r, stopping.",
                      self.lbl, exc)
            self.close_chan()
            self.close_conn()
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def main(args=None, workers=None, client=EchoHubClient, worker_kwargs=None):
    gevent.monkey.patch_all()
    args = args if args else prepare_argparse().parse_args()
    prepare_logging(args.verbose or 1)

    if args.mode == 'server':
        hub = HubServer(workers=workers)
    elif args.mode == 'client':
        hub = client(worker_kwargs=worker_kwargs)
    else:
        raise Exception("Unknown mode '%s'." % args.mode)

    def sig_handler(sig=None, frame=None):
        log.warning("Hub process received SIGTERM/SIGINT")
        hub.stop()
        log.info("Sig handler completed.")

    gevent.signal(signal.SIGTERM, sig_handler)
    gevent.signal(signal.SIGINT, sig_handler)  # KeyboardInterrupt also

    hub.start()
    gevent.wait()
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def chat(sockets, pubsub):
    chat = sockets.ChatBackend()
    chat.pubsub = pubsub

    yield chat

    gevent.wait()
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def test_heartbeat(self, chat, sockets):
        client = Mock()
        client.closed = False
        chat.send = Mock()
        sockets.HEARTBEAT_DELAY = 1

        gevent.spawn(chat.heartbeat, client)
        gevent.sleep(2)
        client.closed = True
        gevent.wait()

        chat.send.assert_called_with(client, 'ping')
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def retry(protocol, data, receiver_address, event_stop, timeout_backoff):
    """ Send data until it's acknowledged.

    Exits when the first of the following happen:

    - The packet is acknowledged.
    - Event_stop is set.
    - The iterator timeout_backoff runs out of values.

    Returns:
        bool: True if the message was acknowledged, False otherwise.
    """

    async_result = protocol.send_raw_with_result(
        data,
        receiver_address,
    )

    event_quit = event_first_of(
        async_result,
        event_stop,
    )

    for timeout in timeout_backoff:

        if event_quit.wait(timeout=timeout) is True:
            break

        protocol.send_raw_with_result(
            data,
            receiver_address,
        )

    return async_result.ready()
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def wait_recovery(event_stop, event_healthy):
    event_first_of(
        event_stop,
        event_healthy,
    ).wait()

    if event_stop.is_set():
        return

    # There may be multiple threads waiting, do not restart them all at
    # once to avoid message flood.
    gevent.sleep(random.random())
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def send_and_wait(self, receiver_address, message, timeout=None):
        """Sends a message and wait for the response ack."""
        async_result = self.send_async(receiver_address, message)
        return async_result.wait(timeout=timeout)
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def leave_all_token_networks_async(self):
        token_addresses = self.token_to_channelgraph.keys()
        leave_results = []
        for token_address in token_addresses:
            try:
                connection_manager = self.connection_manager_for_token(token_address)
            except InvalidAddress:
                pass
            leave_results.append(connection_manager.leave_async())
        combined_result = AsyncResult()
        gevent.spawn(gevent.wait, leave_results).link(combined_result)
        return combined_result
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def test_throughput(apps, tokens, num_transfers, amount):
    def start_transfers(curr_app, curr_token, num_transfers):
        graph = curr_app.raiden.token_to_channelgraph[curr_token]

        all_paths = graph.get_paths_of_length(
            source=curr_app.raiden.address,
            num_hops=2,
        )
        path = all_paths[0]
        target = path[-1]

        api = curr_app.raiden.api
        events = list()

        for i in range(num_transfers):
            async_result = api.transfer_async(
                curr_token,
                amount,
                target,
                1)  # TODO: fill in identifier
            events.append(async_result)

        return events

    finished_events = []

    # Start all transfers
    start_time = time.time()
    for idx, curr_token in enumerate(tokens):
        curr_app = apps[idx]
        finished = start_transfers(curr_app, curr_token, num_transfers)
        finished_events.extend(finished)

    # Wait until the transfers for all tokens are done
    gevent.wait(finished_events)
    elapsed = time.time() - start_time

    completed_transfers = num_transfers * len(tokens)
    tps = completed_transfers / elapsed
    print('Completed {} transfers {:.5} tps / {:.5}s'.format(completed_transfers, tps, elapsed))
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def stop_and_wait(self):
        self.stop_event.set(True)
        gevent.wait(self)
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def stop(self):
        self.stop_signal = True
        self.greenlets.append(self.echo_worker_greenlet)
        gevent.wait(self.greenlets)
项目:spymanager    作者:delete    | 项目源码 | 文件源码
def flood_to(self, subscriptions):
        jobs = [
            gevent.spawn(self._pull_from, subscription)
            for subscription in subscriptions
        ]
        gevent.wait(jobs)
项目:http_heartbeat_proxy    作者:purepy    | 项目源码 | 文件源码
def main():
    server = MixedTCPServer(LISTEN_PORT, SS_PORT)
    gevent.signal(signal.SIGTERM, server.close)
    gevent.signal(signal.SIGINT, server.close)
    server.start()
    gevent.wait()
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
def main():
    args = sys.argv[1:]
    if len(args) != 2:
        sys.exit('Usage: %s source-address destination-address' % __file__)
    source = args[0]
    dest = parse_address(args[1])
    server = PortForwarder(source, dest)
    log('Starting port forwarder %s:%s -> %s:%s', *(server.address[:2] + dest))
    gevent.signal(signal.SIGTERM, server.close)
    gevent.signal(signal.SIGINT, server.close)
    server.start()
    gevent.wait()
项目:disco    作者:b1naryth1ef    | 项目源码 | 文件源码
def get(self, timeout=None):
        return gevent.wait(self._parts, timeout=timeout)
项目:disco    作者:b1naryth1ef    | 项目源码 | 文件源码
def wait(self, timeout=None):
        gevent.joinall(self._parts, timeout=None)
项目:easyblogger    作者:raghur    | 项目源码 | 文件源码
def main(sysargv=sys.argv):
    args = parse_args(sysargv[1:])
    print(args)
    jobs = [gevent.spawn(convert, args.file, args.stdin, i)
            for i in range(args.count)]
    gevent.wait(jobs)
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def amqp_consume(self):
        """Connect to Hub Server and set up and start AMQP consumer"""
        # define callback queue
        self.queue = self.chan.queue_declare(exclusive=True).queue
        self.chan.queue_bind(self.queue, self.exchange, self.queue)
        self.chan.basic_consume(self.queue, callback=self.amqp_handle_msg,
                                no_ack=True)
        log.debug("%s: Initialized amqp connection, channel, queue.", self.lbl)

        # send rpc request
        self.worker_id = None
        self.correlation_id = uuid.uuid4().hex
        reply_to = self.queue
        routing_key = '%s.worker.%s' % (self.key, self.worker_type)
        msg = amqp.Message(json.dumps(self.worker_kwargs),
                           correlation_id=self.correlation_id,
                           reply_to=reply_to,
                           content_type='application/json')
        self.amqp_send_msg(msg, routing_key)
        log.info("%s: sent RPC request, will wait for response.", self.lbl)

        # wait for rpc response
        try:
            while not self.worker_id:
                log.debug("%s: Waiting for RPC response.", self.lbl)
                self.chan.wait()
        except BaseException as exc:
            log.error("%s: Amqp consumer received %r while waiting for RPC "
                      "response. Stopping.", self.lbl, exc)
        log.info("%s: Finished waiting for RPC response.", self.lbl)
        super(HubClient, self).amqp_consume()
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def _send(self, command, payload=None):

        # send rpc request
        if self.correlation_id:
            raise Exception("Can't send second request while already waiting.")
        self.response = None
        self.correlation_id = uuid.uuid4().hex
        routing_key = '%s.%s' % (self.key, command)
        msg = amqp.Message(json.dumps(payload),
                           correlation_id=self.correlation_id,
                           reply_to=self.queue,
                           content_type='application/json')
        log.debug("Sending AMQP msg with routing key '%s' and body %r.",
                  routing_key, msg.body)
        self.chan.basic_publish(msg, self.exchange, routing_key)
        log.info("Sent RPC request, will wait for response.")

        # wait for rpc response
        try:
            while self.correlation_id:
                log.debug("Waiting for RPC response.")
                self.chan.wait()
        except BaseException as exc:
            log.error("Amqp consumer received %r while waiting for RPC "
                      "response. Stopping.", exc)
        log.info("Finished waiting for RPC response.")
        response = self.response
        self.response = None
        return response
项目:rill    作者:PermaData    | 项目源码 | 文件源码
def test_component_spec():
    assert GenerateTestData.get_spec() == {
        'name': 'tests.components/GenerateTestData',
        'description': '"Generates stream of packets under control of a counter',
        'inPorts': [
            {
                'addressable': False,
                'description': '',
                'id': 'wait',
                'required': False,
                'type': 'bang'
            },
            {
                'addressable': False,
                'default': 1,
                'description': 'Count of packets to be generated',
                'id': 'COUNT',
                'required': False,
                'schema': {'type': 'int'}
            }
        ],
        'outPorts': [
            {
                'addressable': False,
                'description': '',
                'id': 'done',
                'required': False,
                'type': 'bang'
            },
            {
                'addressable': False,
                'description': 'Generated stream',
                'id': 'OUT',
                'required': False,
                'schema': {'type': 'string'}
            }
        ],
        'subgraph': False
    }
项目:rill    作者:PermaData    | 项目源码 | 文件源码
def serve_runtime(runtime=None, host=DEFAULTS['host'], port=DEFAULTS['port'],
                  registry_host=DEFAULTS['registry_host'],
                  registry_port=DEFAULTS['registry_port']):

    runtime = runtime if runtime is not None else Runtime()
    address = 'ws://{}:{:d}'.format(host, port)

    def runtime_application_task():
        """
        This greenlet runs the websocket server that responds to remote commands
        that inspect/manipulate the Runtime.
        """
        print('Runtime listening at {}'.format(address))
        WebSocketRuntimeApplication.runtimes[port] = runtime
        try:
            r = geventwebsocket.Resource(
                OrderedDict([('/', WebSocketRuntimeApplication)]))
            s = geventwebsocket.WebSocketServer(('', port), r)
            s.serve_forever()
        finally:
            WebSocketRuntimeApplication.runtimes.pop(port)

    def local_registration_task():
        """
        This greenlet will run the rill registry to register the runtime with
        the ui.
        """
        from rill.registry import serve_registry
        serve_registry(registry_host, registry_port, host, port)

    tasks = [runtime_application_task, local_registration_task]

    # Start!
    gevent.wait([gevent.spawn(t) for t in tasks])
项目:pytest-vts    作者:bhodorog    | 项目源码 | 文件源码
def _yield_to_others(sleep):
    if any(
        [gevent.monkey.is_module_patched(mod)
         for mod in ["socket", "subprocess"]]):
        gevent.wait(timeout=sleep)
    else:
        time.sleep(sleep)
项目:server    作者:happypandax    | 项目源码 | 文件源码
def _run(self):
        # in_cpubound_thread is sentinel to prevent double thread dispatch
        thread_ctx = threading.local()
        thread_ctx.in_cpubound_thread = True
        try:
            self.in_async = gevent.get_hub().loop.async()
            self.in_q_has_data = gevent.event.Event()
            self.in_async.start(self.in_q_has_data.set)
            while not self.stopping:
                if not self.in_q:
                    # wait for more work
                    self.in_q_has_data.clear()
                    self.in_q_has_data.wait()
                    continue
                # arbitrary non-preemptive service discipline can go here
                # FIFO for now, but we should experiment with others
                jobid, func, args, kwargs = self.in_q.popleft()
                start_time = arrow.now()
                try:
                    with db.cleanup_session():
                        self.results[jobid] = func(*args, **kwargs)
                except Exception as e:
                    log.exception("Exception raised in cpubound_thread:")
                    self.results[jobid] = self._Caught(e)
                finished_time = arrow.now()
                run_delta = finished_time - start_time
                log.d("Function - '{}'\n".format(func.__name__),
                      "\tRunning time: {}\n".format(run_delta),
                      "\tJobs left:", len(self.in_q),
                      )
                self.out_q.append(jobid)
                self.out_async.send()
        except BaseException:
            self._error()
            # this may always halt the server process
项目:server    作者:happypandax    | 项目源码 | 文件源码
def apply(self, func, args, kwargs):
        done = gevent.event.Event()
        self.in_q.append((done, func, args, kwargs))
        while not self.in_async:
            gevent.sleep(0.01)  # poll until worker thread has initialized
        self.in_async.send()
        done.wait()
        res = self.results[done]
        del self.results[done]
        if isinstance(res, self._Caught):
            raise res.err
        return res
项目:server    作者:happypandax    | 项目源码 | 文件源码
def _notify(self):
        try:
            while not self.stopping:
                if not self.out_q:
                    # wait for jobs to complete
                    self.out_q_has_data.clear()
                    self.out_q_has_data.wait()
                    continue
                self.out_q.popleft().set()
        except BaseException:
            self._error()
项目:server    作者:happypandax    | 项目源码 | 文件源码
def get(self, block=True, timeout=None):
        if not self._value == self.NoValue:
            return self._value
        if block:
            gevent.wait([self._future], timeout)
            self._value = self._future.get()
        else:
            self._value = self._future.get(block, timeout)
        return self._value
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def retry_with_recovery(
        protocol,
        data,
        receiver_address,
        event_stop,
        event_healthy,
        event_unhealthy,
        backoff):
    """ Send data while the node is healthy until it's acknowledged.

    Note:
        backoff must be an infinite iterator, otherwise this task will
        become a hot loop.
    """

    # The underlying unhealthy will be cleared, care must be taken to properly
    # clear stop_or_unhealthy too.
    stop_or_unhealthy = event_first_of(
        event_stop,
        event_unhealthy,
    )

    acknowledged = False
    while not event_stop.is_set() and not acknowledged:

        # Packets must not be sent to an unhealthy node, nor should the task
        # wait for it to become available if the message has been acknowledged.
        if event_unhealthy.is_set():
            wait_recovery(
                event_stop,
                event_healthy,
            )

            # Assume wait_recovery returned because unhealthy was cleared and
            # continue execution, this is safe to do because event_stop is
            # checked below.
            stop_or_unhealthy.clear()

            if event_stop.is_set():
                return

        acknowledged = retry(
            protocol,
            data,
            receiver_address,

            # retry will stop when this event is set, allowing this task to
            # wait for recovery when the node becomes unhealthy or to quit if
            # the stop event is set.
            stop_or_unhealthy,

            # Intentionally reusing backoff to restart from the last
            # timeout/number of iterations.
            backoff,
        )

    return acknowledged
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def test_latency(apps, tokens, num_transfers, amount):
    def start_transfers(idx, curr_token, num_transfers):
        curr_app = apps[idx]
        graph = curr_app.raiden.token_to_channelgraph[curr_token]

        all_paths = graph.get_paths_of_length(
            source=curr_app.raiden.address,
            num_hops=2,
        )
        path = all_paths[0]
        target = path[-1]

        finished = gevent.event.Event()

        def _transfer():
            api = curr_app.raiden.api
            for i in range(num_transfers):
                async_result = api.transfer_async(
                    curr_token,
                    amount,
                    target,
                    1  # TODO: fill in identifier
                )
                async_result.wait()

            finished.set()

        gevent.spawn(_transfer)
        return finished

    finished_events = []

    # Start all transfers
    start_time = time.time()
    for idx, curr_token in enumerate(tokens):
        finished = start_transfers(idx, curr_token, num_transfers)
        finished_events.append(finished)

    # Wait until the transfers for all tokens are done
    gevent.wait(finished_events)
    elapsed = time.time() - start_time
    completed_transfers = num_transfers * len(tokens)

    tps = completed_transfers / elapsed
    print('Completed {} transfers. tps:{:.5} latency:{:.5} time:{:.5}s'.format(
        completed_transfers,
        tps,
        elapsed / completed_transfers,
        elapsed,
    ))
项目:easyblogger    作者:raghur    | 项目源码 | 文件源码
def processItem(args, contentArgs=None):
    blogger = EasyBlogger(args.clientid, args.secret, args.blogid,
                          args.url)
    try:
        if args.command == "post":
            newPost = blogger.post(args.title,
                                   args.content or args.file,
                                   args.labels,
                                   args.filters,
                                   isDraft=not args.publish,
                                   fmt=args.format)
            postId = newPost['id']
            logger.debug("Created post: %s", postId)
            if contentArgs:
                contentArgs.updateFileWithPostId(postId)
            print(newPost['url'])

        if args.command == 'delete':
            logger.debug("Deleting post: %s", args.postIds)
            for postId in args.postIds:
                blogger.deletePost(postId)

        if args.command == 'update':
            logger.debug("Updating post: %s", args.postId)
            updated = blogger.updatePost(
                args.postId,
                args.title,
                args.content or args.file,
                args.labels,
                args.filters,
                isDraft=not args.publish,
                fmt=args.format)
            print(updated['url'])

        if args.command == "get":
            if args.postId:
                posts = blogger.getPosts(postId=args.postId)
            elif args.query:
                posts = blogger.getPosts(
                    query=args.query,
                    maxResults=args.count)
            elif args.u:
                posts = blogger.getPosts(
                    url=args.u)
            else:
                posts = blogger.getPosts(
                    labels=args.labels,
                    maxResults=args.count)
            jobs = [gevent.spawn(printPosts,
                                 item, args.fields, args.doc, args.tofiles)
                    for item in posts]
            gevent.wait(jobs)
    except AccessTokenRefreshError:
        # The AccessTokenRefreshError exception is raised if the credentials
        # have been revoked by the user or they have expired.
        print('The credentials have been revoked or expired, please re-run'
              ' the application to re-authorize')
        return -1
    return 0
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def _wait(self):
        gevent.wait()
项目:birdseye-server    作者:DramaticLiberty    | 项目源码 | 文件源码
def main(filename):
    print('Importing {}'.format(filename))
    group_singular = {
        'conifers': [
            'conifer', 'plant', 'land plant', 'botany'],
        'reptiles': [
            'reptile', 'animal', 'cold blood', 'cold bloded', 'vertebrate',
            'fauna'],
        'turtles (non-marine)': [
            'turtle', 'animal', 'non-marine', 'cold blood', 'cold bloded',
            'vertebrate', 'fauna'],
        'butterflies': [
            'butterfly', 'animal', 'insect', 'moths and butterflies', 'fauna',
            'invertebrate'],
        'dragonflies': [
            'dragonfly', 'animal', 'insect', 'dragonflies and damseflies',
            'invertebrate', 'fauna'],
        'mammals': [
            'mammal', 'animal', 'warm blood', 'warm blooded', 'vertebrate',
            'fauna'],
        'birds': [
            'bird', 'animal', 'warm blood', 'warm blooded', 'vertebrate',
            'fauna'],
        'amphibians': [
            'amfibian', 'animal', 'vertebrate', 'fauna'],
        'sphingid moths': [
            'sphingid moth', 'moth', 'animal', 'insect', 'invertebrate',
            'fauna', 'moths and butterflies'],
        'bumblebees': [
            'bumblebee', 'bee', 'bees', 'animal', 'insect', 'invertebrate'],
    }
    with open(filename, newline='') as f:
        count = 0
        # "Scientific Name","Common Name","Family","Taxonomic Group"
        for row in csv.reader(f, delimiter=',', quotechar='"'):
            count += 1
            common = row[1]
            if common == 'null':
                common = row[0]
            gevent.spawn(
                post_species, row[0], common,
                [row[2], row[3]] + group_singular[row[3].lower()])
            if count >= 100:
                gevent.wait()
                count = 0
    gevent.wait()
    return 0
项目:rill    作者:PermaData    | 项目源码 | 文件源码
def test_pickle(graph):
    graph.add_component("Generate", GenerateTestData, COUNT=5)
    passthru = graph.add_component("Pass", SlowPass, DELAY=0.1)
    count = graph.add_component("Counter", Counter)
    dis1 = graph.add_component("Discard1", Discard)
    dis2 = graph.add_component("Discard2", Discard)

    graph.connect("Generate.OUT", "Pass.IN")
    graph.connect("Pass.OUT", "Counter.IN")
    graph.connect("Counter.COUNT", "Discard1.IN")
    graph.connect("Counter.OUT", "Discard2.IN")

    net = Network(graph)
    netrunner = gevent.spawn(net.go)
    try:
        with gevent.Timeout(.35) as timeout:
            gevent.wait([netrunner])
    except gevent.Timeout:
        print(count.execute)

    assert count.count == 4
    assert dis2.values == ['000005', '000004', '000003', '000002']

    import pickle
    # dump before terminating to get the runner statuses
    data = pickle.dumps(net)
    # FIXME: do we need to auto-terminate inside wait_for_all if there is an error?
    net.terminate()
    net.wait_for_all()
    # gevent.wait([netrunner])  # this causes more packets to be sent. no good.
    net2 = pickle.loads(data)
    assert net2.graph.component('Counter').count == 4
    assert net2.graph.component('Discard2').values == ['000005', '000004', '000003', '000002']
    net2.go(resume=True)
    assert net2.graph.component('Counter').count == 5
    assert net2.graph.component('Discard2').values == ['000005', '000004', '000003', '000002', '000001']

    # FIXME: test the case where a packet is lost due to being shut-down
    # packet counting should catch it.  to test, use this component, which
    # can be killed while it sleeps holding a packet:
    # @component
    # @outport("OUT")
    # @inport("IN")
    # @inport("DELAY", type=float, required=True)
    # def SlowPass(IN, DELAY, OUT):
    #     """
    #     Pass a stream of packets to an output stream with a delay between packets
    #     """
    #     delay = DELAY.receive_once()
    #     for p in IN:
    #         time.sleep(delay)
    #         OUT.send(p)