我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用attr.ib()。
def test_do():
    """Check that a ``@do`` generator function chains the effects it yields.

    The decorated function yields two ``Effect`` objects. The test asserts
    that calling it produces an ``Effect`` whose intent is a
    ``ChainedIntent`` and that performing it returns both dispatcher
    results in yield order.
    """

    @attr.s
    class EDo:
        arg = attr.ib()

    @do
    def do_func(a, b):
        first = yield Effect(EDo(a))
        second = yield Effect(EDo(b))
        return [first, second]

    effect = do_func(1, 2)
    assert isinstance(effect, Effect)
    assert isinstance(effect.intent, ChainedIntent)

    handlers = {EDo: lambda intent: 'done: %s' % intent.arg}
    outcome = sync_perform(TypeDispatcher(handlers), effect)
    assert outcome == ['done: 1', 'done: 2']
def test_chained_intent(self):
    """Perform a ``ChainedIntent`` built from a generator synchronously.

    The generator yields five ``ENumToString`` effects, collects what the
    dispatcher sends back and returns the concatenation.
    """

    @attr.s
    class ENumToString:
        num = attr.ib()

    def collect_intent_results():
        pieces = []
        for number in range(5):
            piece = yield Effect(ENumToString(number))
            pieces.append(piece)
        return ''.join(pieces)

    chained = ChainedIntent(collect_intent_results())
    effect = Effect(chained)
    handlers = {ENumToString: lambda intent: str(intent.num)}
    outcome = sync_perform(TypeDispatcher(handlers), effect)
    assert outcome == '01234'
async def test_chained_intent(self):
    """Perform a ``ChainedIntent`` built from a generator via asyncio.

    Declared ``async def`` because the body awaits ``asyncio_perform``;
    ``await`` inside a plain ``def`` is a syntax error.

    NOTE(review): running a coroutine test needs an async-aware runner
    (e.g. a pytest asyncio plugin/marker) -- confirm how the suite
    collects this test.
    """

    @attr.s
    class ENumToString:
        num = attr.ib()

    def collect_intent_results():
        # Each yielded effect is answered with the dispatcher's result.
        intent_results = []
        for i in range(5):
            res = yield Effect(ENumToString(i))
            intent_results.append(res)
        return ''.join(intent_results)

    effect = Effect(ChainedIntent(collect_intent_results()))
    dispatcher = TypeDispatcher({
        ENumToString: lambda intent: str(intent.num),
    })
    ret = await asyncio_perform(dispatcher, effect)
    assert ret == '01234'
def test_optional_field_roundtrip(converter, cl_and_vals):
    """
    Classes with optional fields can be unstructured and structured.

    The round trip is checked twice: once with the field holding an
    instance of the generated class, once with the field set to ``None``.
    """
    cl, vals = cl_and_vals

    @attr.s
    class C(object):
        a = attr.ib(type=Optional[cl])

    for field_value in (cl(*vals), None):
        inst = C(a=field_value)
        unstructured = converter.unstructure(inst)
        assert inst == converter.structure(unstructured, C)
def __init_subclass__(cls, class_id, method_id, response_to=None, followed_by_content=False, closing=False, **kwargs):
    """Configure and register a subclass identified by (class_id, method_id).

    Stores ``closing``, ``followed_by_content`` and ``response_to`` on the
    new class, appends the class to ``response_to.responses`` when a
    ``response_to`` class is given, and registers it in ``cls.BY_ID``.

    NOTE(review): the ``attr.ib()`` objects are only assigned as class
    attributes here; presumably a later ``attr.s``-style step turns them
    into fields -- confirm. Keep the order of the ``attr.ib()`` calls
    as-is, since attrs orders fields by creation order.
    """
    super().__init_subclass__(**kwargs)
    # Built before any attr.ib() is attached below; make_struct is defined
    # elsewhere, so what it reads from cls is not visible here.
    cls.struct = make_struct(cls)
    cls.closing = closing
    cls.followed_by_content = followed_by_content
    if followed_by_content:
        cls.content = attr.ib(default=None)
    # Fresh list per subclass, filled by subclasses that name this class
    # as their response_to.
    cls.responses = []
    cls.response_to = response_to
    if response_to is not None:
        response_to.responses.append(cls)
    # init=False: these are not constructor arguments.
    cls.class_id = attr.ib(default=class_id, init=False)
    cls.method_id = attr.ib(default=method_id, init=False)
    cls.channel_id = attr.ib(default=None, init=False)
    cls.BY_ID[(class_id, method_id)] = cls
