我们从 Python 开源项目中提取了以下 45 个代码示例,用于说明如何使用 hypothesis.strategies.lists()。
def combinable_paths_no_loops(draw):
    """Makes varying-length paths, but no loops."""
    pts = draw(lists(ipoints, min_size=2, max_size=200, unique_by=tuple))
    rand = draw(randoms())
    paths = [[]]
    remaining = rand.randint(2, 4)
    for pt in pts:
        paths[-1].append(pt)
        remaining -= 1
        if remaining == 0:
            # Start a new path of random target length 2..4.
            paths.append([])
            remaining = rand.randint(2, 4)
            # Half the time, repeat the last point so the new path joins on.
            if rand.random() > .5:
                paths[-1].append(pt)
    # The final path may be a stub that never reached two points.
    if len(paths[-1]) < 2:
        paths.pop()
    rand.shuffle(paths)
    return [Path(p) for p in paths]
def test_structure_simple_from_dict_default(converter, cl_and_vals, data):
    """Test structuring non-nested attrs classes with default value."""
    cl, vals = cl_and_vals
    obj = cl(*vals)
    defaulted_attrs = [a for a in fields(cl) if a.default is not NOTHING]
    to_remove = data.draw(lists(elements=sampled_from(defaulted_attrs),
                                unique=True))
    # Reset the chosen attributes to their declared defaults so the object
    # matches what structuring the stripped dict should produce.
    for attr in to_remove:
        default = attr.default
        if isinstance(default, Factory):
            setattr(obj, attr.name, default.factory())
        else:
            setattr(obj, attr.name, default)
    dumped = asdict(obj)
    for attr in to_remove:
        del dumped[attr.name]
    assert obj == converter.structure(dumped, cl)
def spark_application(app_id):
    """Mock of the Spark jobs REST resource.

    Generates a random list of job dicts, caches the JSON in redis under
    the request URL, and returns it; '?last' replays the cached payload.
    """
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))
    d = st.fixed_dictionaries({
        'jobId': st.integers(0),
        'name': st.text(),
        'submissionTime': st.text(),
        'completionTime': st.text(),
        # `average_size` was deprecated and then removed from Hypothesis'
        # lists(); the default size distribution is already small.
        'stageIds': st.lists(st.integers(0)),
        'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']),
        'numTasks': st.integers(0),
        'numActiveTasks': st.integers(0),
        'numCompletedTasks': st.integers(0),
        'numSkippedTasks': st.integers(0),
        'numFailedTasks': st.integers(0),
        'numActiveStages': st.integers(0),
        'numCompletedStages': st.integers(0),
        'numSkippedStages': st.integers(0),
        'numFailedStages': st.integers(0),
    })
    result = json.dumps(st.lists(d).example())
    redis.set(request.base_url, result)
    return jsonify(result)
def pattern_to_statements(pattern):
    """Recursively map a pattern AST node to a strategy of statement lists."""
    # A bare template is a single statement.
    if isinstance(pattern, template):
        return lists(just(pattern), min_size=1, max_size=1)
    rule, value = pattern
    if rule == 'sequence':
        return tuples(*map(pattern_to_statements, value)).map(unpack_list).map(list)
    if rule == 'alternates':
        return one_of(*map(pattern_to_statements, value))
    if rule == 'zeroOrMore':
        return lists(pattern_to_statements(value)).map(unpack_list).map(list)
    if rule == 'oneOrMore':
        return lists(pattern_to_statements(value), min_size=1).map(unpack_list).map(list)
    if rule == 'optional':
        return lists(pattern_to_statements(value), min_size=0, max_size=1).map(unpack_list).map(list)
    raise Exception("impossible!", rule)

# this replicates the current scorm pattern, a realistic example of medium
# complexity. Note it has repeated elements, just not in ambiguous ways.
def arguments_node(draw, annotated=False):
    """Return an astroid Arguments node with 1-5 names, optionally annotated."""
    count = draw(hs.integers(min_value=1, max_value=5))
    args = draw(hs.lists(name_node(None), min_size=count, max_size=count))
    if annotated:
        annotations = draw(hs.lists(name_node(annotation),
                                    min_size=count, max_size=count))
    else:
        annotations = None
    node = astroid.Arguments()
    node.postinit(args, None, None, None, annotations)
    return node
