We extracted the following 50 code examples from Python open-source projects to illustrate how to use itertools.cycle().
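Before the project-specific examples, here is a minimal standalone sketch (not taken from any of the projects below) of what itertools.cycle() does: it repeats a finite iterable endlessly, so you normally bound it with itertools.islice() or stop iterating yourself.

import itertools

# cycle() repeats the sequence forever; islice() takes just the first 7 items
colors = itertools.cycle(['red', 'green', 'blue'])
print(list(itertools.islice(colors, 7)))
# ['red', 'green', 'blue', 'red', 'green', 'blue', 'red']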
def __init__(self, msShowTimeBetweenSlides=1500):
    # initialize tkinter super class
    Tk.__init__(self)

    # time each slide will be shown
    self.showTime = msShowTimeBetweenSlides

    # look for images in current working directory where this module lives
    chapter_folder = path.realpath(path.dirname(__file__))
    resources_folder = path.join(chapter_folder, 'Resources')
    listOfSlides = [slide for slide in listdir(resources_folder)
                    if slide.endswith('gif') or slide.endswith('jpg')]

    # endlessly read in the slides so we can show them on the tkinter Label
    chdir(resources_folder)
    self.iterableCycle = cycle((ImageTk.PhotoImage(file=slide), slide) for slide in listOfSlides)

    # create tkinter Label widget which can also display images
    self.slidesLabel = Label(self)

    # create the Frame widget
    self.slidesLabel.pack()
def pieces_under_over(path, segs_to_points, xings):
    """Produce all the pieces of the path, with a bool indicating if each leads to under or over."""
    pieces = list(path_pieces(path, segs_to_points))
    for i, piece in enumerate(pieces):
        xing = xings.get(piece[-1])
        if xing is None:
            continue
        if xing.under is not None:
            over = (xing.under != path)
        else:
            assert xing.over is not None
            over = (xing.over == path)
        ou = [over, not over]
        if i % 2:
            ou = ou[::-1]
        break
    else:
        ou = [True, False]
    yield from zip(pieces, itertools.cycle(ou))
def test_jitter(self):
    # Jitter cycles between +1 & -1 times time delta
    jitter_iter = itertools.cycle((1, -1))

    def jitter(dt):
        return dt * next(jitter_iter)

    physics = particle.PhysicsJitter(y=2, jitter=jitter)
    p = particle.Particle(0, 0)
    self.assertEquals(p.y, 0)

    # Jitter one second; uses jitter value 1 * 1 second * scale 2, +2 to y
    physics(1, p)
    self.assertEquals(p.y, 2)

    # Jitter 2s; uses jitter value -1 * 2 second * scale 2, -4 to y
    physics(2, p)
    self.assertEquals(p.y, -2)

    # Jitter 5s; uses jitter value 1 * 5 second * scale 2, +10 to y
    physics(5, p)
    self.assertEquals(p.y, 8)
def __init__(self, console_or_stream=None, **kwargs):
    if isinstance(console_or_stream, Console):
        self.console = console_or_stream
    else:
        self.console = Console(console_or_stream)
    self.template = kwargs.get('template', self.TEMPLATE)
    self.progress_template = \
        kwargs.get('progress_template', self.PROGRESS_TEMPLATE)
    self.progress_brick = kwargs.get('progress_brick', self.PROGRESS_BRICK)
    self.progress_num_bricks = \
        kwargs.get('progress_num_bricks', self.PROGRESS_NUM_BRICKS)
    self.percent_template = \
        kwargs.get('percent_template', self.PERCENT_TEMPLATE)
    self.current_template = \
        kwargs.get('current_template', self.CURRENT_TEMPLATE)
    self.total_template = \
        kwargs.get('total_template', self.TOTAL_TEMPLATE)
    self.eta_template = kwargs.get('eta_template', self.ETA_TEMPLATE)
    self.screw_template = kwargs.get('screw_template', self.SCREW_TEMPLATE)
    self.refresh_every = kwargs.get('refresh_every', self.REFRESH_EVERY)
    self.line_template = self.build_line_template()
    self.start_datetime = None
    self.screw_cycle = itertools.cycle(('|', '/', '-', '\\'))
def _match_label_with_color(label, colors, bg_label, bg_color):
    """Return `unique_labels` and `color_cycle` for label array and color list.

    Colors are cycled for normal labels, but the background color should only
    be used for the background.
    """
    # Temporarily set background color; it will be removed later.
    if bg_color is None:
        bg_color = (0, 0, 0)
    bg_color = _rgb_vector([bg_color])

    unique_labels = list(set(label.flat))
    # Ensure that the background label is in front to match call to `chain`.
    if bg_label in unique_labels:
        unique_labels.remove(bg_label)
        unique_labels.insert(0, bg_label)

    # Modify labels and color cycle so background color is used only once.
    color_cycle = itertools.cycle(colors)
    color_cycle = itertools.chain(bg_color, color_cycle)

    return unique_labels, color_cycle
def __init__(self, pid: int, name: str, parser, fields: [list, str]):
    """
    A class representing an FMSPID and its conversion
    :param pid: The PID
    :param name: A friendly name of this PID
    :param parser: a function that turns a byte array of at most length 8 into one or more readable values
    :param fields: the friendly name of the outputs of this PID eg. 'RPM'
    """
    self.pid = pid
    self.name = name
    self.parser = parser
    if type(fields) is str:
        self.fields = [fields]
    else:
        self.fields = fields
    self.fieldnames = ['{} ({})'.format(name, unit)
                       for pid_name, unit in zip(cycle([self.name]), self.fields)]
def insert_and_validate_list_results(self, reverse, slowdown):
    """
    This utility method will submit various statements for execution using the
    ConcurrentExecutorListResults, then invoke a separate thread to execute the callback
    associated with the futures registered for those statements. The parameters will toggle
    various timing and ordering changes. Finally it will validate that the results were
    returned in the order they were submitted.
    :param reverse: Execute the callbacks in the opposite order that they were submitted
    :param slowdown: Cause intermittent queries to perform slowly
    """
    our_handler = MockResponseResponseFuture(reverse=reverse)
    mock_session = Mock()
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    mock_session.execute_async.return_value = our_handler

    t = TimedCallableInvoker(our_handler, slowdown=slowdown)
    t.start()
    results = execute_concurrent(mock_session, statements_and_params)

    while(not our_handler.pending_callbacks.empty()):
        time.sleep(.01)
    t.stop()
    self.validate_result_ordering(results)
def insert_and_validate_list_generator(self, reverse, slowdown):
    """
    This utility method will submit various statements for execution using the
    ConcurrentExecutorGenResults, then invoke a separate thread to execute the callback
    associated with the futures registered for those statements. The parameters will toggle
    various timing and ordering changes. Finally it will validate that the results were
    returned in the order they were submitted.
    :param reverse: Execute the callbacks in the opposite order that they were submitted
    :param slowdown: Cause intermittent queries to perform slowly
    """
    our_handler = MockResponseResponseFuture(reverse=reverse)
    mock_session = Mock()
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    mock_session.execute_async.return_value = our_handler

    t = TimedCallableInvoker(our_handler, slowdown=slowdown)
    t.start()
    try:
        results = execute_concurrent(mock_session, statements_and_params, results_generator=True)
        self.validate_result_ordering(results)
    finally:
        t.stop()
def test_wrap_round_robin(self):
    cluster = Mock(spec=Cluster)
    cluster.metadata = Mock(spec=Metadata)
    hosts = [Host(str(i), SimpleConvictionPolicy) for i in range(4)]
    for host in hosts:
        host.set_up()

    def get_replicas(keyspace, packed_key):
        index = struct.unpack('>i', packed_key)[0]
        return list(islice(cycle(hosts), index, index + 2))

    cluster.metadata.get_replicas.side_effect = get_replicas

    policy = TokenAwarePolicy(RoundRobinPolicy())
    policy.populate(cluster, hosts)

    for i in range(4):
        query = Statement(routing_key=struct.pack('>i', i), keyspace='keyspace_name')
        qplan = list(policy.make_query_plan(None, query))

        replicas = get_replicas(None, struct.pack('>i', i))
        other = set(h for h in hosts if h not in replicas)
        self.assertEqual(replicas, qplan[:2])
        self.assertEqual(other, set(qplan[2:]))

    # Should use the secondary policy
    for i in range(4):
        qplan = list(policy.make_query_plan())
        self.assertEqual(set(qplan), set(hosts))
def test_execute_concurrent(self):
    for num_statements in (0, 1, 2, 7, 10, 99, 100, 101, 199, 200, 201):
        # write
        statement = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
            consistency_level=ConsistencyLevel.QUORUM)
        statements = cycle((statement, ))
        parameters = [(i, i) for i in range(num_statements)]

        results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters)))
        self.assertEqual(num_statements, len(results))
        for success, result in results:
            self.assertTrue(success)
            self.assertFalse(result)

        # read
        statement = SimpleStatement(
            "SELECT v FROM test3rf.test WHERE k=%s",
            consistency_level=ConsistencyLevel.QUORUM)
        statements = cycle((statement, ))
        parameters = [(i, ) for i in range(num_statements)]

        results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters)))
        self.assertEqual(num_statements, len(results))
        self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results)
def test_no_raise_on_first_failure(self):
    statement = SimpleStatement(
        "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
        consistency_level=ConsistencyLevel.QUORUM)
    statements = cycle((statement, ))
    parameters = [(i, i) for i in range(100)]

    # we'll get an error back from the server
    parameters[57] = ('efefef', 'awefawefawef')

    results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
    for i, (success, result) in enumerate(results):
        if i == 57:
            self.assertFalse(success)
            self.assertIsInstance(result, InvalidRequest)
        else:
            self.assertTrue(success)
            self.assertFalse(result)
def test_no_raise_on_first_failure_client_side(self):
    statement = SimpleStatement(
        "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
        consistency_level=ConsistencyLevel.QUORUM)
    statements = cycle((statement, ))
    parameters = [(i, i) for i in range(100)]

    # the driver will raise an error when binding the params
    parameters[57] = 1

    results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
    for i, (success, result) in enumerate(results):
        if i == 57:
            self.assertFalse(success)
            self.assertIsInstance(result, TypeError)
        else:
            self.assertTrue(success)
            self.assertFalse(result)
def make_query_plan(self, working_keyspace=None, query=None):
    # not thread-safe, but we don't care much about lost increments
    # for the purposes of load balancing
    pos = self._position
    self._position += 1

    local_live = self._dc_live_hosts.get(self.local_dc, ())
    pos = (pos % len(local_live)) if local_live else 0
    for host in islice(cycle(local_live), pos, pos + len(local_live)):
        yield host

    # the dict can change, so get candidate DCs iterating over keys of a copy
    other_dcs = [dc for dc in self._dc_live_hosts.copy().keys() if dc != self.local_dc]
    for dc in other_dcs:
        remote_live = self._dc_live_hosts.get(dc, ())
        for host in remote_live[:self.used_hosts_per_remote_dc]:
            yield host
def add_plot_args(parser):
    parser.add_argument('-d', '--data',
        help="""Append a PATH to a privcount tallies.json file,
        and the LABEL we should use for the graph legend for this
        set of experimental results""",
        metavar=("PATH", "LABEL"),
        nargs=2,
        required="True",
        action=PlotDataAction, dest="experiments")

    parser.add_argument('-p', '--prefix',
        help="a STRING filename prefix for graphs we generate",
        metavar="STRING",
        action="store", dest="prefix",
        default=None)

    parser.add_argument('-f', '--format',
        help="""A comma-separated LIST of color/line format strings to cycle to
        matplotlib's plot command (see matplotlib.pyplot.plot)""",
        metavar="LIST",
        action="store", dest="lineformats",
        default=LINEFORMATS)
def _copy_from(self, curs, nrecs, srec, copykw):
    f = StringIO()
    for i, c in zip(range(nrecs), cycle(string.ascii_letters)):
        l = c * srec
        f.write("%s\t%s\n" % (i, l))

    f.seek(0)
    curs.copy_from(MinimalRead(f), "tcopy", **copykw)

    curs.execute("select count(*) from tcopy")
    self.assertEqual(nrecs, curs.fetchone()[0])

    curs.execute("select data from tcopy where id < %s order by id",
                 (len(string.ascii_letters),))
    for i, (l,) in enumerate(curs):
        self.assertEqual(l, string.ascii_letters[i] * srec)
def _copy_from(self, curs, nrecs, srec, copykw):
    f = StringIO()
    for i, c in izip(xrange(nrecs), cycle(string.ascii_letters)):
        l = c * srec
        f.write("%s\t%s\n" % (i, l))

    f.seek(0)
    curs.copy_from(MinimalRead(f), "tcopy", **copykw)

    curs.execute("select count(*) from tcopy")
    self.assertEqual(nrecs, curs.fetchone()[0])

    curs.execute("select data from tcopy where id < %s order by id",
                 (len(string.ascii_letters),))
    for i, (l,) in enumerate(curs):
        self.assertEqual(l, string.ascii_letters[i] * srec)
def testLineBuffering(self):
    """
    Test creating a LineBuffer and feeding it some lines. The lines should
    build up in its internal buffer for a while and then get spat out to the
    writer.
    """
    output = []
    input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
    c = pop3._IteratorBuffer(output.extend, input, 6)
    i = iter(c)

    self.assertEquals(output, [])  # nothing is buffered yet
    i.next()
    self.assertEquals(output, [])  # '012' is buffered
    i.next()
    self.assertEquals(output, [])  # '012345' is buffered
    i.next()
    self.assertEquals(output, ['012', '345', '6'])  # nothing is buffered

    for n in range(5):
        i.next()
    self.assertEquals(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
def slidesCallback(self):
    # get next slide from iterable cycle
    currentInstance, nameOfSlide = next(self.iterableCycle)

    # assign next slide to Label widget
    self.slidesLabel.config(image=currentInstance)

    # update Window title with current slide
    self.title(nameOfSlide)

    # recursively repeat the Show
    self.after(self.showTime, self.slidesCallback)

#=================================
# Start GUI
#=================================
def __init__(self, msShowTimeBetweenSlides=1500):
    # initialize tkinter super class
    Tk.__init__(self)

    # time each slide will be shown
    self.showTime = msShowTimeBetweenSlides

    # look for images in current working directory where this module lives
    chapter_folder = path.realpath(path.dirname(__file__))
    resources_folder = path.join(chapter_folder, 'Resources')
    listOfSlides = [slide for slide in listdir(resources_folder) if slide.endswith('gif')]

    # endlessly read in the slides so we can show them on the tkinter Label
    chdir(resources_folder)
    self.iterableCycle = cycle((PhotoImage(file=slide), slide) for slide in listOfSlides)

    # create tkinter Label widget which can also display images
    self.slidesLabel = Label(self)

    # create the Frame widget
    self.slidesLabel.pack()
def __init__(self, msShowTimeBetweenSlides=1500):
    # initialize tkinter super class
    Tk.__init__(self)

    # time each slide will be shown
    self.showTime = msShowTimeBetweenSlides

    # look for images in current working directory where this module lives
    # try: .jpeg
    chapter_folder = path.realpath(path.dirname(__file__))
    resources_folder = path.join(chapter_folder, 'Resources')
    listOfSlides = [slide for slide in listdir(resources_folder)
                    if slide.endswith('gif') or slide.endswith('jpg')]

    # endlessly read in the slides so we can show them on the tkinter Label
    chdir(resources_folder)
    self.iterableCycle = cycle((PhotoImage(file=slide), slide) for slide in listOfSlides)

    # create tkinter Label widget which can also display images
    self.slidesLabel = Label(self)

    # create the Frame widget
    self.slidesLabel.pack()
def get_api_servers():
    """Return iterator of glance api_servers.

    Return iterator of glance api_servers to cycle through the list,
    looping around to the beginning if necessary.
    """
    api_servers = []

    ks = keystone_client.get_client()
    catalog = keystone_client.get_service_catalog(ks)

    image_service = catalog.url_for(service_type='image')
    if image_service:
        api_servers.append(image_service)

    if CONF.glance_api_servers:
        for api_server in CONF.glance_api_servers:
            api_servers.append(api_server)

    random.shuffle(api_servers)
    return itertools.cycle(api_servers)
def cluster(data, true_labels, n_clusters=3):
    km = KMeans(init='k-means++', n_clusters=n_clusters, n_init=10)
    km.fit(data)
    km_means_labels = km.labels_
    km_means_cluster_centers = km.cluster_centers_
    km_means_labels_unique = np.unique(km_means_labels)
    colors_ = cycle(colors.cnames.keys())

    initial_dim = np.shape(data)[1]
    data_2 = tsne(data, 2, initial_dim, 30)

    plt.figure(figsize=(12, 6))
    plt.scatter(data_2[:, 0], data_2[:, 1], c=true_labels)
    plt.title('True Labels')

    return km_means_labels
def plot_prof_2(self, mod, species, xlim1, xlim2):
    """
    Plot one species for cycle between xlim1 and xlim2

    Parameters
    ----------
    mod : string or integer
        Model to plot, same as cycle number.
    species : list
        Which species to plot.
    xlim1, xlim2 : float
        Mass coordinate range.

    """
    mass = self.se.get(mod, 'mass')
    Xspecies = self.se.get(mod, 'yps', species)
    pyl.plot(mass, Xspecies, '-', label=str(mod) + ', ' + species)
    pyl.xlim(xlim1, xlim2)
    pyl.legend()
def chunks(it, n, k):
    buffer = [[] for _ in range(n)]
    buf_it = iter(itertools.cycle(buffer))
    for item in it:
        buf_item = next(buf_it)
        if len(buf_item) == k:
            yield buffer
            buffer = [[] for _ in range(n)]
            buf_it = iter(itertools.cycle(buffer))
            buf_item = next(buf_it)
        buf_item.append(item)
    if all(buffer):
        yield buffer
def _on_graph(self):
    for cfg in self._cfg:
        size = [str(s) for s in cfg["size"]]
        limits = [str(s) for s in cfg["limits"]]
        g = [cfg["path"], "--imgformat", "PNG",
             "-w", size[0], "-h", size[1],
             "--vertical-label", cfg["vlabel"],
             "-t", cfg["title"],
             "-s", str(round(time.time() - cfg["length"])),
             "-l", limits[0], "-u", limits[1]]
        color_iterator = itertools.cycle(("ff0000", "00ff00", "0000ff",
                                          "ff00ff", "ffff00", "00ffff"))
        for entry in cfg["lines"]:
            color = next(color_iterator)
            g.append(f"DEF:{entry['name']}={entry['path']}:value:AVERAGE")
            g.append(f"LINE2:{entry['name']}#{color}:{entry['name']}")
        rrdtool.graph(g)
def __init__(self, key):
    self.__key_gen = itertools.cycle([ord(x) for x in key]).next
    self.__key_xor = lambda s: ''.join(chr(ord(x) ^ self.__key_gen()) for x in s)
    if len(key) == 1:
        try:
            from Crypto.Util.strxor import strxor_c
            c = ord(key)
            self.__key_xor = lambda s: strxor_c(s, c)
        except ImportError:
            #logging.debug('Load Crypto.Util.strxor Failed, Use Pure Python Instead.\n')
            pass
def get_swap_subset(self, subset, other):
    # get a copy of subset!
    subset = subset.copy()
    # print "will check", self.name, other.name, "in", subset
    for lifetime in itertools.cycle((self, other)):
        old_size = subset.size
        for sg in lifetime.subsets:
            if sg.intersects(subset):
                # print "\t", sg, "intersects", subset
                if sg.no_swap:
                    # print "\tsg is no swap .. bail out"
                    return None
                # print "\tmerging them!"
                subset.merge(sg)
        if subset.size == old_size:
            # print "\tsize did not change, done!"
            break
    return subset
def test_lengths():
    s = fields.Schema(f1=fields.KEYWORD(stored=True, scorable=True),
                      f2=fields.KEYWORD(stored=True, scorable=True))
    with TempIndex(s, "testlengths") as ix:
        w = ix.writer()
        items = u("ABCDEFG")
        from itertools import cycle, islice
        lengths = [10, 20, 2, 102, 45, 3, 420, 2]
        for length in lengths:
            w.add_document(f2=u(" ").join(islice(cycle(items), length)))
        w.commit()

        with ix.reader() as dr:
            ls1 = [dr.doc_field_length(i, "f1") for i in xrange(0, len(lengths))]
            assert ls1 == [0] * len(lengths)
            ls2 = [dr.doc_field_length(i, "f2") for i in xrange(0, len(lengths))]
            assert ls2 == [byte_to_length(length_to_byte(l)) for l in lengths]
def main(n=int(sys.argv[1]), n_threads=503, cycle=itertools.cycle):
    def worker(worker_id):
        n = 1
        while True:
            print n
            if n > 0:
                n = (yield (n - 1))
            else:
                print worker_id
                raise StopIteration

    threadRing = [worker(w) for w in xrange(1, n_threads + 1)]
    for t in threadRing:
        foo = t.next()                               # start exec. gen. funcs
    sendFuncRing = [t.send for t in threadRing]      # speed...
    for send in cycle(sendFuncRing):
        try:
            n = send(n)
        except StopIteration:
            break
def test_IOBase_finalize(self):
    # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
    # class which inherits IOBase and an object of this class are caught
    # in a reference cycle and close() is already in the method cache.
    class MyIO(self.IOBase):
        def close(self):
            pass

    # create an instance to populate the method cache
    MyIO()
    obj = MyIO()
    obj.obj = obj
    wr = weakref.ref(obj)
    del MyIO
    del obj
    support.gc_collect()
    self.assertTrue(wr() is None, wr)
def next_phase(self):
    if not hasattr(self, "_phaser"):
        self._phaser = itertools.cycle(self.phases)
    return next(self._phaser)
def __init__(self, message, file=None, spin_chars="-\\|/",
             # Empirically, 8 updates/second looks nice
             min_update_interval_seconds=0.125):
    self._message = message
    if file is None:
        file = sys.stdout
    self._file = file
    self._rate_limiter = RateLimiter(min_update_interval_seconds)
    self._finished = False

    self._spin_cycle = itertools.cycle(spin_chars)

    self._file.write(" " * get_indentation() + self._message + " ... ")
    self._width = 0
def iterator(self, data_type="train"):
    raw_data = self.get_data_from_type(data_type)
    return itertools.cycle(([self.onehot(data), data] for data in raw_data if data != []))
def __init__(self, distance=None):
    self.nodes = itertools.cycle(get_nodes())
    self.distance = distance
def test_segment_sort_along(p1, p2, tvals):
    # Get rid of pathological cases.
    assume(p1.distance(p2) > 0.001)

    tvals = [t / 100 for t in tvals]
    fuzz = [1e-10, -1e-10]
    points = [along_the_way(p1, p2, t) for t in tvals]
    points = [Point(x + f, y + f) for (x, y), f in zip(points, itertools.cycle(fuzz))]

    # Calculate the smallest distance between any pair of points. If we get
    # the wrong answer from sort_along, then the total distance will be off by
    # at least twice this.
    min_gap = min(q1.distance(q2) for q1, q2 in all_pairs(points + [p1, p2]))

    seg = Segment(p1, p2)
    spoints = seg.sort_along(points)
    assert len(spoints) == len(points)
    assert all(pt in points for pt in spoints)

    original = Point(*p1).distance(Point(*p2))
    total = (
        Point(*p1).distance(Point(*spoints[0])) +
        sum(Point(*p).distance(Point(*q)) for p, q in adjacent_pairs(spoints)) +
        Point(*spoints[-1]).distance(Point(*p2))
    )
    # The total distance will be wrong by at least 2*min_gap if it is wrong.
    assert total - original < 2 * min_gap

# Bounds
def test_bg_and_color_cycle():
    image = np.zeros((1, 10))  # dummy image
    label = np.arange(10).reshape(1, -1)
    colors = [(1, 0, 0), (0, 0, 1)]
    bg_color = (0, 0, 0)
    rgb = label2rgb(label, image=image, bg_label=0, bg_color=bg_color,
                    colors=colors, alpha=1)
    assert_close(rgb[0, 0], bg_color)
    for pixel, color in zip(rgb[0, 1:], itertools.cycle(colors)):
        assert_close(pixel, color)
def __call__(self, bs):
    vals = self.dtype(bs)
    if type(vals) not in (tuple, list):
        vals = (vals,)
    return {'{} ({})'.format(pid_name, unit): val
            for pid_name, val, unit in zip(cycle([self.name]), vals, self.dtype.units)}
def __call__(self, msg):
    parsed = self.parser(msg)
    # if parsed is None:
    #     return None
    if type(parsed) not in [tuple, list]:
        parsed = (parsed,)
    return {'{} ({})'.format(pid_name, unit): val
            for pid_name, val, unit in zip(cycle([self.name]), parsed, self.fields)}
def __call__(self, msg):
    parsed = self.parser(msg)
    if type(parsed) not in [tuple, list]:
        parsed = (parsed,)
    return {'{} ({})'.format(pid_name, unit): val
            for pid_name, val, unit in zip(cycle([self.name]), parsed, self.fields)}
def test_send():
    btl = BluetoothLogger(fields=["speed", "rpm", "soc"], password=password)
    btl.start()

    # generate some data and send it
    x = range(1000)
    y = map(lambda v: sin(v * pi / 45) * 5000 + 5000, x)
    speeds = cycle(y)

    print("Sending dummy data")
    while 1:
        try:
            row = map(str, [round(next(speeds), 2), 5000, 50])
            btl.send(",".join(row))
            time.sleep(1)
        except KeyboardInterrupt:
            print("Terminating")
            break

    btl.join()
def run(self):
    for consumer in itertools.cycle(self.consumers):
        consumer.run(iter_limit=1)
def test_first_failure(self):
    statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", ))
    parameters = [(i, i) for i in range(100)]

    # we'll get an error back from the server
    parameters[57] = ('efefef', 'awefawefawef')

    self.assertRaises(
        InvalidRequest,
        execute_concurrent, self.session, list(zip(statements, parameters)), raise_on_first_error=True)
def test_paging(self):
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        self.assertEqual(100, len(list(self.session.execute("SELECT * FROM test3rf.test"))))

        statement = SimpleStatement("SELECT * FROM test3rf.test")
        self.assertEqual(100, len(list(self.session.execute(statement))))

        self.assertEqual(100, len(list(self.session.execute(prepared))))
def test_paging_state(self):
    """
    Test to validate paging state api
    @since 3.7.0
    @jira_ticket PYTHON-200
    @expected_result paging state returned should be accurate, and allow for queries to be resumed.

    @test_category queries
    """
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    list_all_results = []
    self.session.default_fetch_size = 3

    result_set = self.session.execute("SELECT * FROM test3rf.test")
    while(result_set.has_more_pages):
        for row in result_set.current_rows:
            self.assertNotIn(row, list_all_results)
        list_all_results.extend(result_set.current_rows)
        page_state = result_set.paging_state
        result_set = self.session.execute("SELECT * FROM test3rf.test", paging_state=page_state)

    if(len(result_set.current_rows) > 0):
        list_all_results.append(result_set.current_rows)

    self.assertEqual(len(list_all_results), 100)
def test_paging_verify_writes(self):
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, statements_and_params)

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        results = self.session.execute("SELECT * FROM test3rf.test")
        result_array = set()
        result_set = set()
        for result in results:
            result_array.add(result.k)
            result_set.add(result.v)

        self.assertEqual(set(range(100)), result_array)
        self.assertEqual(set([0]), result_set)

        statement = SimpleStatement("SELECT * FROM test3rf.test")
        results = self.session.execute(statement)
        result_array = set()
        result_set = set()
        for result in results:
            result_array.add(result.k)
            result_set.add(result.v)

        self.assertEqual(set(range(100)), result_array)
        self.assertEqual(set([0]), result_set)

        results = self.session.execute(prepared)
        result_array = set()
        result_set = set()
        for result in results:
            result_array.add(result.k)
            result_set.add(result.v)

        self.assertEqual(set(range(100)), result_array)
        self.assertEqual(set([0]), result_set)
def test_async_paging(self):
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        self.assertEqual(100, len(list(self.session.execute_async("SELECT * FROM test3rf.test").result())))

        statement = SimpleStatement("SELECT * FROM test3rf.test")
        self.assertEqual(100, len(list(self.session.execute_async(statement).result())))

        self.assertEqual(100, len(list(self.session.execute_async(prepared).result())))
def test_concurrent_with_paging(self):
    statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                [(i, ) for i in range(100)])
    execute_concurrent(self.session, list(statements_and_params))

    prepared = self.session.prepare("SELECT * FROM test3rf.test")

    for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
        self.session.default_fetch_size = fetch_size
        results = execute_concurrent_with_args(self.session, prepared, [None] * 10)
        self.assertEqual(10, len(results))
        for (success, result) in results:
            self.assertTrue(success)
            self.assertEqual(100, len(list(result)))