def attr_struct(cls=None, vals_to_attrs=False, defaults=..., **kws):
    """Class decorator that builds an attrs class from a compact spec.

    Usable bare (``@attr_struct``) or with arguments
    (``@attr_struct(...)``); in the latter case a partial of itself is
    returned to receive the class.

    * A ``keys`` class attribute (whitespace-separated string or iterable
      of names) is removed from the class and every name becomes an
      ``attr.ib``, using ``defaults`` as the default unless ``defaults``
      is ``...``.
    * With ``vals_to_attrs``, each remaining public, non-callable class
      attribute that is not one of ``keys`` is replaced by an ``attr.ib``
      whose default is the attribute's value.

    Remaining keyword arguments go to ``attr.s``; ``hash`` and ``slots``
    get defaults if not supplied.
    """
    if not cls:
        return ft.partial(
            attr_struct, vals_to_attrs=vals_to_attrs, defaults=defaults, **kws
        )

    try:
        keys = cls.keys
        del cls.keys
    except AttributeError:
        keys = list()
    else:
        ib_options = {} if defaults is ... else {'default': defaults}
        if isinstance(keys, str):
            keys = keys.split()
        for field_name in keys:
            setattr(cls, field_name, attr.ib(**ib_options))

    if vals_to_attrs:
        for attr_name, attr_value in vars(cls).items():
            skip = (
                attr_name.startswith('_')
                or attr_name in keys
                or callable(attr_value)
            )
            if not skip:
                setattr(cls, attr_name, attr.ib(attr_value))

    kws.setdefault('hash', not hasattr(cls, '__hash__'))
    kws.setdefault('slots', True)
    return attr.s(cls, **kws)
def test_secret_str_censors(self):
    """
    _SecretStr censors its __repr__ if it is called from another __repr__.
    """
    secret = _SecretStr("abc")

    @attr.s
    class Cfg(object):
        s = attr.ib()

    rendered = repr(Cfg(secret))
    assert "Cfg(s=<SECRET>)" == rendered
def test_tolerates_attribs(self):
    """
    Classes are allowed to have plain attr.ibs for e.g.
    __attrs_post_init__.
    """

    @environ.config
    class Cfg(object):
        e = environ.var()
        x = attr.ib(default=42)

    fake_environ = {"APP_E": "e"}
    loaded = environ.to_config(Cfg, environ=fake_environ)

    assert Cfg("e", 42) == loaded
def var(default=RAISE, convert=None, name=None, validator=None):
    """Declare a configuration variable as an ``attr.ib``.

    The attribute carries a ``_ConfigEntry(name, default, None)`` in its
    metadata under ``CNF_KEY``; ``default``, ``convert`` and ``validator``
    are forwarded to ``attr.ib`` unchanged.

    NOTE(review): newer attrs releases renamed ``attr.ib``'s ``convert``
    argument to ``converter`` -- confirm against the pinned attrs version.
    """
    entry = _ConfigEntry(name, default, None)
    return attr.ib(
        default=default,
        metadata={CNF_KEY: entry},
        convert=convert,
        validator=validator,
    )
def group(cls):
    """Declare a nested configuration group as an ``attr.ib``.

    The attribute defaults to ``None`` and stores
    ``_ConfigEntry(None, None, cls, True)`` in its metadata under
    ``CNF_KEY``.
    """
    entry = _ConfigEntry(None, None, cls, True)
    return attr.ib(default=None, metadata={CNF_KEY: entry})
def secret(self, default=RAISE, convert=None, name=None, section=None):
    """Declare a secret read from an INI section as an ``attr.ib``.

    ``section`` falls back to ``self.section`` when it is ``None``. The
    attribute's metadata holds a ``_ConfigEntry`` (with ``self._get`` as
    its fourth argument) under ``CNF_KEY`` and an ``_INIConfig`` for the
    section under ``CNF_INI_SECRET_KEY``.

    NOTE(review): newer attrs releases renamed ``attr.ib``'s ``convert``
    argument to ``converter`` -- confirm against the pinned attrs version.
    """
    effective_section = self.section if section is None else section
    metadata = {
        CNF_KEY: _ConfigEntry(name, default, None, self._get),
        CNF_INI_SECRET_KEY: _INIConfig(effective_section),
    }
    return attr.ib(default=default, metadata=metadata, convert=convert)