def gen_data_from_z(z):
    """
    Given the z axis values (2d list of failures) generate data that would
    have given us this z value.
    :param z: a list of lists of failures.
    :return: the data that would have given this z value, the z value,
             the x value (stages) and the y value (projects).
    """
    projects_count = len(z)
    stages_count = len(z[0]) if projects_count else 0
    projects = [uuid4() for _ in xrange(projects_count)]
    stages = [uuid4() for _ in xrange(stages_count)]
    data = []
    for pidx, project in enumerate(projects):
        for sidx, stage in enumerate(stages):
            # One repository name per (project, stage) cell.
            repo = strings.example()
            for _ in xrange(z[pidx][sidx]):
                data.append(MockPipelineRun(stage_failed=stage,
                                            project=project,
                                            repository=repo))
    return data, z, stages, projects
def build_failure_map(stages, projects, z):
    """
    Build a dict mapping the key "projects + stages" to the number of failures.
    :param stages: list of stages.
    :param projects: list of projects.
    :param z: a list of lists of failures (outer index = project,
              inner index = stage).
    :return: the failures map.
    """
    failure_lookup = {}
    for project_idx, failures_row in enumerate(z):
        for stage_idx, failures in enumerate(failures_row):
            key = str(projects[project_idx]) + str(stages[stage_idx])
            failure_lookup[key] = failures
    return failure_lookup
def two_band_eo_dataset(draw):
    """Build an xarray Dataset with two randomly-named random-valued bands."""
    crs, height, width, times = draw(dataset_shape())
    coordinates = {dim: np.arange(size)
                   for dim, size in zip(crs.dimensions, (height, width))}
    coordinates['time'] = times
    dimensions = ('time',) + crs.dimensions
    shape = (len(times), height, width)
    data1 = xr.DataArray(np.random.random_sample(size=shape),
                         dims=dimensions, coords=coordinates,
                         attrs={'crs': crs})
    data2 = xr.DataArray(np.random.random_sample(size=shape),
                         dims=dimensions, coords=coordinates,
                         attrs={'crs': crs})
    # Two distinct variable names are required for the two bands.
    name1, name2 = draw(st.lists(variable_name, min_size=2, max_size=2,
                                 unique=True))
    return xr.Dataset(data_vars={name1: data1, name2: data2},
                      attrs={'crs': crs})
def combinable_paths_maybe_loops(draw):
    """Makes single-segment paths, with loops a possibility."""
    endpoints = draw(lists(ipoints, min_size=2, max_size=200, unique_by=tuple))
    rand = draw(randoms())
    paths = set()
    point_use = collections.defaultdict(int)
    # Aim for roughly a third as many segments as candidate endpoints.
    target_number = len(endpoints) / 3
    if target_number < 1:
        target_number = 1
    while len(paths) < target_number:
        # Choose two points at random from the possible endpoints, and make
        # a segment; skip duplicates.
        a, b = rand.sample(endpoints, k=2)
        if (a, b) in paths:
            continue
        paths.add((a, b))
        # Track usage; any point in two segments is no longer a candidate
        # as an endpoint.
        for pt in (a, b):
            point_use[pt] += 1
            if point_use[pt] == 2:
                endpoints.remove(pt)
    return [Path(p) for p in paths]
def random_prefix(draw):
    """Draw a non-empty text prefix limited to unicode letters.

    See https://en.wikipedia.org/wiki/Unicode_character_property#General_Category
    """
    letter_categories = ['Ll', 'Lt', 'Lm', 'Lo']
    alphabet = draw(st.lists(st.characters(whitelist_categories=letter_categories),
                             min_size=1))
    return draw(st.text(alphabet=alphabet, min_size=1))
