The following code examples, extracted from open-source Python projects, demonstrate how to use collections.abc.Sequence().
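A recurring pattern across these examples: because str (and bytes) also satisfy the Sequence ABC, most real-world checks pair the isinstance test with an explicit string exclusion. A minimal sketch of that idiom (the helper name here is illustrative, not taken from any project below):

from collections.abc import Sequence

def is_listlike(value):
    # str and bytes pass isinstance(..., Sequence), so exclude them,
    # as many of the examples below do
    return isinstance(value, Sequence) and not isinstance(value, (str, bytes))

assert is_listlike([1, 2, 3])
assert is_listlike((1, 2))
assert not is_listlike("abc")
assert not is_listlike(42)
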
async def torrents(self, torrents=None, keys='ALL', autoconnect=True):
    """Fetch and return torrents

    torrents: Iterator of torrent IDs, TorrentFilter object (or its
              string representation) or None for all torrents
    keys: tuple of Torrent keys to fetch or 'ALL' for all torrents
    autoconnect: Whether to attempt to connect automatically if not
                 connected; if False and not connected, return None

    Return Response with the following properties:
        torrents: tuple of Torrent objects with requested torrents
        success: False if no torrents were found, True otherwise
        msgs: list of strings/`ClientError`s caused by the request
    """
    if not autoconnect and not self.rpc.connected:
        return None
    elif torrents is None:
        return await self._get_torrents_by_ids(keys)
    elif isinstance(torrents, (str, TorrentFilter)):
        return await self._get_torrents_by_filter(keys, tfilter=torrents)
    elif isinstance(torrents, abc.Sequence) and \
         all(isinstance(id, int) for id in torrents):
        return await self._get_torrents_by_ids(keys, ids=torrents)
    else:
        raise ValueError("Invalid 'torrents' argument: {!r}".format(torrents))

def __init__(self, member, flags=None):
    if flags is not None:
        assert (issubclass(type(flags), colabc.Set) or
                issubclass(type(flags), colabc.Sequence)) and \
               not isinstance(flags, str), \
            "flags must be a container and not a string"
        assert all([isinstance(flag, str) for flag in list(flags)]), \
            "all flags must be strings, given {}".format(flags)

    super().__init__()
    self.member = member

    # list of selections
    self._registry = []

    # list of flags for specific kinds of selections
    self._flags = set()
    if flags:
        self._flags.update(flags)

def __init__(self, selection_list=None, flags=None):
    if not selection_list:
        self.data = []

    super().__init__(selection_list, flags=flags)

    if selection_list:
        assert issubclass(type(selection_list), col.Sequence), \
            "selection_list must be a subclass of collections.Sequence, not {}".format(
                type(selection_list))
        self.data = selection_list

        # if values in the selection_list are SelectionMembers update
        # their registries
        for idx, member in enumerate(self.data):
            if issubclass(type(member), SelectionMember):
                member.register_selection(idx, self, flags=flags)

def __init__(self, action, asset_pair, volume, price=None, validate=False):
    if action not in [Order.BUY, Order.SELL]:
        raise ValueError("Action must be Order.BUY or Order.SELL")
    if not isinstance(asset_pair, Sequence) or len(asset_pair) != 2:
        raise TypeError("asset_pair must be a sequence of length 2")
    if any([not isinstance(a, Asset) for a in asset_pair]):
        raise TypeError("asset_pair must be a sequence of assets, "
                        "not {}, {}".format(*map(type, asset_pair)))
    if volume <= 0:  # zero volume is also invalid
        raise ValueError("volume should be strictly positive")

    self.id = None
    self._action = action
    self._asset_pair = asset_pair
    self._volume = volume
    self._price = price
    self.status = None
    self.time_placed = None
    self.closed_price = None
    self.closed_time = None
    self._validate = validate

def get_part(self, doc, part):
    """Returns the next step in the correct type"""
    if isinstance(doc, Mapping):
        return part

    elif isinstance(doc, Sequence):
        if part == '-':
            return part

        if not RE_ARRAY_INDEX.match(str(part)):
            raise JsonPointerException("'%s' is not a valid list index" % (part, ))

        return int(part)

    elif hasattr(doc, '__getitem__'):
        # Allow indexing via ducktyping if the target has defined __getitem__
        return part

    else:
        raise JsonPointerException("Document '%s' does not support indexing, "
                                   "must be dict/list or support __getitem__"
                                   % type(doc))

def addTags(filename, new_tags):
    with open(filename, 'rb') as task:
        header = task.readline()
        # read the tags
        tags = task.readline().decode().strip().split(' ')
        if isinstance(new_tags, str):
            if new_tags in tags:
                return
            else:
                tags.append(new_tags)
        elif isinstance(new_tags, Sequence):
            new_tags = [tag for tag in new_tags if tag not in tags]
            if new_tags:
                tags.extend(new_tags)
            else:
                return
        else:
            raise ValueError(f'Cannot add tags {new_tags} to task {filename}')
        body = task.read()
    with open(filename, 'wb') as task:
        task.write(header)
        task.write((' '.join(tags) + '\n').encode())
        task.write(body)

def short_repr(obj, noneAsNA=False):
    '''Return a short representation of obj for clarity.'''
    if obj is None:
        return 'unspecified' if noneAsNA else 'None'
    elif isinstance(obj, str) and len(obj) > 80:
        return '{}...{}'.format(obj[:60].replace('\n', '\\n'),
                                obj[-20:].replace('\n', '\\n'))
    elif isinstance(obj, (str, int, float, bool)) or \
            (isinstance(obj, collections.Sequence) and len(obj) <= 2) or \
            len(str(obj)) < 80:
        return repr(obj)
    elif isinstance(obj, collections.Sequence):
        # should be a list or tuple
        return f'[{short_repr(obj[0])}, ...] ({len(obj)} items)'
    elif isinstance(obj, dict):
        if obj:
            first_key = list(obj.keys())[0]
            return f'{{{first_key!r}:{short_repr(obj[first_key])!r}, ...}} ({len(obj)} items)'
        else:
            return '{}'
    else:
        return f'{repr(obj)[:60]}...'

#
# SoS Workflow dictionary
#

def _get_path_map(self):
    res = {}
    # if user-specified path_map, it overrides CONFIG
    path_map = self.config.get('path_map', [])
    #
    if not path_map:
        return res
    if isinstance(path_map, str):
        path_map = [path_map]
    if isinstance(path_map, Sequence):
        for v in path_map:
            if ' -> ' not in v:
                raise ValueError(f'Path map should be separated as from -> to, {v} specified')
            elif v.count(' -> ') > 1:
                raise ValueError(f'Path map should be separated as from -> to, {v} specified')
            res[v.split(' -> ')[0]] = v.split(' -> ')[1]
    elif isinstance(path_map, dict):
        for k, v in path_map.items():
            res[k] = v
    else:
        raise ValueError(f'Unacceptable value for configuration path_map: {path_map}')
    return res

def get_part(self, doc, part):
    """Returns the next step in the correct type"""
    if isinstance(doc, Mapping):
        return part

    elif isinstance(doc, Sequence):
        if part == '-':
            return part

        if not self._RE_ARRAY_INDEX.match(str(part)):
            raise JsonPointerException("'%s' is not a valid sequence index" % part)

        return int(part)

    elif hasattr(doc, '__getitem__'):
        # Allow indexing via ducktyping
        # if the target has defined __getitem__
        return part

    else:
        raise JsonPointerException("Document '%s' does not support indexing, "
                                   "must be mapping/sequence or support __getitem__"
                                   % type(doc))

def walk(self, doc, part):
    """Walks one step in doc and returns the referenced part"""
    part = self.get_part(doc, part)

    assert hasattr(doc, '__getitem__'), "invalid document type %s" % (type(doc),)

    if isinstance(doc, Sequence):
        if part == '-':
            return EndOfList(doc)

        try:
            return doc[part]
        except IndexError:
            raise JsonPointerException("index '%s' is out of bounds" % (part, ))

    # Else the object is a mapping or supports __getitem__
    # (so assume custom indexing)
    try:
        return doc[part]
    except KeyError:
        raise JsonPointerException("member '%s' not found in %s" % (part, doc))

def run_test(work_type: str, job_sets: Sequence, trials: int,
             pool_class: type, worker_count: int) -> Sequence:
    pool = pool_class(worker_count)
    if work_type == 'compute':
        test_func = pool.run_compute_test
    elif work_type == 'network':
        test_func = pool.run_network_test
    else:
        raise Exception("Invalid work type: {}".format(work_type))
    results = map(
        lambda jobs: test_func(jobs, trials, show_progress=True),
        tqdm(job_sets, desc=pool_class.__name__),
    )
    summarized_results = list(map(summarize_test, results))
    pool.destroy_pool()
    return summarized_results

def __init__(self, ev, step, progname, repeat=[1, ], iter_triggers=(), end_triggers=()):
    module_logger.debug(
        "[ step %s ] Container initialization\n    iter_triggers: %s\n"
        "    end_triggers: %s\n    repeat: %s"
        % (progname, iter_triggers, end_triggers, repeat))
    self.ev = ev
    self.progname = progname
    self.starters = iter_triggers
    self.enders = end_triggers
    if isinstance(repeat, AbcSequence):
        repeat = IterGen(repeat)
    self.repeat = repeat
    self.loop_index = 0
    self.initiating_sequence = None
    self.step = step
    self.max_concurrent = self.step.config['max_concurrent']
    self.triggers = None

def __call__(self, obj):
    """Transforms the JSON object `obj`."""
    if isinstance(obj, str):
        return obj
    elif isinstance(obj, Sequence):
        return self.act_on_list(obj)
    elif isinstance(obj, Mapping):
        return self.act_on_dict(obj)
    else:
        return obj

def non_string_sequence(v, types=None):
    """Return whether the object is a Sequence other than str,
    optionally with the given element types."""
    return (isinstance(v, Sequence) and not isinstance(v, str) and
            (types is None or
             all(any(isinstance(x, t) for t in make_iterable(types)) for x in v)))

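A few illustrative calls against non_string_sequence, assuming make_iterable wraps a lone type in a tuple:

non_string_sequence([1, 2, 3], types=int)  # True
non_string_sequence("abc")                 # False: str is excluded
non_string_sequence((1, 'a'), types=int)   # False: 'a' is not an int
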
def _file_is_filtered(self, tfile):
    if self._ffilter is None:
        return False  # No filter specified
    elif isinstance(self._ffilter, (abc.Sequence, abc.Set)):
        # ffilter is a collection of file IDs
        return tfile['id'] not in self._ffilter
    else:
        # ffilter is a TorrentFileFilter instance
        return not self._ffilter.match(tfile)

def focused_file_ids(self):
    """File IDs of the focused files in a tuple"""
    focused = self.focused_widget
    if focused is not None:
        # The focused widget in the list can be a file or a directory.  If
        # it's a directory, the 'file_id' property returns the IDs of all
        # the contained files recursively.
        fid = focused.file_id
        return tuple(fid) if isinstance(fid, (abc.Sequence, abc.Set)) else (fid,)

def register(self, sid, callback, keys=(), tfilter=None):
    """Add new request to request pool

    sid: Subscriber ID (any hashable)
    callback: Callable that receives a tuple of Torrents on updates
    keys: Wanted Torrent keys
    tfilter: None for all torrents or TorrentFilter instance
    """
    if isinstance(tfilter, abc.Sequence):
        tfilter = TorrentFilter('|'.join('id=%s' % tid for tid in tfilter))
    log.debug('Registering subscriber: %s', sid)
    event = blinker.signal(sid)
    event.connect(callback)
    self._keys[event] = tuple(keys)
    self._tfilters[event] = tfilter

    # It's possible that a currently ongoing request doesn't collect the
    # keys this new callback needs.  In that case, the request is finished
    # AFTER we added the callback, and the callback would be called with
    # lacking keys, resulting in a KeyError.
    # Therefore we ask the poller to dump the result of a currently
    # ongoing request to prevent this.
    if self.running:
        self.skip_ongoing_request()

    self._combine_requests()

async def run(self, TORRENT_FILTER, PEER_FILTER, sort, columns):
    columns = self.cfg['columns.peers'].value if columns is None else columns
    sort = self.cfg['sort.peers'].value if sort is None else sort
    try:
        tfilter = self.select_torrents(TORRENT_FILTER,
                                       allow_no_filter=True,
                                       discover_torrent=True)
        pfilter = self.get_peer_filter(PEER_FILTER)
        sort = self.get_peer_sorter(sort)
        columns = self.get_peer_columns(columns)
    except ValueError as e:
        log.error(e)
        return False

    # Unless we're listing peers of exactly one torrent, specified by its
    # ID, automatically add the 'torrent' column.
    if 'torrent' not in columns and \
       (not isinstance(tfilter, abc.Sequence) or len(tfilter) != 1):
        columns.append('torrent')

    log.debug('Listing %s peers of %s torrents', pfilter, tfilter)

    if asyncio.iscoroutinefunction(self.make_plist):
        return await self.make_plist(tfilter, pfilter, sort, columns)
    else:
        return self.make_plist(tfilter, pfilter, sort, columns)

async def run(self, TORRENT_FILTER, TRACKER_FILTER, sort, columns):
    columns = self.cfg['columns.trackers'].value if columns is None else columns
    sort = self.cfg['sort.trackers'].value if sort is None else sort
    try:
        torfilter = self.select_torrents(TORRENT_FILTER,
                                         allow_no_filter=True,
                                         discover_torrent=True)
        trkfilter = self.get_tracker_filter(TRACKER_FILTER)
        sort = self.get_tracker_sorter(sort)
        columns = self.get_tracker_columns(columns)
    except ValueError as e:
        log.error(e)
        return False

    # Unless we're listing trackers of exactly one torrent, specified by its
    # ID, automatically add the 'torrent' column.
    if 'torrent' not in columns and \
       (not isinstance(torfilter, abc.Sequence) or len(torfilter) != 1):
        columns.append('torrent')

    log.debug('Listing %s trackers of %s torrents', trkfilter, torfilter)

    if asyncio.iscoroutinefunction(self.make_trklist):
        return await self.make_trklist(torfilter, trkfilter, sort, columns)
    else:
        return self.make_trklist(torfilter, trkfilter, sort, columns)

def __repr__(self):
    if isinstance(self._args, abc.Sequence):
        argstr = ', '.join(repr(arg) for arg in self._args)
    elif isinstance(self._args, abc.Mapping):
        argstr = ', '.join('%s=%r' % (k, v) for k, v in self._args.items())
    provides = '/'.join(interface for interface in self.provides)
    string = '<Command [{}] {}({})'.format(provides, self.name, argstr)
    if self.finished:
        string += ' success={}'.format(self.success)
    else:
        string += ' running'
    return string + '>'

def _validate_cmdchain_item(self, item):
    # Test if item is of a valid type
    if not (is_op(item) or
            (isinstance(item, abc.Sequence) and not isinstance(item, str) and
             all(isinstance(arg, str) for arg in item))):
        raise RuntimeError('Invalid type for command chain item: {!r}'.format(item))

    # Test if item is an operator after another operator
    try:
        prev_item = self._prev_validation_item
    except AttributeError:
        prev_item = None
    self._prev_validation_item = item
    if is_op(prev_item) and is_op(item):
        raise CmdError('Consecutive operators: "{} {}"'.format(prev_item, item))

def __add__(self, other):
    if not isinstance(other, Sequence):
        return NotImplemented
    self._check_compatibility(other)
    return type(self)(map(operator.add, self, other))

def __radd__(self, other):
    if not isinstance(other, Sequence):
        return NotImplemented
    self._check_compatibility(other)
    return type(self)(map(operator.add, other, self))

def __sub__(self, other):
    if not isinstance(other, Sequence):
        return NotImplemented
    self._check_compatibility(other)
    return type(self)(map(operator.sub, self, other))

def __rsub__(self, other):
    if not isinstance(other, Sequence):
        return NotImplemented
    self._check_compatibility(other)
    return type(self)(map(operator.sub, other, self))

def redirect_exception(*exceptions, cls=ChiakiException):
    """Context manager to re-raise exceptions with a proxy exception class.

    Each exception can either be an exception type or an
    (exc_type, string) pair.
    """
    exceptions = dict(exc if isinstance(exc, Sequence) else (exc, None)
                      for exc in exceptions)
    try:
        yield
    except tuple(exceptions) as e:
        raise cls(exceptions[type(e)] or str(e)) from e

# asynccontextmanager when

def test_Sequence(self):
    for sample in [tuple, list, bytes, str]:
        self.assertIsInstance(sample(), Sequence)
        self.assertTrue(issubclass(sample, Sequence))
    self.assertIsInstance(range(10), Sequence)
    self.assertTrue(issubclass(range, Sequence))
    self.assertTrue(issubclass(str, Sequence))
    self.validate_abstract_methods(Sequence, '__contains__', '__iter__',
                                   '__len__', '__getitem__')

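This test exercises the ABC from the outside; the complementary use is subclassing it, where __getitem__ and __len__ are the only methods you must supply and __contains__, __iter__, __reversed__, index() and count() come from the mixin. A minimal sketch with a hypothetical Deck class:

from collections.abc import Sequence

class Deck(Sequence):
    """Wraps a list; only __getitem__ and __len__ are required."""
    def __init__(self, cards):
        self._cards = list(cards)

    def __getitem__(self, index):
        return self._cards[index]

    def __len__(self):
        return len(self._cards)

deck = Deck(['A', 'K', 'Q'])
assert 'K' in deck                               # __contains__ from the mixin
assert list(reversed(deck)) == ['Q', 'K', 'A']   # __reversed__ too
assert deck.index('Q') == 2
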
def fake_deepdiff(one, two, indent=4, path=None, strict_strings=None):
    """Compare two term dictionaries.

    ``strict_strings=False`` treats strings that contain the same
    combination of words as equal.
    """
    for k, v in one.items():
        _one = v
        _two = two.get(k)
        if _one == _two:
            continue
        if all(isinstance(d, abc.MutableMapping) for d in (_one, _two)):
            _path = path if path is not None else []
            _path += ['{:<{width}}{}'.format('', k, width=indent)]
            fake_deepdiff(_one, _two, indent + 4, _path, strict_strings)
            continue
        if (all(isinstance(l, abc.MutableSequence) for l in (_one, _two)) and
                set(tuple(x) for x in _one if isinstance(x, abc.Sequence)) ==
                set(tuple(x) for x in _two if isinstance(x, abc.Sequence))):
            continue
        if all(isinstance(l, str) for l in (_one, _two)):
            if (strict_strings is False and
                    set(c.strip(';:,.?=_-\n') for c in _one.split()) ==
                    set(c.strip(';:,.?=_-\n') for c in _two.split())):
                continue
            else:
                _one = _one.strip().replace('\n', '')
                _two = _two.strip().replace('\n', '')
        print('\n'.join(path) if path else '')
        print('{:<{width}}{}'.format('', k, width=indent))
        print('{:<{width}}one: {}'.format('', _one, width=indent + 4))
        print('{:<{width}}two: {}'.format('', _two, width=indent + 4))

def is_seq(s):
    """Return True if sequence is list or tuple or some facsimile.

    Reject dictionary views, memoryview, bytearray, array.array, etc.
    """
    # bytearray and memoryview also satisfy Sequence, so they must be
    # excluded explicitly for the docstring's contract to hold
    if isinstance(s, abc.Sequence) and not isinstance(s, (str, bytes, bytearray, memoryview)):
        return True
    return False

def enlist(*args, ret_type=tuple):
    """Take a combination of strings and sequences, consolidate."""
    inset = set()
    for s in args:
        if isinstance(s, str):
            inset.add(s)
        # can be a tuple, list, or other non-string sequence
        if not isinstance(s, str) and isinstance(s, abc.Sequence):
            inset |= set(s)
    return tuple(sorted(inset)) if ret_type is tuple else sorted(inset)

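For orientation, a few illustrative calls against the two helpers above:

assert is_seq([1, 2]) and is_seq((1, 2))
assert not is_seq('abc') and not is_seq({'a': 1}.keys())
assert enlist('a', ('b', 'c'), 'a') == ('a', 'b', 'c')
assert enlist('x', ['y'], ret_type=list) == ['x', 'y']
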
def test_Sequence(self):
    for sample in [tuple, list, bytes, str]:
        self.assertIsInstance(sample(), Sequence)
        self.assertTrue(issubclass(sample, Sequence))
    self.assertIsInstance(range(10), Sequence)
    self.assertTrue(issubclass(range, Sequence))
    self.assertIsInstance(memoryview(b""), Sequence)
    self.assertTrue(issubclass(memoryview, Sequence))
    self.assertTrue(issubclass(str, Sequence))
    self.validate_abstract_methods(Sequence, '__contains__', '__iter__',
                                   '__len__', '__getitem__')

def CheckSqliteRowAsSequence(self):
    """Checks if the row object can act like a sequence"""
    self.con.row_factory = sqlite.Row
    row = self.con.execute("select 1 as a, 2 as b").fetchone()

    as_tuple = tuple(row)
    self.assertEqual(list(reversed(row)), list(reversed(as_tuple)))
    self.assertIsInstance(row, Sequence)

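sqlite3.Row passes this isinstance check because the standard library registers it as a virtual subclass of Sequence. Sequence.register extends the same treatment to any class that already implements the protocol, with the caveat that registration is a promise rather than enforcement and adds no mixin methods. A sketch with a hypothetical Window class:

from collections.abc import Sequence

class Window:
    """Implements the sequence protocol itself; no inheritance involved."""
    def __init__(self, items):
        self._items = tuple(items)

    def __getitem__(self, i):
        return self._items[i]

    def __len__(self):
        return len(self._items)

Sequence.register(Window)  # virtual subclass

w = Window([1, 2, 3])
assert isinstance(w, Sequence)
assert issubclass(Window, Sequence)
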
def walk(self, doc, part):
    """Walks one step in doc and returns the referenced part"""
    part = self.get_part(doc, part)

    assert type(doc) in (dict, list) or hasattr(doc, '__getitem__'), \
        "invalid document type %s" % (type(doc),)

    if isinstance(doc, Mapping):
        try:
            return doc[part]
        except KeyError:
            raise JsonPointerException("member '%s' not found in %s" % (part, doc))

    elif isinstance(doc, Sequence):
        if part == '-':
            return EndOfList(doc)

        try:
            return doc[part]
        except IndexError:
            raise JsonPointerException("index '%s' is out of bounds" % (part, ))

    else:
        # Object supports __getitem__, assume custom indexing
        return doc[part]

def to_tensor(X, use_cuda):
    """Turn to torch Variable.

    Handles the cases:
      * Variable
      * PackedSequence
      * numpy array
      * torch Tensor
      * list or tuple of one of the former
      * dict of one of the former
    """
    to_tensor_ = partial(to_tensor, use_cuda=use_cuda)

    if isinstance(X, (Variable, nn.utils.rnn.PackedSequence)):
        return X
    if isinstance(X, dict):
        return {key: to_tensor_(val) for key, val in X.items()}
    if isinstance(X, (list, tuple)):
        return [to_tensor_(x) for x in X]
    if isinstance(X, np.ndarray):
        X = torch.from_numpy(X)
    if isinstance(X, Sequence):
        X = torch.from_numpy(np.array(X))
    elif np.isscalar(X):
        X = torch.from_numpy(np.array([X]))
    if not is_torch_data_type(X):
        raise TypeError("Cannot convert this data type to a torch tensor.")
    if use_cuda:
        X = X.cuda()
    return X

def append(self, task_def):
    self._unsubmitted_tasks.append(task_def)
    if isinstance(task_def[2], Sequence):
        self._all_output.extend(task_def[2])
    self._all_ids.append(task_def[0])

def has_output(self, output):
    if not isinstance(output, Sequence) or not self._unsubmitted_tasks:
        return False
    return any(x in self._all_output for x in output)

def collect_input(script, input):
    # determine file extension
    if input is not None:
        if isinstance(input, (str, file_target)):
            ext = os.path.splitext(input)[-1]
        elif isinstance(input, Sequence) and len(input) > 0:
            ext = os.path.splitext(input[0])[-1]
        else:
            raise ValueError('Unknown input file for action pandoc')
    else:
        ext = '.md'

    input_file = tempfile.NamedTemporaryFile(mode='w+t', suffix=ext, delete=False).name
    with open(input_file, 'w') as tmp:
        if script is not None and script.strip():
            tmp.write(script.rstrip() + '\n\n')
        if isinstance(input, str):
            try:
                with open(input) as ifile:
                    tmp.write(ifile.read() + '\n\n')
            except Exception as e:
                raise ValueError(f'Failed to read input file {input}: {e}')
        elif isinstance(input, Sequence):
            for ifile in input:
                try:
                    with open(ifile) as itmp:
                        tmp.write(itmp.read().rstrip() + '\n\n')
                except Exception as e:
                    raise ValueError(f'Failed to read input file {ifile}: {e}')
    return input_file

def match(self, target, step):
    # for sos_step, we need to match step name
    if isinstance(target, sos_step):
        return step.match(target.target_name())
    if 'provides' not in step.options:
        return False
    patterns = step.options['provides']
    if isinstance(patterns, (str, BaseTarget)):
        patterns = [patterns]
    elif not isinstance(patterns, Sequence):
        raise RuntimeError(f'Unknown target to match: {patterns}')
    #
    for p in patterns:
        # other targets have to match exactly
        if isinstance(target, BaseTarget) or isinstance(p, BaseTarget):
            if target == p:
                return {}
            else:
                continue
        # if this is a regular string
        res = extract_pattern(p, [target])
        if res and not any(None in x for x in res.values()):
            return {x: y[0] for x, y in res.items()}
        # string match
        elif file_target(p) == file_target(target):
            return True
    return False

def __init__(self, *targets):
    super(remote, self).__init__()
    self.__unresolvable_object__ = True
    if len(targets) == 1:
        self._target = targets[0]
    else:
        # multi-item targets
        self._target = targets
    if isinstance(self._target, Sequence) and not isinstance(self._target, str):
        self.__flattenable__ = True

def _get_shared_dirs(self):
    value = self.config.get('shared', [])
    if isinstance(value, str):
        return [value]
    elif isinstance(value, Sequence):
        return value
    else:
        raise ValueError('Option shared can only be a string or a list of strings')

def _map_path(self, source):
    result = {}
    cwd = os.getcwd()
    if isinstance(source, (str, path)):
        dest = os.path.abspath(os.path.expanduser(source))
        # we use samefile to avoid problems with case-insensitive file system #522
        # we also use the "cwd" name to avoid wrong case for cwd. For example,
        # if the cwd = '/Users/user/Project'
        # then, dest = '/USERS/USER/PROJECT/a.txt'
        # would be converted to '/Users/user/Project/a.txt' before path mapping
        if os.path.exists(dest[:len(cwd)]) and os.path.samefile(dest[:len(cwd)], cwd):
            dest = cwd + dest[len(cwd):]
        matched = [k for k in self.path_map.keys()
                   if os.path.exists(dest[:len(k)]) and os.path.samefile(dest[:len(k)], k)]
        if matched:
            # pick the longest key that matches
            k = max(matched, key=len)
            dest = self.path_map[k] + dest[len(k):]
        else:
            env.logger.warning(
                f'Path {source} is not under any specified paths of localhost '
                f'and is mapped to {dest} on remote host.')
        result[source] = dest.replace('\\', '/')
    elif isinstance(source, (Sequence, set)):
        for src in source:
            result.update(self._map_path(src))
    else:
        env.logger.debug(f'Ignore unmappable source {source}')
        return {source: source}
    return result

#
# Interface functions
#

def _map_var(self, source):
    cwd = os.getcwd()
    if isinstance(source, str):
        dest = os.path.abspath(os.path.expanduser(source))
        # we use samefile to avoid problems with case-insensitive file system #522
        # we also use the "cwd" name to avoid wrong case for cwd. For example,
        # if the cwd = '/Users/user/Project'
        # then, dest = '/USERS/USER/PROJECT/a.txt'
        # would be converted to '/Users/user/Project/a.txt' before path mapping
        if os.path.exists(dest[:len(cwd)]) and os.path.samefile(dest[:len(cwd)], cwd):
            dest = cwd + dest[len(cwd):]
        matched = [k for k in self.path_map.keys()
                   if os.path.exists(dest[:len(k)]) and os.path.samefile(dest[:len(k)], k)]
        if matched:
            # pick the longest key that matches
            k = max(matched, key=len)
            dest = self.path_map[k] + dest[len(k):]
        else:
            env.logger.warning(
                f'Path {source} is not under any specified paths of localhost '
                f'and is mapped to {dest} on remote host.')
        return dest.replace('\\', '/')
    elif isinstance(source, (Sequence, set)):
        ret = [self._map_var(x) for x in source]
        return [x for x in ret if x is not None]
    else:
        env.logger.debug(f'Ignore unmappable source {source}')
        return source

def map(self, work_func: FunctionType, inputs: Sequence) -> Sequence:
    raise NotImplementedError("{} does not implement map"
                              .format(self.__class__.__name__))

def _get_str_query(self, *args, **kwargs):
    if kwargs:
        if len(args) > 0:
            raise ValueError("Either kwargs or single query parameter "
                             "must be present")
        query = kwargs
    elif len(args) == 1:
        query = args[0]
    else:
        raise ValueError("Either kwargs or single query parameter "
                         "must be present")

    if query is None:
        query = ''
    elif isinstance(query, Mapping):
        quoter = partial(_quote, qs=True)
        lst = []
        for k, v in query.items():
            if isinstance(v, str):
                pass
            elif type(v) == int:  # no subclasses like bool
                v = str(v)
            else:
                raise TypeError("Invalid variable type: mapping value "
                                "should be str or int, got {!r}".format(v))
            lst.append(quoter(k, safe='/?:@') + '=' + quoter(v, safe='/?:@'))
        query = '&'.join(lst)
    elif isinstance(query, str):
        query = _quote(query, safe='/?:@', protected=PROTECT_CHARS, qs=True)
    elif isinstance(query, (bytes, bytearray, memoryview)):
        raise TypeError("Invalid query type: bytes, bytearray and "
                        "memoryview are forbidden")
    elif isinstance(query, Sequence):
        quoter = partial(_quote, qs=True, safe='/?:@')
        query = '&'.join(quoter(k) + '=' + quoter(v) for k, v in query)
    else:
        raise TypeError("Invalid query type: only str, mapping or "
                        "sequence of (str, str) pairs is allowed")
    return query

def sequence_of_type(_type, mutable, instance, attribute, value):
    """Validate that a value is a Sequence containing a specific type.

    :arg _type: The type of the values inside of the sequence
    :arg mutable: selects whether a sequence can be mutable or not
        :mutable: only mutable sequences are allowed
        :immutable: only immutable sequences are allowed
        :both: both mutable and immutable sequences are allowed
    :arg instance: The instance of the attr.s class that is being created
    :arg attribute: The attribute of the attr.s class that is being set
    :arg value: The value the attribute is being set to

    This function will be used with the :meth:`attr.ib` validate parameter
    and :func:`functools.partial`.  Example::

        @attr.s
        class CommodityData:
            type = attr.ib(validator=partial(enum_validator, CommodityType))
    """
    if mutable == 'both':
        msg = 'a Sequence'
    elif mutable == 'mutable':
        msg = 'a MutableSequence'
    elif mutable == 'immutable':
        msg = 'an immutable Sequence'
    else:
        raise ValueError('sequence_of_type was given an improper argument for mutable')

    if not isinstance(value, Sequence):
        raise ValueError('{} is not {}'.format(value, msg))

    if isinstance(value, MutableSequence):
        if mutable == 'immutable':
            raise ValueError('{} is not {}'.format(value, msg))
    else:
        if mutable == 'mutable':
            raise ValueError('{} is not {}'.format(value, msg))

    for entry in value:
        if not isinstance(entry, _type):
            raise ValueError('The Sequence element {} is not a {}'.format(entry, _type))

def __register_methods(self):
    """Register all Pypacker TG methods."""
    def xmlrpc_wrap(func, *args, **kwargs):
        """Call func and convert any exception into an xmlrpc Fault."""
        try:
            return func(*args, **kwargs)
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback_message = traceback.format_exception(exc_type, exc_value, exc_traceback)
            raise xmlrpc.Fault(500, traceback_message)

    def wrap_method(method):
        """Expose a Pypacker TG method over xmlrpc."""
        return lambda args, kwargs: xmlrpc_wrap(getattr(self.pypacker, method),
                                                *pickle.loads(args.data),
                                                **pickle.loads(kwargs.data))

    def wrap_attibute(attr):
        """Expose a Pypacker TG attribute over xmlrpc."""
        return lambda: getattr(self.pypacker, attr)

    # Get full list of pypacker attrs
    pypacker_attrs = (fn for fn in dir(self.pypacker) if not fn.startswith("_"))

    # Register attributes and methods
    for attr in pypacker_attrs:
        attr_instance = getattr(self.pypacker, attr)
        if isinstance(attr_instance, (str, int, Sequence, Mapping)):
            setattr(self, "xmlrpc_{0}".format(attr), wrap_attibute(attr))
            self.class_logger.debug("Registered Pypacker TG attribute %s", attr)
        elif callable(attr_instance):
            setattr(self, "xmlrpc_{0}".format(attr), wrap_method(attr))
            self.class_logger.debug("Registered Pypacker TG method %s", attr)

    # Need to wrap stop_sniff separately because we have to perform
    # additional procedures with sniffed data before sending.
    self.xmlrpc_stop_sniff = self.stop_sniff  # pylint: disable=attribute-defined-outside-init

def __init__(self, filters=''):
    if not isinstance(self.filterclass, type) or not issubclass(self.filterclass, Filter):
        raise RuntimeError('Attribute "filterclass" must be set to a Filter class, not {!r}'
                           .format(self.filterclass))

    # Because str is also an instance of abc.Sequence
    if isinstance(filters, str):
        pass
    elif isinstance(filters, abc.Sequence) and all(isinstance(f, str) for f in filters):
        filters = '|'.join(filters)
    else:
        raise TypeError('filters must be string or sequence of strings, not {}: {!r}'
                        .format(type(filters).__name__, filters))

    # self._filterchains is a tuple of tuples.  Each inner tuple combines
    # filters with AND.  The outer tuple combines the inner, AND-combined
    # tuples with OR.
    parts = tuple(part for part in self._op_regex.split(filters) if part != '')
    if len(parts) < 1:
        self._filterchains = ()
    else:
        if parts[0] in '&|':
            raise ValueError('Filter can\'t start with operator: {!r}'.format(parts[0]))
        elif parts[-1] in '&|':
            raise ValueError('Filter can\'t end with operator: {!r}'.format(parts[-1]))

        filters = []
        ops = []
        expect = 'filter'
        nofilter = self.filterclass()
        for i, part in enumerate(parts):
            if expect == 'filter':
                if part not in '&|':
                    f = self.filterclass(part)
                    if f == nofilter:
                        # part is something like 'all' or '*' - this
                        # disables all other filters
                        filters = []
                        ops = []
                        break
                    else:
                        filters.append(f)
                        expect = 'operator'
                        continue
            elif expect == 'operator':
                if part in '&|':
                    ops.append(part)
                    expect = 'filter'
                    continue
            raise ValueError('Consecutive operators: {!r}'.format(''.join(parts[i-2:i+2])))

        if filters:
            fchain = [[]]
            for filter, op in zip_longest(filters, ops):
                fchain[-1].append(filter)
                if op == '|':
                    fchain.append([])
            self._filterchains = tuple(tuple(x) for x in fchain)
        else:
            self._filterchains = ()

def repository():
    @singledispatch
    def _repogitory(obj):
        return obj

    def _register(cls, func=None):
        if func is None:
            return lambda f: _register(cls, f)

        if isinstance(func, type):
            if issubclass(func, ObjConverter):
                func = func(cls)

        if isinstance(func, ObjConverter):
            func.repogitory = _repogitory
            func = func.run

        return _repogitory.org_register(cls, func)

    _repogitory.org_register = _repogitory.register
    _repogitory.register = _register

    def fromSQLAlchemyModel(model, attrs=None, ignores=None):
        names = [col.name for col in model.__table__.columns]
        ObjConverter.build(_repogitory, model, names, attrs, ignores)

    _repogitory.fromSQLAlchemyModel = fromSQLAlchemyModel

    def fromDjangoModel(model, attrs, ignores):
        ObjConverter.build(_repogitory, model,
                           _django_get_all_field_names(model), attrs, ignores)

    _repogitory.fromDjangoModel = fromDjangoModel

    def raw(obj):
        return obj

    _repogitory.register(str, raw)

    def conv_seq(obj):
        return tuple(_repogitory(o) for o in obj)

    _repogitory.register(abc.Sequence, conv_seq)
    _repogitory.register(abc.Set, conv_seq)

    @_repogitory.register(abc.Mapping)
    def conv_mapping(obj):
        return {_repogitory(k): _repogitory(v) for k, v in obj.items()}

    def conv_date(obj):
        return obj.isoformat()

    _repogitory.register(datetime.date, conv_date)
    _repogitory.register(datetime.datetime, conv_date)

    return _repogitory

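This repository factory works because functools.singledispatch resolves registrations against ABCs such as abc.Sequence and abc.Mapping through the isinstance machinery. A stripped-down sketch of that dispatch idea (the describe function is illustrative, not part of the example above):

from collections.abc import Mapping, Sequence
from functools import singledispatch

@singledispatch
def describe(obj):
    return 'scalar'

@describe.register(str)
def _(obj):
    return 'string'  # str is more specific than Sequence, so it wins for strings

@describe.register(Sequence)
def _(obj):
    return 'sequence'

@describe.register(Mapping)
def _(obj):
    return 'mapping'

assert describe([1, 2]) == 'sequence'
assert describe('hi') == 'string'
assert describe({'a': 1}) == 'mapping'
assert describe(3.14) == 'scalar'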