async def test_AsyncResource_defaults():
    """Check the default async-context-manager behaviour of ``AsyncResource``.

    Entering ``async with`` must yield the resource itself, and leaving
    the block must have called ``aclose()``, which appends ``"ac"`` to
    ``record``.

    Declared ``async def`` because the body uses ``async with``, which is
    a syntax error inside a plain ``def``.

    NOTE(review): a coroutine test needs an async-aware test runner --
    confirm how the suite collects this test.
    """

    @attr.s
    class MyAR(tabc.AsyncResource):
        # Factory(list) gives every instance its own record list.
        record = attr.ib(default=attr.Factory(list))

        async def aclose(self):
            self.record.append("ac")

    async with MyAR() as myar:
        assert isinstance(myar, MyAR)
        assert myar.record == []

    assert myar.record == ["ac"]
def test_base(self):
    """Perform a single effect synchronously through a ``TypeDispatcher``.

    The handler registered for ``EDoSomething`` ignores the intent and
    returns ``'bar'``, which must be the result of ``sync_perform``.
    """

    @attr.s
    class EDoSomething:
        arg = attr.ib()

    handlers = {EDoSomething: lambda intent: 'bar'}
    dispatcher = TypeDispatcher(handlers)
    effect = Effect(EDoSomething('foo'))

    outcome = sync_perform(dispatcher, effect)
    assert outcome == 'bar'
async def test_base(self):
    """Perform a single effect via ``asyncio_perform``.

    The handler registered for ``EDoSomething`` ignores the intent and
    returns ``'bar'``, which must be the awaited result.

    Declared ``async def`` because the body awaits ``asyncio_perform``;
    ``await`` inside a plain ``def`` is a syntax error.

    NOTE(review): running a coroutine test needs an async-aware runner
    (e.g. a pytest asyncio plugin/marker) -- confirm how the suite
    collects this test.
    """

    @attr.s
    class EDoSomething:
        arg = attr.ib()

    dispatcher = TypeDispatcher({
        EDoSomething: lambda intent: 'bar',
    })
    effect = Effect(EDoSomething('foo'))
    ret = await asyncio_perform(dispatcher, effect)
    assert ret == 'bar'
def test_edge_errors():
    """Edge input cases cause errors."""

    # Two classes without any fields.
    @attr.s
    class A(object):
        pass

    @attr.s
    class B(object):
        pass

    # Two classes sharing their only field name.
    @attr.s
    class C(object):
        a = attr.ib()

    @attr.s
    class D(object):
        a = attr.ib()

    # Can't generate for only one class.
    with pytest.raises(ValueError):
        create_uniq_field_dis_func(A)

    # No fields on either class.
    with pytest.raises(ValueError):
        create_uniq_field_dis_func(A, B)

    # No unique fields on either class.
    with pytest.raises(ValueError):
        create_uniq_field_dis_func(C, D)
def test_union_field_roundtrip(converter, cl_and_vals_a, cl_and_vals_b, strat):
    """
    Classes with union fields can be unstructured and structured.

    With the ``AS_DICT`` strategy the round trip must work directly. With
    any other strategy structuring must raise ``ValueError`` until a
    handler for the union is registered on the converter.

    NOTE(review): the source was collapsed onto one line; the handler
    registration is reconstructed as part of the ``else`` branch --
    confirm against the upstream test.
    """
    converter.unstruct_strat = strat
    cl_a, vals_a = cl_and_vals_a
    cl_b, vals_b = cl_and_vals_b
    a_field_names = {a.name for a in fields(cl_a)}
    b_field_names = {a.name for a in fields(cl_b)}
    # Both generated classes must have at least one field.
    assume(a_field_names)
    assume(b_field_names)

    common_names = a_field_names & b_field_names
    # cl_a must own at least one field name that cl_b does not have.
    assume(len(a_field_names) > len(common_names))

    @attr.s
    class C(object):
        a = attr.ib(type=Union[cl_a, cl_b])

    inst = C(a=cl_a(*vals_a))

    if strat is UnstructureStrategy.AS_DICT:
        assert inst == converter.structure(converter.unstructure(inst), C)
    else:
        # Our disambiguation functions only support dictionaries for now.
        with pytest.raises(ValueError):
            converter.structure(converter.unstructure(inst), C)

        # Register an explicit handler that always structures as cl_a ...
        def handler(obj, _):
            return converter.structure(obj, cl_a)

        converter._union_registry[Union[cl_a, cl_b]] = handler
        assert inst == converter.structure(converter.unstructure(inst), C)
        # ... and remove it again so the converter is left as it was.
        del converter._union_registry[Union[cl_a, cl_b]]