def totally_random_references(draw):
    """generates random sorted lists of references like
    ['IASDHAH1', 'ZKJDJAD1569', ...]"""
    raw = draw(st.lists(random_reference()))
    raw.sort()
    return [toRef(part) for part in raw]
def random_references(draw):
    """generates random sorted lists of references with the same prefix
    like ['IASDHAH1', 'IASDHAH1569', ...]"""
    shared_prefix = st.just(draw(random_prefix()))
    raw = draw(st.lists(random_reference(prefix=shared_prefix)))
    raw.sort()
    return [toRef(part) for part in raw]
def add_bools(list_of_lists):
    """
    Given recursive list that can contain other lists, return tuple of that
    plus a booleans strategy for each list.
    """
    counts = []

    def tally(node):
        # One entry per list encountered, including the root.
        counts.append(1)
        for child in node:
            if isinstance(child, list):
                tally(child)

    tally(list_of_lists)
    bool_strategies = [st.sampled_from([True, False]) for _ in counts]
    return st.tuples(st.just(list_of_lists), st.tuples(*bool_strategies))
def steps(self):
    """Build the strategy of possible next steps given the fake's state."""
    result = add_strategy | replace_strategy
    # Replace or add to a known service cluster:
    if self.fake.services:
        known_service = st.sampled_from(list(self.fake.services.keys()))
        result |= st.tuples(st.just("replace"),
                            st.tuples(known_service, st.lists(nice_strings)))
        result |= st.tuples(st.just("add"),
                            st.tuples(known_service, nice_strings))
    # Remove a known address from known cluster:
    if not self.fake.is_empty():
        result |= self.remove_strategy()
    return result
def default(self, obj):
    """Turn a swagger parameter dict into a hypothesis strategy.

    Non-parameter objects pass through unchanged.
    """
    if isinstance(obj, dict) and _is_swagger_parameter(obj):
        parameter_type = obj.get('format', obj.get('type'))
        parameter_schema = obj.get('schema')
        parameter_ref = obj.get('$ref')
        if parameter_type in SWAGGER_FORMAT_MAPPING:
            # Primitive type/format with a direct strategy mapping.
            return SWAGGER_FORMAT_MAPPING[parameter_type]
        if parameter_ref:
            return self.transform(self.get_ref(parameter_ref, self.spec))
        if parameter_type == 'array':
            items = obj['items']
            if items.get('enum'):
                return st.lists(elements=st.sampled_from(items['enum']))
            if items.get('type'):
                return st.lists(elements=SWAGGER_FORMAT_MAPPING[items['type']])
            if items.get('$ref'):
                schema = self.get_ref(items['$ref'], self.spec)
                return st.lists(elements=self.transform(schema))
            raise Exception('array', obj)
        if parameter_type == 'object':
            properties = {}
            for property_name, property_ in obj['properties'].items():
                properties[property_name] = self.transform(property_)
            return st.fixed_dictionaries(properties)
        if parameter_schema:
            if parameter_schema.get('type') == 'array':
                schema = self.get_ref(parameter_schema['items']['$ref'], self.spec)
                return st.lists(elements=self.transform(schema))
            schema = self.get_ref(parameter_schema['$ref'], self.spec)
            return self.transform(schema)
        raise Exception("Invalid", obj, parameter_type)
    return obj
def lists_of_primitives(draw):
    """Generate a strategy that yields tuples of list of primitives and types.

    For example, a sample value might be ([1,2], List[int]).
    """
    prim_strat, t = draw(primitive_strategies)
    # Either a parameterized list type for t, or a bare list type.
    list_t = draw(list_types.map(lambda lt: lt[t]) | list_types)
    return draw(st.lists(prim_strat)), list_t
def lists_of_attrs(defaults=None):
    """Strategy: lists of up to 20 simple attrs, with defaulted ones last.

    Attributes with defaults are sorted to the end because Python requires
    defaulted parameters to follow non-defaulted ones.
    """
    # Python functions support up to 255 arguments.
    # `average_size` was deprecated and removed from Hypothesis' lists();
    # max_size alone bounds the list.
    return (st.lists(simple_attrs(defaults), max_size=20)
            .map(lambda l: sorted(l,
                                  key=lambda t: t[0]._default is not NOTHING)))
