我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用hypothesis.strategies.sampled_from()。
def test_structure_simple_from_dict_default(converter, cl_and_vals, data): """Test structuring non-nested attrs classes with default value.""" cl, vals = cl_and_vals obj = cl(*vals) attrs_with_defaults = [a for a in fields(cl) if a.default is not NOTHING] to_remove = data.draw(lists(elements=sampled_from(attrs_with_defaults), unique=True)) for a in to_remove: if isinstance(a.default, Factory): setattr(obj, a.name, a.default.factory()) else: setattr(obj, a.name, a.default) dumped = asdict(obj) for a in to_remove: del dumped[a.name] assert obj == converter.structure(dumped, cl)
def spark_application(app_id): """Mock of the Spark jobs REST resource.""" if 'last' in request.args: return jsonify(redis.get(request.base_url)) d = st.fixed_dictionaries({ 'jobId': st.integers(0), 'name': st.text(), 'submissionTime': st.text(), 'completionTime': st.text(), 'stageIds': st.lists(st.integers(0), average_size=3), 'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']), 'numTasks': st.integers(0), 'numActiveTasks': st.integers(0), 'numCompletedTasks': st.integers(0), 'numSkippedTasks': st.integers(0), 'numFailedTasks': st.integers(0), 'numActiveStages': st.integers(0), 'numCompletedStages': st.integers(0), 'numSkippedStages': st.integers(0), 'numFailedStages': st.integers(0), }) result = json.dumps(st.lists(d, average_size=3).example()) redis.set(request.base_url, result) return jsonify(result)
def functiondef_node(draw, name=None, annotated=False, returns=False): name = name or draw(valid_identifier()) args = draw(arguments_node(annotated)) body = [] returns_node = astroid.Return() arg_node, arg_type_node = draw(hs.sampled_from(list(zip(args.args, args.annotations)))) if returns: returns_node.postinit(arg_node) else: returns_node.postinit(const_node(None)) body.append(returns_node) node = astroid.FunctionDef(name=name) node.postinit( args, body, None, arg_type_node ) return node
def enum_to_strategy(enum): """Generate strategy for enum.""" return st.sampled_from([ value.number for value in enum.DESCRIPTOR.values ])
def add_bools(list_of_lists): """ Given recursive list that can contain other lists, return tuple of that plus a booleans strategy for each list. """ l = [] def count(recursive): l.append(1) for child in recursive: if isinstance(child, list): count(child) count(list_of_lists) return st.tuples(st.just(list_of_lists), st.tuples(*[st.sampled_from([True, False]) for i in l]))
def remove_strategy(self): def get_address(service_name): return st.tuples(st.just(service_name), st.sampled_from( self.fake.services[service_name])) return st.tuples(st.just("remove"), ( st.sampled_from(list(self.fake.services.keys())).flatmap(get_address)))
def steps(self): result = add_strategy | replace_strategy # Replace or add to a known service cluster: if self.fake.services: result |= st.tuples(st.just("replace"), st.tuples(st.sampled_from(list(self.fake.services.keys())), st.lists(nice_strings))) result |= st.tuples(st.just("add"), st.tuples(st.sampled_from(list(self.fake.services.keys())), nice_strings)) # Remove a known address from known cluster: if not self.fake.is_empty(): result |= self.remove_strategy() return result
def default(self, obj): """ """ if isinstance(obj, dict) and _is_swagger_parameter(obj): parameter_type = obj.get('format', obj.get('type')) parameter_schema = obj.get('schema') parameter_ref = obj.get('$ref') if parameter_type in SWAGGER_FORMAT_MAPPING: return SWAGGER_FORMAT_MAPPING[parameter_type] elif parameter_ref: return self.transform(self.get_ref(parameter_ref, self.spec)) elif parameter_type == 'array': if obj['items'].get('enum'): return st.lists(elements=st.sampled_from(obj['items']['enum'])) elif obj['items'].get('type'): return st.lists(elements=SWAGGER_FORMAT_MAPPING[obj['items']['type']]) elif obj['items'].get('$ref'): schema = self.get_ref(obj['items']['$ref'], self.spec) return st.lists(elements=self.transform(schema)) raise Exception('array', obj) elif parameter_type == 'object': properties = {} for property_name, property_ in obj['properties'].items(): properties[property_name] = self.transform(property_) return st.fixed_dictionaries(properties) elif parameter_schema: if parameter_schema.get('type') == 'array': schema = self.get_ref(parameter_schema['items']['$ref'], self.spec) return st.lists(elements=self.transform(schema)) else: schema = self.get_ref(parameter_schema['$ref'], self.spec) transformed = self.transform(schema) return transformed else: raise Exception("Invalid", obj, parameter_type) return obj
def strategy(self): 'Returns resulting strategy that generates configured char set' max_codepoint = None if self._unicode else 127 strategies = [] if self._negate: if self._categories or self._whitelist_chars: strategies.append( hs.characters( blacklist_categories=self._categories | set(['Cc', 'Cs']), blacklist_characters=self._whitelist_chars, max_codepoint=max_codepoint, ) ) if self._blacklist_chars: strategies.append( hs.sampled_from( list(self._blacklist_chars - self._whitelist_chars) ) ) else: if self._categories or self._blacklist_chars: strategies.append( hs.characters( whitelist_categories=self._categories, blacklist_characters=self._blacklist_chars, max_codepoint=max_codepoint, ) ) if self._whitelist_chars: strategies.append( hs.sampled_from( list(self._whitelist_chars - self._blacklist_chars) ) ) return hs.one_of(*strategies) if strategies else hs.just(u'')
def jobs( draw, ids=uuids(), statuses=sampled_from(JobInterface.JobStatus), parameters=dictionaries(text(), text()), results=dictionaries(text(), text()), dates_submitted=datetimes(), registration_schemas=dictionaries(text(), text()), result_schemas=dictionaries(text(), text()) ) -> JobInterface: """ :param draw: A function that can take a strategy and draw a datum from it :param ids: A hypothesis strategy (statisticians should read "random variable"), that represents the set of all valid job IDs :param statuses: A hypothesis strategy that samples from the set of all allowed job statuses :param parameters: A hypothesis strategy that samples from all job parameters :param results: A hypothesis strategy that represents the possible results :param dates_submitted: A hypothesis strategy that represents the possible dates that can be submitted :param registration_schemas: The possible job registration schemas :param result_schemas: The possible job result schemas :return: A randomly-generated implementation of :class:`JobInterface` """ return Job( draw(ids), draw(statuses), draw(parameters), draw(results), draw(dates_submitted), draw(registration_schemas), draw(result_schemas) )
def user_actions(draw, skip=None, **lists_kwargs): if skip is None: skip = [] return draw(lists(sampled_from(k for k in USER_ACTIONS if k not in skip), **lists_kwargs))
def test_numeric_type_instances_can_be_compared(data): first = data.draw(hs.sampled_from(NUMERIC_TYPES))() second = data.draw(hs.sampled_from(NUMERIC_TYPES))() # We don't actually care if it's True or False assert (first < second) in (True, False)
def models(): return sampled_from([v1_5_model])
def durations(draw): num = draw(st.integers(1, 100)) suffix = draw(st.sampled_from('ymd')) return f'{num}{suffix}'
def provide_require_st(draw, filter_=True): # pragma: no cover commands = draw(range_intagers_st) provides = draw( st.lists( st.lists(range_intagers_st, max_size=10), min_size = commands, max_size = commands ), ) is_func = draw( st.lists( st.booleans(), min_size = commands, max_size = commands ) ) provides_set = set() for command in provides: provides_set.update(command) requires = [] if provides_set: for command in provides: if command: max_prov = max(command) else: max_prov = 0 if filter_: provides_filter = [x for x in provides_set if x > max_prov] else: provides_filter = provides_set if provides_filter: sample = st.sampled_from(provides_filter) requires.append(draw(st.lists(sample, max_size=10))) else: requires.append([]) else: requires = [[]] * commands return (provides, requires, is_func)
def get_request(data, spec, spec_host): endpoint_path = data.draw(st.sampled_from(spec['paths'].keys())) endpoint = spec['paths'][endpoint_path] method_name = data.draw(st.sampled_from(endpoint.keys())) endpoint = endpoint[method_name] path_params = _get_filtered_parameter(endpoint, 'path', spec) path_args = data.draw(st.fixed_dictionaries(path_params)) query_params = _get_filtered_parameter(endpoint, 'query', spec) query_args = data.draw(st.fixed_dictionaries(query_params)) body_params = _get_filtered_parameter(endpoint, 'body', spec) if body_params: body_args = data.draw(body_params['body']) else: body_args = None valid_request_body_format = get_item_path_acceptable_format(endpoint, spec) request_data = None request_headers = {} if body_args: # no_body_format_declaration(body_args, valid_request_body_format, endpoint) if body_args and valid_request_body_format is None: # Force a request format, swagger ui seems to force json format valid_request_body_format = ["application/json"] request_body_format = data.draw(st.sampled_from(valid_request_body_format), 'request_body_format') request_headers['Content-Type'] = request_body_format if request_body_format == 'application/x-www-form-urlencoded': request_data = body_args elif request_body_format == 'application/json': request_data = json.dumps(body_args, cls=CustomJsonEncoder) elif request_body_format == 'application/xml': pass # TODO Implement XML else: raise Exception(request_body_format) endpoint_url = endpoint_path.format(**path_args) assume('\x00' not in endpoint_url) # Generate request URL = furl(spec_host) URL = URL.join(endpoint_url.lstrip('/')) if query_args: URL = URL.add(args=query_args) request = Request(method_name, URL.url, data=request_data, headers=request_headers).prepare() request.build_context = locals() return request
def applications(): """Mock of the YARN cluster apps REST resource.""" if 'last' in request.args: return jsonify(redis.get(request.base_url)) d = st.fixed_dictionaries({ 'allocatedMB': st.integers(-1), 'allocatedVCores': st.integers(-1), 'amContainerLogs': st.text(), 'amHostHttpAddress': st.text(), 'applicationTags': st.text(), 'applicationType': st.sampled_from(['MAPREDUCE', 'SPARK']), 'clusterId': st.integers(0), 'diagnostics': st.text(), 'elapsedTime': st.integers(0), 'finalStatus': st.sampled_from(['UNDEFINED', 'SUCCEEDED', 'FAILED', 'KILLED']), 'finishedTime': st.integers(0), 'id': st.text(string.ascii_letters, min_size=5, max_size=25), 'memorySeconds': st.integers(0), 'name': st.text(min_size=5), 'numAMContainerPreempted': st.integers(0), 'numNonAMContainerPreempted': st.integers(0), 'preemptedResourceMB': st.integers(0), 'preemptedResourceVCores': st.integers(0), 'progress': st.floats(0, 100), 'queue': st.text(), 'runningContainers': st.integers(-1), 'startedTime': st.integers(0), 'state': st.sampled_from(['NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED']), 'trackingUI': st.text(), 'trackingUrl': st.just(os.environ['YARN_ENDPOINT']), 'user': st.text(), 'vcoreSeconds': st.integers(0) }) result = json.dumps({ 'apps': { 'app': st.lists(d, min_size=4, average_size=10).example() } }) redis.set(request.base_url, result) return jsonify(result)
def mapreduce_application(): """Mock of the mapreduce jobs REST resource.""" if 'last' in request.args: return jsonify(redis.get(request.base_url)) d = st.fixed_dictionaries({ 'startTime': st.integers(0), 'finishTime': st.integers(0), 'elapsedTime': st.integers(0), 'id': st.integers(0), 'name': st.text(), 'user': st.text(), 'state': st.sampled_from(['NEW', 'SUCCEEDED', 'RUNNING', 'FAILED', 'KILLED']), 'mapsTotal': st.integers(0), 'mapsCompleted': st.integers(0), 'reducesTotal': st.integers(0), 'reducesCompleted': st.integers(0), 'mapProgress': st.floats(0, 100), 'reduceProgress': st.floats(0, 100), 'mapsPending': st.integers(0), 'mapsRunning': st.integers(0), 'reducesPending': st.integers(0), 'reducesRunning': st.integers(0), 'uberized': st.booleans(), 'diagnostics': st.text(), 'newReduceAttempts': st.integers(0), 'runningReduceAttempts': st.integers(0), 'failedReduceAttempts': st.integers(0), 'killedReduceAttempts': st.integers(0), 'successfulReduceAttempts': st.integers(0), 'newMapAttempts': st.integers(0), 'runningMapAttempts': st.integers(0), 'failedMapAttempts': st.integers(0), 'killedMapAttempts': st.integers(0), 'successfulMapAttempts': st.integers(0) }) result = json.dumps({ 'jobs': { 'job': st.lists(d, average_size=3).example() } }) redis.set(request.base_url, result) return jsonify(result)