def bare_typed_attrs(draw, defaults=None):
    """
    Generate a tuple of an ``Any``-typed attribute and a strategy that
    yields values appropriate for that attribute (always ``None``).

    The attribute gets a default of ``None`` when ``defaults`` is True, or
    when ``defaults`` is None and a drawn boolean is true; otherwise it
    has no default.
    """
    with_default = defaults is True or (defaults is None and draw(booleans()))
    default = None if with_default else attr.NOTHING
    attribute = attr.ib(type=Any, default=default)
    return (attribute, just(None))
def int_typed_attrs(draw, defaults=None):
    """
    Generate a tuple of an ``int``-typed attribute and a strategy that
    yields ints for that attribute.

    A drawn integer becomes the default when ``defaults`` is True, or when
    ``defaults`` is None and a drawn boolean is true; otherwise the
    attribute has no default.
    """
    with_default = defaults is True or (defaults is None and draw(booleans()))
    default = draw(integers()) if with_default else attr.NOTHING
    attribute = attr.ib(type=int, default=default)
    return (attribute, integers())
def float_typed_attrs(draw, defaults=None):
    """
    Generate a tuple of a ``float``-typed attribute and a strategy that
    yields floats for that attribute.

    A drawn float becomes the default when ``defaults`` is True, or when
    ``defaults`` is None and a drawn boolean is true; otherwise the
    attribute has no default.
    """
    with_default = defaults is True or (defaults is None and draw(booleans()))
    default = draw(floats()) if with_default else attr.NOTHING
    attribute = attr.ib(type=float, default=default)
    return (attribute, floats())
def dict_typed_attrs(draw, defaults=None):
    """
    Generate a tuple of a dict-typed attribute and a strategy that yields
    dictionaries for that attribute. The dictionaries map strings to
    integers.

    A drawn dictionary becomes the default when ``defaults`` is True, or
    when ``defaults`` is None and a drawn boolean is true; otherwise the
    attribute has no default.
    """
    val_strat = dictionaries(keys=text(), values=integers())
    with_default = defaults is True or (defaults is None and draw(booleans()))
    default = draw(val_strat) if with_default else attr.NOTHING
    attribute = attr.ib(type=Dict[unicode, int], default=default)
    return (attribute, val_strat)
def just_class(tup):
    """Extend a list of attributes with one holding a nested class instance.

    ``tup`` is ``Tuple[List[Tuple[_CountingAttr, Strategy]],
    Tuple[Type, Sequence[Any]]]``. The appended attribute is typed as the
    nested class, defaults (via a factory) to a fresh instance built from
    the given arguments, and is paired with a strategy that always yields
    such an instance.
    """
    nested = tup[1]
    nested_cl = nested[0]
    nested_cl_args = nested[1]

    def build_nested():
        return nested_cl(*nested_cl_args)

    combined_attrs = list(tup[0])
    attribute = attr.ib(type=nested_cl, default=attr.Factory(build_nested))
    combined_attrs.append((attribute, just(build_nested())))
    return _create_hyp_class(combined_attrs)
def list_of_class(tup):
    """Extend a list of attributes with one holding a list of a nested class.

    ``tup[0]`` is the list of (attribute, strategy) pairs and ``tup[1]``
    is ``(nested_class, constructor_args)``. The appended attribute is
    typed ``List[nested_class]``, defaults (via a factory) to a
    one-element list and is paired with a strategy yielding such a list.
    """
    nested = tup[1]
    nested_cl = nested[0]
    nested_cl_args = nested[1]

    def build_list():
        return [nested_cl(*nested_cl_args)]

    combined_attrs = list(tup[0])
    attribute = attr.ib(type=List[nested_cl], default=attr.Factory(build_list))
    combined_attrs.append((attribute, just(build_list())))
    return _create_hyp_class(combined_attrs)
def dict_of_class(tup):
    """Extend a list of attributes with one holding a dict of a nested class.

    ``tup[0]`` is the list of (attribute, strategy) pairs and ``tup[1]``
    is ``(nested_class, constructor_args)``. The appended attribute is
    typed ``Dict[str, nested_class]``, defaults (via a factory) to
    ``{"cls": <instance>}`` and is paired with a strategy yielding such a
    dict.
    """
    nested = tup[1]
    nested_cl = nested[0]
    nested_cl_args = nested[1]

    combined_attrs = list(tup[0])
    factory = attr.Factory(lambda: {"cls": nested_cl(*nested_cl_args)})
    attribute = attr.ib(type=Dict[str, nested_cl], default=factory)
    strategy = just({'cls': nested_cl(*nested_cl_args)})
    combined_attrs.append((attribute, strategy))
    return _create_hyp_class(combined_attrs)