def _device_list(minimum):
    """
    Get a device generating strategy.

    :param int minimum: the minimum number of devices, must be at least 0
    """
    device_name = strategies.text(alphabet=string.ascii_letters + "/",
                                  min_size=1)
    return strategies.lists(device_name, min_size=minimum)
def service_lists(
        draw, min_size=None, average_size=None, max_size=None, unique_by=None,
        unique=False
) -> _ServiceList:
    """Draw a _ServiceList of randomly-generated services.

    :param average_size: accepted for backward compatibility but ignored;
        the ``average_size`` keyword was deprecated and removed from
        ``hypothesis.strategies.lists``.
    """
    service_generator = lists(
        services(), min_size=min_size, max_size=max_size,
        unique_by=unique_by, unique=unique
    )
    return _ServiceList(draw(service_generator))
def job_lists(
        draw, min_size=None, max_size=None, average_size=None, jobs=jobs()
) -> JobListInterface:
    """Draw a JobList of randomly-generated jobs.

    :param average_size: accepted for backward compatibility but ignored;
        the ``average_size`` keyword was deprecated and removed from
        ``hypothesis.strategies.lists``.
    """
    job_list_maker = lists(jobs, min_size=min_size, max_size=max_size)
    return JobList(draw(job_list_maker))
def user_actions(draw, skip=None, **lists_kwargs):
    """Draw a list of user actions, excluding any listed in ``skip``.

    Bug fix: ``sampled_from`` requires a sequence (it needs ``len`` and
    repeated indexing); the original passed a generator expression, which
    breaks/exhausts after one use. Materialize the choices as a list.
    """
    if skip is None:
        skip = []
    choices = [k for k in USER_ACTIONS if k not in skip]
    return draw(lists(sampled_from(choices), **lists_kwargs))
def test_continious_bools_can_be_packed_unpacked(data):
    """Round-trip a list of bools through the Bool bytestream codec."""
    bools = data.draw(hs.lists(strategies.bools))
    buffer = io.BytesIO()
    at.Bool.many_to_bytestream(bools, buffer)
    buffer.seek(0)
    round_tripped = at.Bool.many_from_bytestream(buffer, len(bools))
    assert round_tripped == bools
def file(draw,
         filename=st.lists(
             st.characters(blacklist_categories=('Cc',),
                           blacklist_characters=('\0\n\r/\\|><')),
             # `average_size` was deprecated and removed from Hypothesis;
             # min/max bounds alone constrain the name length.
             min_size=1, max_size=80),
         content=st.binary(max_size=10000)):
    """Draw a dict with a random (filesystem-safe) filename and binary content."""
    return {'filename': ''.join(draw(filename)), 'content': draw(content)}
def files(draw):
    """Draw a duplicate-free list of up to MAX_FILES random files."""
    # TODO: use st.recursive to generate files in folders
    # `average_size` was deprecated and removed from Hypothesis' lists().
    return draw(st.lists(valid_file, max_size=MAX_FILES)
                .filter(has_no_duplicate))

#valid_vault = vault().filter(has_no_duplicate)
def create_dummy_rate_file(rate_file):
    """Append random (timestamp, rate) CSV rows to rate_file.

    :param rate_file: path to the CSV file to append to.
    :return: the generated rates and their (sorted) datetimes.
    """
    rates = lists(floats(min_value=0.00001, allow_nan=False,
                         allow_infinity=False),
                  min_size=0, max_size=100).example()
    max_year = datetime.datetime.now().year
    # One datetime per rate, sorted so the file is chronological.
    date_times = lists(datetimes(min_year=2016, max_year=max_year),
                       min_size=len(rates),
                       max_size=len(rates)).map(sorted).example()
    with open(rate_file, 'a') as f:
        # Hoisted out of the loop: one csv.writer serves all rows.
        writer = csv.writer(f, lineterminator='\n')
        for date_time, rate in zip(date_times, rates):
            market_data = [date_time.strftime("%Y-%m-%d %H:%M:%S"), rate]
            writer.writerow(market_data)
    return rates, date_times
