The following code examples, extracted from open-source Python projects, illustrate how to use attr.asdict().
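Before the extracted examples, here is a minimal sketch of what attr.asdict() does (the Point class is invented for illustration): it converts an attrs-decorated instance into a plain dict of its fields, recursing into nested attrs instances by default.

import attr

@attr.s
class Point(object):
    x = attr.ib(default=0)
    y = attr.ib(default=0)

# asdict() walks the instance's attrs fields and returns a plain dict.
attr.asdict(Point(x=1, y=2))  # -> {'x': 1, 'y': 2}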
def json_ready_header_auth(header_auth):
    # type: (MessageHeaderAuthentication) -> Dict[str, Text]
    """Create a JSON-serializable representation of a
    :class:`aws_encryption_sdk.internal.structures.MessageHeaderAuthentication`.

    http://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/message-format.html#header-authentication

    :param header_auth: header auth for which to create a JSON-serializable representation
    :type header_auth: aws_encryption_sdk.internal.structures.MessageHeaderAuthentication
    :rtype: dict
    """
    dict_header_auth = attr.asdict(header_auth)

    for key, value in dict_header_auth.items():
        dict_header_auth[key] = unicode_b64_encode(value)

    return dict_header_auth
def test_structure_simple_from_dict_default(converter, cl_and_vals, data):
    """Test structuring non-nested attrs classes with default value."""
    cl, vals = cl_and_vals
    obj = cl(*vals)
    attrs_with_defaults = [a for a in fields(cl) if a.default is not NOTHING]
    to_remove = data.draw(lists(elements=sampled_from(attrs_with_defaults),
                                unique=True))

    for a in to_remove:
        if isinstance(a.default, Factory):
            setattr(obj, a.name, a.default.factory())
        else:
            setattr(obj, a.name, a.default)

    dumped = asdict(obj)

    for a in to_remove:
        del dumped[a.name]

    assert obj == converter.structure(dumped, cl)
def test_structure_union(converter, cl_and_vals_a, cl_and_vals_b):
    """Structuring of automatically-disambiguable unions works."""
    # type: (Converter, Any, Any) -> None
    cl_a, vals_a = cl_and_vals_a
    cl_b, vals_b = cl_and_vals_b
    a_field_names = {a.name for a in fields(cl_a)}
    b_field_names = {a.name for a in fields(cl_b)}
    assume(a_field_names)
    assume(b_field_names)

    common_names = a_field_names & b_field_names

    if len(a_field_names) > len(common_names):
        obj = cl_a(*vals_a)
        dumped = asdict(obj)
        res = converter.structure(dumped, Union[cl_a, cl_b])
        assert isinstance(res, cl_a)
        assert obj == res
def test_fallback(cl_and_vals):
    """The fallback case works."""
    cl, vals = cl_and_vals

    assume(attr.fields(cl))  # At least one field.

    @attr.s
    class A(object):
        pass

    fn = create_uniq_field_dis_func(A, cl)

    assert fn({}) is A
    assert fn(attr.asdict(cl(*vals))) is cl

    attr_names = {a.name for a in attr.fields(cl)}

    if 'xyz' not in attr_names:
        assert fn({'xyz': 1}) is A  # Uses the fallback.
def to_yaml(cls: Type['File'], instance: 'File') -> Mapping:
    """Represent the instance as YAML node.

    Keyword arguments:
        instance: The File to be represented.

    Returns:
        YAML representation of the instance.
    """
    # Dump mod part
    columns = (str(c).split('.')[-1] for c in Mod.__table__.columns)
    yml = {f: getattr(instance.mod, f) for f in columns}

    # Dump the file part
    yml['file'] = attr.asdict(instance)
    for field in ('mod',):
        del yml['file'][field]

    return yml
def dump_object(obj):
    """Get this object as a dict."""
    def should_dump(self, attr, value):
        """Decide if the attribute should be dumped or not."""
        column = self.__table__.c[attr.name]
        default = column.default
        if default is not None:
            return not value == default.arg
        elif self.__table__.c[attr.name].nullable and value is None:
            return False
        else:
            return True

    d = asdict(obj, filter=partial(should_dump, obj))
    if isinstance(obj, db.SpacialObject) and 'type' in d:  # obj.type is not None
        d['type'] = d['type'].name
    return d
def train_model(self, request: Dict) -> Dict:
    ws_id = request['workspace_id']
    try:
        pages = request['pages']
        pages = self._fetch_pages_html(pages)
        result = train_model(
            pages,
            model_cls=self.model_cls,
            progress_callback=partial(self.progress_callback, ws_id=ws_id),
            **self.model_kwargs)
    except Exception as e:
        logging.error('Failed to train a model', exc_info=e)
        result = ModelMeta(
            model=None,
            meta=Meta(advice=[AdviceItem(
                ERROR, 'Unknown error while training a model: {}'.format(e))]))
    return {
        'workspace_id': ws_id,
        'quality': json.dumps(attr.asdict(result.meta)),
        'model': (encode_object(result.model)
                  if result.model is not None else None),
    }
def test_single_domain():
    docs = [{'html': 'foo{} bar'.format(i % 4),
             'url': 'http://example.com/{}'.format(i),
             'relevant': i % 2 == 0}
            for i in range(10)]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert lst_as_dict(result.meta.advice)[:2] == [
        {'kind': 'Warning',
         'text': "Only 1 relevant domain in data means that it's impossible to do "
                 'cross-validation across domains, and will likely result in '
                 'model over-fitting.'},
        {'kind': 'Warning',
         'text': 'Number of human labeled documents is just 10, consider having '
                 'at least 100 labeled.'},
    ]
    assert lst_as_dict(result.meta.description)[:3] == [
        {'heading': 'Dataset',
         'text': '10 documents, 10 labeled across 1 domain.'},
        {'heading': 'Class balance',
         'text': '50% relevant, 50% not relevant.'},
        {'heading': 'Metrics', 'text': ''},
    ]
    assert result.model is None
def json_ready_header(header):
    # type: (MessageHeader) -> Dict[str, Any]
    """Create a JSON-serializable representation of a
    :class:`aws_encryption_sdk.structures.MessageHeader`.

    http://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/message-format.html#header-structure

    :param header: header for which to create a JSON-serializable representation
    :type header: aws_encryption_sdk.structures.MessageHeader
    :rtype: dict
    """
    dict_header = attr.asdict(header)

    del dict_header['content_aad_length']

    dict_header['version'] = str(float(dict_header['version'].value))
    dict_header['algorithm'] = dict_header['algorithm'].name

    for key, value in dict_header.items():
        if isinstance(value, Enum):
            dict_header[key] = value.value

    dict_header['message_id'] = unicode_b64_encode(dict_header['message_id'])

    dict_header['encrypted_data_keys'] = sorted(
        list(dict_header['encrypted_data_keys']),
        key=lambda x: six.b(x['key_provider']['provider_id']) + x['key_provider']['key_info']
    )
    for data_key in dict_header['encrypted_data_keys']:
        data_key['key_provider']['provider_id'] = unicode_b64_encode(six.b(data_key['key_provider']['provider_id']))
        data_key['key_provider']['key_info'] = unicode_b64_encode(data_key['key_provider']['key_info'])
        data_key['encrypted_data_key'] = unicode_b64_encode(data_key['encrypted_data_key'])

    return dict_header
def list_asdict(in_list: Iterable[Any]) -> List[Any]:
    """asdict'ify a list of objects.

    Useful when converting a list of objects to json.
    """
    return [asdict(obj) for obj in in_list]
def export(self):
    return asdict(self)
def public_export(self):
    result = {}
    for name, attr in asdict(self, recurse=False).items():
        if isinstance(attr, Keypair):
            result[name + '_pk'] = pet2ascii(attr.pk)
    return result
def _orderedCleanDict(attrsObj):
    """-> dict with false-values removed

    Also evaluates attr-instances for false-ness by looking at
    the values of their properties.
    """
    def _filt(k, v):
        if attr.has(v):
            return any(attr.astuple(v))
        return bool(v)

    return attr.asdict(attrsObj,
                       dict_factory=UnsortableOrderedDict,
                       recurse=False, filter=_filt)
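The two examples above rely on recurse=False. As a minimal sketch of the difference (the Inner and Outer classes are invented for illustration): with the default recurse=True, nested attrs instances are converted to dicts as well; with recurse=False they are kept as instances, which is why public_export above can isinstance-check the values it gets back.

import attr

@attr.s
class Inner(object):
    a = attr.ib(default=1)

@attr.s
class Outer(object):
    inner = attr.ib(default=attr.Factory(Inner))

attr.asdict(Outer())                 # {'inner': {'a': 1}} -- nested instance converted
attr.asdict(Outer(), recurse=False)  # {'inner': Inner(a=1)} -- nested instance kept as-is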
def json(self):
    return attr.asdict(self)
def handle_next_request(self):
    # Get the next JobRequest
    try:
        request_id, meta, request_message = self.transport.receive_request_message()
    except MessageReceiveTimeout:
        # no new message, nothing to do
        return
    if meta.setdefault('__request_serialized__', True) is False:
        # The caller is a new client that did not double-serialize, so do not double-deserialize
        job_request = request_message
    else:
        # The caller is an old client that double-serialized, so be sure to double-deserialize
        # TODO: Remove this and the serializer in version >= 0.25.0
        job_request = self.serializer.blob_to_dict(request_message)
    self.job_logger.info('Job request: %s', job_request)

    # Process and run the Job
    job_response = self.process_job(job_request)

    # Send the JobResponse
    response_dict = {}
    try:
        response_dict = attr.asdict(job_response, dict_factory=UnicodeKeysDict)
        if meta['__request_serialized__'] is False:
            # Match the response serialization behavior to the request serialization behavior
            response_message = response_dict
        else:
            # TODO: Remove this and the serializer in version >= 0.25.0
            response_message = self.serializer.dict_to_blob(response_dict)
    except Exception as e:
        self.metrics.counter('server.error.serialization_failure').increment()
        job_response = self.handle_error(e, variables={'job_response': response_dict})
        response_dict = attr.asdict(job_response, dict_factory=UnicodeKeysDict)
        if meta['__request_serialized__'] is False:
            # Match the response serialization behavior to the request serialization behavior
            response_message = response_dict
        else:
            # TODO: Remove this and the serializer in version >= 0.25.0
            response_message = self.serializer.dict_to_blob(response_dict)
    self.transport.send_response_message(request_id, meta, response_message)
    self.job_logger.info('Job response: %s', response_dict)
def _base_send_request(self, request_id, meta, job_request):
    with self.metrics.timer('client.send.excluding_middleware'):
        if isinstance(job_request, JobRequest):
            job_request = attr.asdict(job_request, dict_factory=UnicodeKeysDict)
        meta['__request_serialized__'] = True
        self.transport.send_request_message(
            request_id,
            meta,
            self.serializer.dict_to_blob(job_request),
        )
        # meta['__request_serialized__'] = False
        # self.transport.send_request_message(request_id, meta, job_request)
def ansible_inventory(hosts, out=sys.stdout, indent=None):
    inventory = Inventory(hosts)
    data = OrderedDict(
        (name, attr.asdict(group, dict_factory=OrderedDict))
        for name, group in inventory.groups.items())
    for group in data.values():
        for attr_name in ('hosts', 'vars', 'children'):
            if not group[attr_name]:
                del group[attr_name]
    data['_meta'] = {'hostvars': inventory.hosts.copy()}
    json.dump(data, out, indent=indent, default=list)
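The inventory example above passes dict_factory=OrderedDict so that every mapping asdict() builds (including nested ones) preserves attribute-definition order. A minimal sketch with an invented Host class:

import collections
import attr

@attr.s
class Host(object):
    name = attr.ib(default='db1')
    port = attr.ib(default=5432)

# Every dict asdict() creates is built with the given factory,
# so fields come out in definition order.
attr.asdict(Host(), dict_factory=collections.OrderedDict)
# -> OrderedDict([('name', 'db1'), ('port', 5432)])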
def __str__(self):
    return json.dumps(attr.asdict(
        self, filter=lambda _attr, value: value is not None))
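The filter callable used above receives each Attribute object and its value; returning False drops that field from the result. A minimal sketch with an invented Record class:

import attr

@attr.s
class Record(object):
    id = attr.ib(default=1)
    note = attr.ib(default=None)

# Fields for which the filter returns False are omitted.
attr.asdict(Record(), filter=lambda attribute, value: value is not None)
# -> {'id': 1}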
def test_attrs_asdict_unstructure(converter, nested_class):
    # type: (Converter, Type) -> None
    """Our dumping should be identical to `attrs`."""
    instance = nested_class[0]()
    assert converter.unstructure(instance) == asdict(instance)
def test_structure_simple_from_dict(converter, cl_and_vals):
    # type: (Converter, Any) -> None
    """Test structuring non-nested attrs classes dumped with asdict."""
    cl, vals = cl_and_vals
    obj = cl(*vals)

    dumped = asdict(obj)
    loaded = converter.structure(dumped, cl)

    assert obj == loaded
def asdict(self):
    """return a dict representation of the filter"""
    return attr.asdict(self)
def asdict(self):
    return {
        'cls': self.cls,
        'params': self.params,
        'acquired': self.acquired,
        'avail': self.avail,
    }
def asdict(self):
    result = attr.asdict(self)
    del result['name']  # the name is the key in the places dict
    return result
def dumpjob_cmd(jq, io, args):
    import attr

    tasks_as_dicts = []
    jobid = _resolve_jobid(jq, args.jobid)
    job = jq.get_job(jobid)
    job = attr.asdict(job)
    tasks = jq.get_tasks(jobid)
    for task in tasks:
        t = attr.asdict(task)

        task_args = io.get_as_str(task.args)
        t['args_url'] = t['args']
        t['args'] = json.loads(task_args)
        tasks_as_dicts.append(t)

    print(json.dumps(dict(job=job, tasks=tasks_as_dicts),
                     indent=2, sort_keys=True))
def test_auth_loading(dummy_auth):
    """Is the authentication properly loaded from file?"""
    correct = StringIO(yaml.dump(attr.asdict(dummy_auth)))
    empty = StringIO()

    assert proxy.Authorization.load(correct) == dummy_auth
    with pytest.raises(exceptions.InvalidStream):
        proxy.Authorization.load(empty)
def test_auth_store(dummy_auth):
    """Is the authentication properly stored into a file?"""
    buffer = StringIO()
    dummy_auth.dump(buffer)
    data = yaml.load(buffer.getvalue())

    assert data == attr.asdict(dummy_auth)
def __call__(self, req: requests.Request) -> requests.Request:
    """Make the request authenticated."""
    header_fmt = 'Token {user_id}:{token}'
    req.headers['Authorization'] = header_fmt.format_map(attr.asdict(self))
    return req
def dump(self, file: TextIO) -> None:
    """Store credentials for future use.

    Keyword arguments:
        file: Open YAML text stream to write to.
    """
    yaml.dump(attr.asdict(self), file)
def to_dict(self) -> Dict[str, Any]:
    return attr.asdict(self)
def __structlog__(self):
    if attr.has(self.__class__):
        return attr.asdict(self)
    if hasattr(self, 'to_dict') and callable(self.to_dict):
        return self.to_dict()
    return self
def test_conda_manager_identify_distributions(get_conda_test_dir):
    # Skip if network is not available (skip_if_no_network fails with fixtures)
    test_dir = get_conda_test_dir
    files = [os.path.join(test_dir, "miniconda/bin/sqlite3"),
             os.path.join(test_dir, "miniconda/envs/mytest/bin/xz"),
             os.path.join(test_dir,
                          "miniconda/envs/mytest/lib/python2.7/site-packages/pip/index.py"),
             os.path.join(test_dir,
                          "miniconda/envs/mytest/lib/python2.7/site-packages/rpaths.py"),
             "/sbin/iptables"]
    tracer = CondaTracer()
    dists = list(tracer.identify_distributions(files))

    assert len(dists) == 1, "Exactly one Conda distribution expected."

    (distributions, unknown_files) = dists[0]

    assert unknown_files == ["/sbin/iptables"], \
        "Exactly one file (/sbin/iptables) should not be discovered."

    assert len(distributions.environments) == 2, \
        "Two conda environments are expected."

    out = {'environments': [{'packages': [{'files': ['bin/sqlite3'],
                                           'name': 'sqlite'}]},
                            {'packages': [{'files': ['bin/xz'],
                                           'name': 'xz'},
                                          {'files': ['lib/python2.7/site-packages/pip/index.py'],
                                           'name': 'pip'},
                                          {'files': ['lib/python2.7/site-packages/rpaths.py'],
                                           'installer': 'pip',
                                           'name': 'rpaths'}]}]}
    assert_is_subset_recur(out, attr.asdict(distributions), [dict, list])

    NicemanProvenance.write(sys.stdout, distributions)
    print(json.dumps(unknown_files, indent=4))
def yaml_representer(dumper, data):
    ordered_items = filter(
        lambda i: bool(i[1]),  # keep only non-empty / non-None values
        attr.asdict(data, dict_factory=collections.OrderedDict).items())
    return dumper.represent_mapping('tag:yaml.org,2002:map', ordered_items)
def asdict(self):
    d = attr.asdict(self)
    d["aliases"] = list(d["aliases"])
    return d
def open(db_conf):
    from postgresql import driver
    from .pglib import category

    conn_params = attr.asdict(db_conf)
    for key in ("clean_interval", "clean_full",
                "reindex_interval", "reindex_full"):
        del conn_params[key]
    return Database(driver.connect(category=category, **conn_params),
                    db_conf.database or db_conf.user)
def keys(self):
    return attr.asdict(self).keys()

def values(self):
    return attr.asdict(self).values()

def items(self):
    return attr.asdict(self).items()
def asdict(self):
    """Convert to a dictionary."""
    return attr.asdict(self)
def __repr__(self):
    """Builds the proper repr string."""
    return '{name}({kwargs})'.format(
        name=self.__class__.__name__,
        kwargs=', '.join(
            '{key}={value}'.format(key=key, value=value)
            for key, value in sorted(
                attr.asdict(self.config, recurse=True).items(),
                key=lambda x: x[0]
            )
        )
    )
def lst_as_dict(lst):
    return [attr.asdict(x) for x in lst]
def test_train_model():
    data = fetch_20newsgroups(
        random_state=42,
        categories=['sci.crypt', 'sci.electronics', 'sci.med', 'sci.space'])
    limit = 200
    if limit is not None:
        data['target'] = data['target'][:limit]
        data['data'] = data['data'][:limit]
    n_domains = int(len(data['target']) / 5)
    docs = [
        {
            'html': '\n'.join('<p>{}</p>'.format(t) for t in text.split('\n')),
            'url': 'http://example-{}.com/{}'.format(n % n_domains, n),
            'relevant': {'sci.space': True, 'sci.med': None}.get(
                data['target_names'][target], False),
        }
        for n, (text, target) in enumerate(zip(data['data'], data['target']))]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert lst_as_dict(result.meta.advice) == [
        {'kind': 'Notice',
         'text': "The quality of the classifier is very good, ROC AUC is 0.96. "
                 "You can label more pages if you want to improve quality, "
                 "but it's better to start crawling "
                 "and check the quality of crawled pages.",
         },
    ]
    assert lst_as_dict(result.meta.description) == [
        {'heading': 'Dataset',
         'text': '200 documents, 159 labeled across 40 domains.'},
        {'heading': 'Class balance',
         'text': '33% relevant, 67% not relevant.'},
        {'heading': 'Metrics', 'text': ''},
        {'heading': 'Accuracy', 'text': '0.881 ± 0.122'},
        {'heading': 'ROC AUC', 'text': '0.964 ± 0.081'}]
    assert len(result.meta.weights['pos']) > 0
    assert len(result.meta.weights['neg']) > 0
    assert isinstance(result.model, BaseModel)
    assert hasattr(result.model, 'predict_proba')
def test_empty():
    result = train_model([])
    pprint(attr.asdict(result.meta))
    assert result.meta == Meta(
        advice=[AdviceItem('Error', 'Can not train a model, no pages given.')])
    assert result.model is None
def test_unlabeled():
    docs = [{'html': 'foo', 'url': 'http://example{}.com'.format(i),
             'relevant': None}
            for i in range(10)]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert result.meta == Meta(
        advice=[AdviceItem(
            'Error', 'Can not train a model, no labeled pages given.')])
    assert result.model is None
def test_two_domains():
    docs = [{'html': 'foo{}'.format(i % 3),
             'url': 'http://example{}.com/{}'.format(i % 2, i),
             'relevant': i % 3 == 0}
            for i in range(10)]
    result = train_model(docs)
    pprint(attr.asdict(result.meta))
    assert lst_as_dict(result.meta.advice) == [
        {'kind': 'Warning',
         'text': 'Low number of relevant domains (just 2) might result in model '
                 'over-fitting.'},
        {'kind': 'Warning',
         'text': 'Number of human labeled documents is just 10, consider having '
                 'at least 100 labeled.'},
        {'kind': 'Notice',
         'text': 'The quality of the classifier is very good, ROC AUC is '
                 '1.00. Still, consider fixing warnings shown above.'}]
    assert lst_as_dict(result.meta.description) == [
        {'heading': 'Dataset',
         'text': '10 documents, 10 labeled across 2 domains.'},
        {'heading': 'Class balance',
         'text': '40% relevant, 60% not relevant.'},
        {'heading': 'Metrics', 'text': ''},
        {'heading': 'Accuracy', 'text': '1.000 ± 0.000'},
        {'heading': 'ROC AUC', 'text': '1.000 ± 0.000'}]
    assert result.model is not None
def test_default_clf(model_cls):
    docs = [{'html': 'foo{} bar'.format(i % 4),
             'url': 'http://example{}.com'.format(i),
             'relevant': i % 2 == 0}
            for i in range(100)]
    result = default_train_model(docs, model_cls=model_cls)
    assert result.model is not None
    meta = attr.asdict(result.meta)
    pprint(meta)
    json.dumps(meta)  # must not raise: meta has to be JSON-serializable
def _report_item(self, item):
    indent = INDENT if os.environ.get("ALLURE_INDENT_OUTPUT") else None
    filename = item.file_pattern.format(prefix=uuid.uuid4())
    # Keep booleans (even False) and any truthy value; drop other falsy values.
    data = asdict(item, filter=lambda attr, value: not (type(value) != bool and not bool(value)))
    with io.open(os.path.join(self._report_dir, filename), 'w', encoding='utf8') as json_file:
        if sys.version_info.major < 3:
            json_file.write(
                unicode(json.dumps(data, indent=indent, ensure_ascii=False,
                                   encoding='utf8')))
        else:
            json.dump(data, json_file, indent=indent, ensure_ascii=False)
def test_multiple_requests(self):
    """
    Sending multiple requests with StubClient.send_request for different actions and then
    calling get_all_responses returns responses for all the actions that were called.
    """
    responses = {
        'action_1': {'body': {'foo': 'bar'}, 'errors': []},
        'action_2': {'body': {'baz': 42}, 'errors': []},
        'action_3': {
            'body': {},
            'errors': [
                {
                    'code': ERROR_CODE_INVALID,
                    'message': 'Invalid input',
                    'field': 'quas.wex',
                    'traceback': None,
                    'variables': None,
                },
            ],
        },
    }

    self.client.stub_action(SERVICE_NAME, 'action_1', **responses['action_1'])
    self.client.stub_action(SERVICE_NAME, 'action_2', **responses['action_2'])
    self.client.stub_action(SERVICE_NAME, 'action_3', **responses['action_3'])

    control = self.client._make_control_header()
    context = self.client._make_context_header()
    request_1 = dict(control_extra=control, context=context, actions=[
        {'action': 'action_1'},
        {'action': 'action_2'},
    ])
    request_2 = dict(control_extra=control, context=context, actions=[
        {'action': 'action_2'},
        {'action': 'action_1'},
    ])
    request_3 = dict(control_extra=control, context=context, actions=[{'action': 'action_3'}])

    # Store requests by request ID for later verification, because order is not guaranteed
    requests_by_id = {}
    for request in (request_1, request_2, request_3):
        request_id = self.client.send_request(SERVICE_NAME, **request)
        requests_by_id[request_id] = request

    for response_id, response in self.client.get_all_responses(SERVICE_NAME):
        # The client returned the same number of actions as were requested
        self.assertEqual(len(response.actions), len(requests_by_id[response_id]['actions']))
        for i in range(len(response.actions)):
            action_response = response.actions[i]
            # The action name returned matches the action name in the request
            self.assertEqual(action_response.action,
                             requests_by_id[response_id]['actions'][i]['action'])
            # The action response matches the expected response.
            # Errors are returned as the Error type, so convert them to dicts first.
            self.assertEqual(action_response.body, responses[action_response.action]['body'])
            self.assertEqual(
                [attr.asdict(e, dict_factory=UnicodeKeysDict) for e in action_response.errors],
                responses[action_response.action]['errors'],
            )