def list_of_class(tup):
    """Extend a list of attributes with one holding a list of a nested class.

    ``tup[0]`` is the list of (attribute, strategy) pairs; ``tup[1][0]``
    is the nested class, instantiated without arguments. The appended
    attribute defaults (via a factory) to a one-element list and is
    paired with a strategy yielding such a list.
    """
    nested_cl = tup[1][0]

    def build_list():
        return [nested_cl()]

    combined_attrs = list(tup[0])
    attribute = attr.ib(default=attr.Factory(build_list))
    combined_attrs.append((attribute, st.just(build_list())))
    return _create_hyp_class(combined_attrs)
def dict_of_class(tup):
    """Extend a list of attributes with one holding a dict of a nested class.

    ``tup[0]`` is the list of (attribute, strategy) pairs; ``tup[1][0]``
    is the nested class, instantiated without arguments. The appended
    attribute defaults (via a factory) to ``{"cls": <instance>}`` and is
    paired with a strategy yielding such a dict.
    """
    nested_cl = tup[1][0]

    combined_attrs = list(tup[0])
    factory = attr.Factory(lambda: {"cls": nested_cl()})
    attribute = attr.ib(default=factory)
    strategy = st.just({'cls': nested_cl()})
    combined_attrs.append((attribute, strategy))
    return _create_hyp_class(combined_attrs)
def bare_attrs(draw, defaults=None):
    """
    Generate a tuple of an attribute and a strategy that yields values
    appropriate for that attribute (always ``None``).

    The attribute gets a default of ``None`` when ``defaults`` is True, or
    when ``defaults`` is None and a drawn boolean is true; otherwise it
    has no default.
    """
    with_default = defaults is True or (defaults is None and draw(st.booleans()))
    default = None if with_default else NOTHING
    attribute = attr.ib(default=default)
    return (attribute, st.just(None))
def int_attrs(draw, defaults=None):
    """
    Generate a tuple of an attribute and a strategy that yields ints for
    that attribute.

    A drawn integer becomes the default when ``defaults`` is True, or when
    ``defaults`` is None and a drawn boolean is true; otherwise the
    attribute has no default.
    """
    with_default = defaults is True or (defaults is None and draw(st.booleans()))
    default = draw(st.integers()) if with_default else NOTHING
    attribute = attr.ib(default=default)
    return (attribute, st.integers())
def str_attrs(draw, defaults=None):
    """
    Generate a tuple of an attribute and a strategy that yields strs for
    that attribute.

    A drawn text value becomes the default when ``defaults`` is True, or
    when ``defaults`` is None and a drawn boolean is true; otherwise the
    attribute has no default.
    """
    with_default = defaults is True or (defaults is None and draw(st.booleans()))
    default = draw(st.text()) if with_default else NOTHING
    attribute = attr.ib(default=default)
    return (attribute, st.text())
def dict_attrs(draw, defaults=None):
    """
    Generate a tuple of an attribute and a strategy that yields
    dictionaries for that attribute. The dictionaries map strings to
    integers.

    When a default is requested (``defaults`` is True, or ``defaults`` is
    None and a drawn boolean is true), one dictionary is drawn and the
    attribute's default becomes a factory returning that dictionary.
    """
    val_strat = st.dictionaries(keys=st.text(), values=st.integers())
    with_default = defaults is True or (defaults is None and draw(st.booleans()))

    if with_default:
        default_val = draw(val_strat)

        def provide_default():
            return default_val

        default = attr.Factory(provide_default)
    else:
        default = NOTHING

    return (attr.ib(default=default), val_strat)
def generate_fake_bugspots(hotspot_files):
    """Build a stand-in ``Bugspots`` class reporting the given files.

    The returned class accepts and ignores any constructor arguments; its
    ``get_hotspots()`` returns one result object per entry of
    ``hotspot_files``, with ``filename`` passed through ``str``.

    NOTE(review): newer attrs releases renamed ``attr.ib``'s ``convert``
    argument to ``converter`` -- confirm against the pinned attrs version.
    """

    @attr.s
    class BugspotsResult:
        filename = attr.ib(convert=str)

    class FakeBugspots:
        def __init__(self, *args, **kwargs):
            pass

        def get_hotspots(self):
            hotspots = []
            for hotspot_file in hotspot_files:
                hotspots.append(BugspotsResult(hotspot_file))
            return hotspots

    return FakeBugspots