def homogeneous_list(**kwargs):
    """Return a strategy which generates a list of uniform type."""
    return primitive_types.flatmap(lambda strat: hs.lists(strat(), **kwargs))
def random_list(**kwargs):
    """Return a strategy which generates a random list.

    Keyword arguments are forwarded to ``hs.lists`` (min_size, max_size, ...).
    """
    return hs.lists(primitive_values, **kwargs)
def boolop_node(draw, value=None, op=binary_bool_operator, **kwargs):
    """Return a BoolOp node whose operands are drawn from ``value``."""
    value = value or const_node()
    node = astroid.BoolOp(draw(op))
    # A boolean operation needs at least two operands.
    kwargs['min_size'] = max(kwargs.get('min_size', 0), 2)
    node.postinit(draw(hs.lists(value, **kwargs)))
    return node
def setcomp_node(draw, elt=const_node(),
                 generators=hs.lists(comprehension_node(), min_size=1)):
    """Return a SetComp node with an element and at least one comprehension.

    Note: the ``average_size`` keyword in the default strategy was removed;
    it was deprecated and then dropped from ``hypothesis.strategies.lists``.
    """
    node = astroid.SetComp()
    node.postinit(draw(elt), draw(generators))
    return node
def list_node(draw, elt=const_node(), **kwargs):
    """Return a List node with elements drawn from elt.
    """
    elements = draw(hs.lists(elt, **kwargs))
    node = astroid.List()
    node.postinit(elements)
    return node
def tuple_node(draw, elt=const_node, **kwargs):
    """Return a Tuple node with elements drawn from elt.

    Note ``elt`` here is a strategy *factory*, called once per draw.
    """
    node = astroid.Tuple()
    node.postinit(draw(hs.lists(elt(), **kwargs)))
    return node
def func_wrap_strategy(args, func):
    """Strategy: lists of args sized to func's arity, applied through func."""
    min_size = func.arity[0]
    # NOTE(review): reproduces the original `arity[1] and arity[0] or 4`
    # exactly — when both arities are truthy the *lower* arity is used as
    # max_size, otherwise 4. Confirm the use of arity[0] is intentional.
    max_size = func.arity[0] if (func.arity[1] and func.arity[0]) else 4
    return st.lists(args, min_size=min_size, max_size=max_size).map(
        lambda a: func(*a))
def dns_names():
    """
    Strategy for generating limited charset DNS names.
    """
    labels = s.lists(dns_labels(), min_size=1, max_size=10)
    return labels.map(u'.'.join)
def urls():
    r"""
    Strategy for generating ``twisted.python.url.URL``\s.
    """
    path_segment = s.text(
        max_size=64,
        alphabet=s.characters(blacklist_characters=u'/?#',
                              blacklist_categories=('Cs',)))
    return s.builds(
        URL,
        scheme=s.just(u'https'),
        host=dns_names(),
        path=s.lists(path_segment, min_size=1, max_size=10))
def pem_objects(draw):
    """
    Strategy for generating ``pem`` objects.

    Returns one RSA private key followed by at least one certificate, each
    wrapping random base64-encoded bytes in PEM armor.
    """
    key = RSAPrivateKey(
        b'-----BEGIN RSA PRIVATE KEY-----\n' +
        encodebytes(draw(s.binary(min_size=1))) +
        b'-----END RSA PRIVATE KEY-----\n')
    certs = [
        Certificate(
            b'-----BEGIN CERTIFICATE-----\n' +
            encodebytes(blob) +
            b'-----END CERTIFICATE-----\n')
        for blob in draw(s.lists(s.binary(min_size=1), min_size=1))
    ]
    return [key] + certs
def panicing_certs_fixture(draw):
    """Build an AcmeFixture with a panic interval and unique panicing certs."""
    now = draw(datetimes(min_year=1971, max_year=2030, timezones=[]))
    panic_seconds = draw(s.integers(min_value=60, max_value=60 * 60 * 24))
    panic = timedelta(seconds=panic_seconds)
    # Certs unique by their first element (used as the dict key).
    cert_pairs = draw(s.lists(panicing_cert(now, panic), min_size=1,
                              unique_by=lambda item: item[0]))
    return AcmeFixture(now=now, panic_interval=panic, certs=dict(cert_pairs))
def test_unit_interval(self):
    """lists of random values in [0, 1] chosen. Minimum distance cannot be
    greater than 1."""
    points_a = numpy.random.rand(5, 3)
    points_b = numpy.random.rand(10, 3)
    min_dist = isambard.geometry.closest_distance(points1=points_a,
                                                  points2=points_b)
    self.assertLessEqual(min_dist, 1.0)
def ordered_dates(num):
    """Strategy: exactly ``num`` datetimes between 1970 and 2050."""
    one_date = st.datetimes(datetime(1970, 1, 1), datetime(2050, 1, 1))
    return st.lists(one_date, min_size=num, max_size=num)
def provide_require_st(draw, filter_=True):  # pragma: no cover
    """Draw (provides, requires, is_func) triples for command graphs.

    Bug fix: when ``filter_`` is False the original passed the *set*
    ``provides_set`` to ``sampled_from``, which requires an ordered
    sequence; it is now sorted into a list (also making shrinking
    deterministic).
    """
    commands = draw(range_intagers_st)
    provides = draw(
        st.lists(
            st.lists(range_intagers_st, max_size=10),
            min_size=commands, max_size=commands
        ),
    )
    is_func = draw(
        st.lists(st.booleans(), min_size=commands, max_size=commands)
    )
    provides_set = set()
    for command in provides:
        provides_set.update(command)
    requires = []
    if provides_set:
        for command in provides:
            max_prov = max(command) if command else 0
            if filter_:
                # Only allow requirements "later" than anything provided.
                provides_filter = [x for x in provides_set if x > max_prov]
            else:
                provides_filter = sorted(provides_set)
            if provides_filter:
                sample = st.sampled_from(provides_filter)
                requires.append(draw(st.lists(sample, max_size=10)))
            else:
                requires.append([])
    else:
        requires = [[]] * commands
    return (provides, requires, is_func)
def test_nestedInteractions(self, values):
    """
    Nested interactions operate independently of parent interactions.

    :param values: a two-tuple composed of:
       - a recursive list of unicode and other recursive lists - list start
         means begin interaction, string means node resolve, list end means
         finish interaction.
       - list of False/True; True means failed interaction
    """
    requested_interactions, failures = values
    failures = iter(failures)
    assume(not isinstance(requested_interactions, unicode))
    self.init()
    ws_actor = self.connector.expectSocket()
    self.connector.connect(ws_actor)
    failures = iter(failures)
    created_services = {}
    expected_success_nodes = Counter()
    expected_failed_nodes = Counter()

    def run_interaction(children):
        should_fail = next(failures)
        failed = []
        succeeded = []
        self.session.start_interaction()
        for child in children:
            if isinstance(child, unicode):
                # Make sure disco knows about the node:
                if child in created_services:
                    node = created_services[child]
                else:
                    node = create_node(child, child)
                    created_services[child] = node
                self.disco.onMessage(None, NodeActive(node))
                # Make sure the child Node is resolved in the interaction
                self.session.resolve(node.service, "1.0")
                if should_fail:
                    expected_failed_nodes[node] += 1
                    failed.append(node)
                else:
                    expected_success_nodes[node] += 1
                    succeeded.append(node)
            else:
                run_interaction(child)
        if should_fail:
            self.session.fail_interaction("OHNO")
        self.session.finish_interaction()
        self.connector.advance_time(5.0)  # Make sure interaction is sent
        ws_actor.swallowLogMessages()
        self.connector.expectInteraction(
            self, ws_actor, self.session, failed, succeeded)

    run_interaction(requested_interactions)
    for node in set(expected_failed_nodes) | set(expected_success_nodes):
        policy = self.disco.failurePolicy(node)
        self.assertEqual((policy.successes, policy.failures),
                         (expected_success_nodes[node],
                          expected_failed_nodes[node]))