def test_unextractable_schema(extractor):
    """Extracting a schema from an attrs class we can't handle must fail.

    ``NoBueno`` has a single untyped attribute; the extractor is expected
    to raise ``UnextractableSchema`` for it.
    """

    @attr.s
    class NoBueno(object):
        foo = attr.ib()

    expected_failure = pytest.raises(UnextractableSchema)
    with expected_failure:
        extractor.extract(NoBueno)
def Field(*args, default=attr.NOTHING, **kwargs):
    """Create an ``attr.ib``, treating a callable ``default`` as a factory.

    A callable ``default`` is wrapped in ``attr.Factory``; any other value
    is used as-is. All other arguments are forwarded to ``attr.ib``.
    """
    effective_default = attr.Factory(default) if callable(default) else default
    return attr.ib(*args, default=effective_default, **kwargs)
def ForeignKey(cls, *args, **kwargs):
    """Create an ``attr.ib`` whose metadata marks a relation to ``cls``.

    The metadata is ``{'related': {'target': cls, 'type': 'ForeignKey'}}``;
    remaining arguments are forwarded to ``attr.ib``.
    """
    relation = {'target': cls, 'type': 'ForeignKey'}
    return attr.ib(*args, metadata={'related': relation}, **kwargs)
def __init_subclass__(cls, soft, reply_code, **kwargs):
    """Attach ``soft``/``reply_code`` attributes and register the subclass.

    ``soft`` and ``reply_code`` become the defaults of two ``attr.ib``
    class attributes, and the subclass is stored in ``cls.BY_ID`` under
    ``reply_code``.

    NOTE(review): the ``attr.ib()`` objects are only assigned here;
    presumably a later ``attr.s``-style step turns them into fields --
    confirm. Keep the call order, since attrs orders fields by creation.
    """
    super().__init_subclass__(**kwargs)
    cls.soft = attr.ib(default=soft)
    cls.reply_code = attr.ib(default=reply_code)
    cls.BY_ID[reply_code] = cls
def __init_subclass__(cls, class_id, **kwargs):
    """Attach ``class_id``, build the struct and register the subclass.

    ``class_id`` becomes the default of a non-init ``attr.ib``; the struct
    is built by ``make_struct`` with ``class_id`` excluded, and the
    subclass is stored in ``cls.BY_ID`` under ``class_id``.
    """
    super().__init_subclass__(**kwargs)
    # Assigned before make_struct runs, which is told to exclude it.
    cls.class_id = attr.ib(default=class_id, init=False)
    cls.struct = make_struct(
        cls, exclude_attrs={'class_id'}, struct=PropertiesStruct
    )
    cls.BY_ID[class_id] = cls
def attr_init(factory_or_default=attr.NOTHING, **attr_kws):
    """Create an ``attr.ib`` from either a default value or a factory.

    A callable argument is wrapped in ``attr.Factory``; anything else is
    passed as the default unchanged. Extra keywords go to ``attr.ib``.
    """
    if callable(factory_or_default):
        default = attr.Factory(factory_or_default)
    else:
        default = factory_or_default
    return attr.ib(default=default, **attr_kws)
def log(name):
    """Return an ``attr.ib`` whose default is a logger.

    The logger is looked up once, when this function is called, and that
    same logger object is the attribute's default. The attribute is
    excluded from repr, hash and comparison.

    :param name: name to pass to logging.getLogger()
    :rtype: attr.ib
    """
    logger = logging.getLogger(name)
    return attr.ib(default=logger, repr=False, hash=False, cmp=False)
def attrib(*args, **kwargs):
    """
    Extend the attr.ib to include our metadata elements.

    ATM we support additional keyword args which are then stored within
    `metadata`:

    - `doc` for documentation to describe the attribute (e.g. in --help)

    A ``metadata`` mapping passed by the caller is copied before ``doc``
    is added, so the caller's object is never modified (it may be shared
    between several attributes). ``metadata=None`` is treated like an
    absent mapping when ``doc`` has to be stored.

    All remaining positional and keyword arguments are forwarded to
    ``attr.ib`` unchanged.
    """
    doc = kwargs.pop('doc', None)
    # Work on a private copy; `or {}` also covers an explicit metadata=None.
    metadata = dict(kwargs.get('metadata') or {})
    if doc:
        metadata['doc'] = doc
    # Only (re)assign when there is something to store, so an empty or
    # missing metadata argument is passed through exactly as given.
    if metadata:
        kwargs['metadata'] = metadata
    return attr.ib(*args, **kwargs)
def __init__(self, path, session):
    """Representation for a repository

    Parameters
    ----------
    path: str
      Path to the top of the repository; trailing path separators are
      stripped before it is stored.
    session
      Stored as-is on the instance.
    """
    normalized_path = path.rstrip(os.sep)
    self.path = normalized_path
    # TODO: might be done as some rg to attr.ib
    self._session = session
    # Both start out unset.
    self._all_files = self._branch = None
def TypedList(type_):
    """Create a list-valued attribute that records its element type.

    The attribute defaults to a new list (via ``Factory(list)``) and
    stores ``type_`` in its metadata under the ``'type'`` key.
    """
    type_metadata = {'type': type_}
    return attr.ib(default=Factory(list), metadata=type_metadata)


#
# Models
#
def __setattr__(self, name, value):
    """Mimic attr.s(frozen=True) behaviour but allow for attributes to be
    initialized after class instantiation.

    Useful when you would like a class to be immutable after a certain
    action, such as a save to a database.

    An attribute may be assigned while its current value (looked up with
    ``getattr``, defaulting to ``None``) is ``None`` or an
    ``attr.Attribute`` instance. Once it holds any other value it is
    frozen and further assignment is rejected.

    **Note**: Obviously, this doesn't stop anyone from setting the
    uninitialized attributes before you've set it yourself.
    Hopefully, you've got responsible users.

    Raises:
        :class:`attr.exceptions.FrozenInstanceError`: if a frozen
            attribute is set
    """
    existing = getattr(self, name, None)
    still_unset = existing is None or isinstance(existing, attr.Attribute)
    if not still_unset:
        raise attr.exceptions.FrozenInstanceError()
    super().__setattr__(name, value)
def test_is_callable_cls():
    """Return an attrs class whose ``fn`` field uses the ``is_callable`` validator."""
    from coalaip.model_validators import is_callable as fn_validator

    @attr.s
    class TestCallable:
        fn = attr.ib(validator=fn_validator)

    return TestCallable
def test_use_model_attr_cls():
    """Return an attrs class exercising the ``use_model_attr`` validator.

    The class has a plain ``validator`` field and a ``test`` field
    validated by ``use_model_attr('validator')``.
    """
    from coalaip.model_validators import use_model_attr

    delegating_validator = use_model_attr('validator')

    @attr.s
    class TestUseModelAttr:
        validator = attr.ib()
        test = attr.ib(validator=delegating_validator)

    return TestUseModelAttr
def create_test_does_not_contain_cls():
    """Return a factory of attrs classes guarded by ``does_not_contain``.

    The returned ``create_cls(*avoid_keys)`` builds a class whose ``data``
    field is validated by a no-op validator decorated with
    ``does_not_contain(*avoid_keys)``.
    """
    from coalaip.model_validators import does_not_contain

    def create_cls(*avoid_keys):
        guard = does_not_contain(*avoid_keys)

        @guard
        def validator(*args, **kwargs):
            pass

        @attr.s
        class TestDoesNotContain:
            data = attr.ib(validator=validator)

        return TestDoesNotContain

    return create_cls
def create_reactor():
    """
    Twisted 17.1.0 and higher requires a reactor which implements
    ``IReactorPluggableNameResolver``.

    Returns a ``MemoryReactorClock`` subclass instance whose
    ``nameResolver`` resolves host names through ``HOST_MAP``.
    """

    @implementer(IHostResolution)
    @attr.s
    class Resolution(object):
        name = attr.ib()

    class _FakeResolver(object):
        def resolveHostName(self, resolutionReceiver, hostName, *args, **kwargs):
            port = kwargs.pop('portNumber')
            resolution = Resolution(name=hostName)
            resolutionReceiver.resolutionBegan(resolution)
            # Unknown hosts complete without reporting any address.
            if hostName in HOST_MAP:
                address = IPv4Address('TCP', HOST_MAP[hostName], port)
                resolutionReceiver.addressResolved(address)
            resolutionReceiver.resolutionComplete()
            return resolution

    @implementer(IReactorPluggableNameResolver)
    class _ResolvingMemoryClockReactor(MemoryReactorClock):
        nameResolver = _FakeResolver()

    return _ResolvingMemoryClockReactor()