def applications():
    """Mock of the YARN cluster apps REST resource.

    Generates at least four random app dicts, caches the JSON in redis
    under the request URL, and returns it; '?last' replays the cache.
    """
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))
    d = st.fixed_dictionaries({
        'allocatedMB': st.integers(-1),
        'allocatedVCores': st.integers(-1),
        'amContainerLogs': st.text(),
        'amHostHttpAddress': st.text(),
        'applicationTags': st.text(),
        'applicationType': st.sampled_from(['MAPREDUCE', 'SPARK']),
        'clusterId': st.integers(0),
        'diagnostics': st.text(),
        'elapsedTime': st.integers(0),
        'finalStatus': st.sampled_from(['UNDEFINED', 'SUCCEEDED', 'FAILED',
                                        'KILLED']),
        'finishedTime': st.integers(0),
        'id': st.text(string.ascii_letters, min_size=5, max_size=25),
        'memorySeconds': st.integers(0),
        'name': st.text(min_size=5),
        'numAMContainerPreempted': st.integers(0),
        'numNonAMContainerPreempted': st.integers(0),
        'preemptedResourceMB': st.integers(0),
        'preemptedResourceVCores': st.integers(0),
        'progress': st.floats(0, 100),
        'queue': st.text(),
        'runningContainers': st.integers(-1),
        'startedTime': st.integers(0),
        'state': st.sampled_from(['NEW', 'NEW_SAVING', 'SUBMITTED',
                                  'ACCEPTED', 'RUNNING', 'FINISHED',
                                  'FAILED', 'KILLED']),
        'trackingUI': st.text(),
        'trackingUrl': st.just(os.environ['YARN_ENDPOINT']),
        'user': st.text(),
        'vcoreSeconds': st.integers(0)
    })
    # `average_size` was deprecated and removed from Hypothesis' lists();
    # min_size=4 is the only bound the mock actually relies on.
    result = json.dumps({
        'apps': {
            'app': st.lists(d, min_size=4).example()
        }
    })
    redis.set(request.base_url, result)
    return jsonify(result)
def mapreduce_application():
    """Mock of the mapreduce jobs REST resource.

    Generates a random list of job dicts, caches the JSON in redis under
    the request URL, and returns it; '?last' replays the cached payload.
    """
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))
    d = st.fixed_dictionaries({
        'startTime': st.integers(0),
        'finishTime': st.integers(0),
        'elapsedTime': st.integers(0),
        'id': st.integers(0),
        'name': st.text(),
        'user': st.text(),
        'state': st.sampled_from(['NEW', 'SUCCEEDED', 'RUNNING', 'FAILED',
                                  'KILLED']),
        'mapsTotal': st.integers(0),
        'mapsCompleted': st.integers(0),
        'reducesTotal': st.integers(0),
        'reducesCompleted': st.integers(0),
        'mapProgress': st.floats(0, 100),
        'reduceProgress': st.floats(0, 100),
        'mapsPending': st.integers(0),
        'mapsRunning': st.integers(0),
        'reducesPending': st.integers(0),
        'reducesRunning': st.integers(0),
        'uberized': st.booleans(),
        'diagnostics': st.text(),
        'newReduceAttempts': st.integers(0),
        'runningReduceAttempts': st.integers(0),
        'failedReduceAttempts': st.integers(0),
        'killedReduceAttempts': st.integers(0),
        'successfulReduceAttempts': st.integers(0),
        'newMapAttempts': st.integers(0),
        'runningMapAttempts': st.integers(0),
        'failedMapAttempts': st.integers(0),
        'killedMapAttempts': st.integers(0),
        'successfulMapAttempts': st.integers(0)
    })
    # `average_size` was deprecated and removed from Hypothesis' lists().
    result = json.dumps({
        'jobs': {
            'job': st.lists(d).example()
        }
    })
    redis.set(request.base_url, result)
    return jsonify(result)