def _run_comparison(self, hgvsc, expected_sequence):
    """Build the altered sequence for ``hgvsc`` and compare it.

    Parses ``hgvsc``, collects the transcript data for its accession from
    ``self._datasource``, runs ``AltSeqBuilder`` and asserts that the
    transcript sequence of the first result equals ``expected_sequence``.
    """

    # test replicates the internal class of p_to_c
    @attr.s(slots=True)
    class RefTranscriptData(object):
        transcript_sequence = attr.ib()
        aa_sequence = attr.ib()
        cds_start = attr.ib()
        cds_stop = attr.ib()
        protein_accession = attr.ib()

        @classmethod
        def setup_transcript_data(cls, ac, ac_p, db, ref="GRCh37.p10"):
            """helper for generating RefTranscriptData from for c_to_p"""
            tx_info = db.get_tx_info(ac)
            tx_seq = db.get_tx_seq(ac)

            if tx_info is None or tx_seq is None:
                raise hgvs.exceptions.HGVSError(
                    "Missing transcript data for accession: {}".format(ac))

            # use 1-based hgvs coords
            cds_start = tx_info["cds_start_i"] + 1
            cds_stop = tx_info["cds_end_i"]

            # Pad the CDS with "N" up to a whole number of codons so
            # biopython won't complain during the conversion. The previous
            # code joined the return value of list.extend() (always None),
            # which raised TypeError instead of padding.
            # Assumes tx_seq is a str -- TODO confirm against the datasource.
            tx_seq_to_translate = tx_seq[cds_start - 1:cds_stop]
            remainder = len(tx_seq_to_translate) % 3
            if remainder:
                tx_seq_to_translate += "N" * (3 - remainder)

            tx_seq_cds = Seq(tx_seq_to_translate)
            protein_seq = str(tx_seq_cds.translate())

            return cls(tx_seq, protein_seq, cds_start, cds_stop, ac_p)

    ac_p = "DUMMY"
    var = self._parser.parse_hgvs_variant(hgvsc)
    transcript_data = RefTranscriptData.setup_transcript_data(
        var.ac, ac_p, self._datasource)

    builder = altseqbuilder.AltSeqBuilder(var, transcript_data)
    insert_result = builder.build_altseq()
    actual_sequence = insert_result[0].transcript_sequence
    msg = "expected: {}\nactual : {}".format(expected_sequence, actual_sequence)
    self.assertEqual(expected_sequence, actual_sequence, msg)
def test_comparison(self):
    """
    The binary comparison operations work on ``KubernetesError`` as
    expected.

    ``a1`` and ``a2`` are equal; ``b`` differs from ``a1`` in the code and
    ``c`` differs in the status. The last part compares ``a1`` against
    objects of another type that define ``__cmp__``.
    """
    model = v1_5_model

    a1 = KubernetesError(200, model.v1.Status(status=u"A"))
    a2 = KubernetesError(200, model.v1.Status(status=u"A"))
    b = KubernetesError(201, model.v1.Status(status=u"A"))
    c = KubernetesError(200, model.v1.Status(status=u"B"))

    # a1 == a2
    self.expectThat(a1, Equals(a2))
    # not (a1 != a2)
    self.expectThat(a1, Not(NotEquals(a2)))
    # not (a1 > a2)
    self.expectThat(a1, Not(GreaterThan(a2)))
    # not (a1 < a2)
    self.expectThat(a1, Not(LessThan(a2)))

    # not (a1 == b)
    self.expectThat(a1, Not(Equals(b)))
    # a1 != b
    self.expectThat(a1, NotEquals(b))
    # a1 < b
    self.expectThat(a1, LessThan(b))
    # not (a1 > b)
    self.expectThat(a1, Not(GreaterThan(b)))

    # The next two checks used to compare against ``b`` again, so
    # (in)equality with ``c`` was never exercised.
    # not (a1 == c)
    self.expectThat(a1, Not(Equals(c)))
    # a1 != c
    self.expectThat(a1, NotEquals(c))
    # a1 < c
    self.expectThat(a1, LessThan(c))
    # not (a1 > c)
    self.expectThat(a1, Not(GreaterThan(c)))

    @attr.s
    class Comparator(object):
        result = attr.ib()

        # NOTE(review): ``__cmp__`` is only consulted by Python 2 --
        # confirm which interpreter this suite targets.
        def __cmp__(self, other):
            return self.result

    largest = Comparator(1)
    equalest = Comparator(0)
    smallest = Comparator(-1)

    # a1 < largest
    self.expectThat(a1, LessThan(largest))
    # a1 == equalest
    self.expectThat(a1, Equals(equalest))
    # a1 > smallest
    self.expectThat(a1, GreaterThan(smallest))