我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Iterable()。
def __init__(self, host: str = '127.0.0.1', port: int = 11300, encoding: Optional[str] = 'utf-8', use: str = DEFAULT_TUBE, watch: Union[str, Iterable[str]] = DEFAULT_TUBE) -> None: self._sock = socket.create_connection((host, port)) self._reader = self._sock.makefile('rb') # type: BinaryIO self.encoding = encoding if use != DEFAULT_TUBE: self.use(use) if isinstance(watch, str): if watch != DEFAULT_TUBE: self.watch(watch) self.ignore(DEFAULT_TUBE) else: for tube in watch: self.watch(tube) if DEFAULT_TUBE not in watch: self.ignore(DEFAULT_TUBE)
def avg_linelength_diffs(diffargs): # type: (List[Tuple[str, bytes]]) -> Iterable[int] """Returns the nudged absolute line length differences. """ for filename1, content2 in diffargs: linelen1 = get_num_lines(filename1) filelen1 = len(get_cached_file(filename1)) avg1 = 0.0 if linelen1 > 0: avg1 = float(filelen1) / linelen1 linelen2 = count_content_lines(content2) filelen2 = len(content2) avg2 = 0.0 if linelen2 > 0: avg2 = float(filelen2) / linelen2 yield int(abs(10000.0 * (avg1 - avg2)))
def unified_diff(filename, content2=None): # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]] """This function prints a unified diff of the contents of filename and the standard input, when used from the command line as follows: echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt We get this result: --- +++ @@ -1 +1 @@ -123 +456 """ use_stdin = content2 is None if content2 is None: # Read binary input stream stdin = rawstream(sys.stdin) econtent2 = bytestr(stdin.read()) else: econtent2 = content2 exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='') if use_stdin: write('\n'.join(diff)) return exit_code, diff
def compute_unified_diff(filename, content2, **kwargs): # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]] diff = () # type: Iterable[str] exit_code = ERROR kw = kwargs.copy() if 'n' not in kwargs: # zero context lines kw['n'] = 0 try: content1 = get_cached_file(filename) if PY3: c1 = unistr(content1) c2 = unistr(content2) else: c1 = content1 c2 = content2 diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw) exit_code = OK finally: return exit_code, diff # --------------------------------------------------------------------- # Spare the user from specifying a formatter by finding a suitable one.
def misses_to_frame(parsed_lexemes: Iterable, terms: Dict[str, str]=None) -> pd.DataFrame: if not terms: terms = {} miss_dict = collect_misses(parsed_lexemes) misses = [] for miss in miss_dict: low_miss = miss.lower() miss_record = OrderedDict() miss_record['miss'] = low_miss miss_record['term'] = terms.get(low_miss, low_miss) miss_record['lexemes'] = ' '.join(miss_dict[miss]) misses.append(miss_record) miss_frame = pd.DataFrame.from_records( misses, index='miss', columns=['miss', 'term', 'lexemes']) return miss_frame
def _list(cls, user_id: int, is_active: Optional[bool]=None, fields: Optional[Iterable[str]]=None): if fields is None: fields = ( 'access_key', 'secret_key', 'is_active', 'is_admin', ) q = 'query($user_id: Int!, $is_active: Boolean) {' \ ' keypairs(user_id: $user_id, is_active: $is_active) {' \ ' $fields' \ ' }' \ '}' q = q.replace('$fields', ' '.join(fields)) vars = { 'user_id': user_id, 'is_active': is_active, } resp = yield Request('POST', '/admin/graphql', { 'query': q, 'variables': vars, }) data = resp.json() return data['keypairs']
def _get_or_create(cls, lang: str, client_token: Optional[str]=None, mounts: Optional[Iterable[str]]=None, envs: Optional[Mapping[str, str]]=None, max_mem: int=0, exec_timeout: int=0) -> str: if client_token: assert len(client_token) > 8 else: client_token = uuid.uuid4().hex resp = yield Request('POST', '/kernel/create', { 'lang': lang, 'clientSessionToken': client_token, 'config': { 'mounts': mounts, 'envs': envs, }, # 'limits': { # 'maxMem': max_mem, # 'execTimeout': exec_timeout, # }, }) data = resp.json() o = cls(data['kernelId']) # type: ignore o.created = data.get('created', True) # True is for legacy return o
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]: xml_ending = ".xml" microphone_endings = [ "_Yamaha", "_Kinect-Beam", "_Kinect-RAW", "_Realtek", "_Samson", "_Microsoft-Kinect-Raw" ] xml_files = [file for file in files if file.name.endswith(xml_ending) if self.id_filter_regex.match(name_without_extension(file))] return OrderedDict( (name_without_extension(file) + microphone_ending, self._extract_label_from_xml(file)) for file in xml_files for microphone_ending in microphone_endings if (Path(file.parent) / (name_without_extension(file) + microphone_ending + ".wav")).exists())
def train(self, labeled_spectrogram_batches: Iterable[List[LabeledSpectrogram]], preview_labeled_spectrogram_batch: List[LabeledSpectrogram], tensor_board_log_directory: Path, net_directory: Path, batches_per_epoch: int): print_preview_batch = lambda: log(self.test_and_predict_batch(preview_labeled_spectrogram_batch)) print_preview_batch() self.loss_net.fit_generator(self._loss_inputs_generator(labeled_spectrogram_batches), epochs=100000000, steps_per_epoch=batches_per_epoch, callbacks=self.create_callbacks( callback=print_preview_batch, tensor_board_log_directory=tensor_board_log_directory, net_directory=net_directory), initial_epoch=self.load_epoch if (self.load_epoch is not None) else 0)
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]: best = None best_cost = _MAX_TRANSFORM_COST to_type = None for target_type in target_types: try: transform, cost = self._transform(source_type, target_type) if cost < best_cost: best = transform best_cost = cost to_type = target_type except NoConversionError: pass if best is None: raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types)) return best, to_type, best_cost
def _best_transform_to(self, target_type: Type[T], source_types: Iterable[Type]) -> Tuple[Callable[[T], Any], Type, int]: best = None best_cost = _MAX_TRANSFORM_COST from_type = None for source_type in source_types: try: transform, cost = self._transform(source_type, target_type) if cost < best_cost: best = transform best_cost = cost from_type = source_type except NoConversionError: pass if best is None: raise NoConversionError("Pipeline can't convert from any of \"{source_types}\" to \"{target_type}\"".format(source_types=source_types, target_type=target_type)) return best, from_type, best_cost
def _create_sink_handlers_simultaneously(self, before: Type[T], transform: DataTransformer, after: Type[T], targets: Iterable[DataSink]): before_transform_handlers = set() after_transform_handlers = set() for sink in targets: try: before_transformer, before_to_type, before_cost = self._best_transform_from(before, sink.accepts) except NoConversionError: before_transformer = None try: after_transformer, after_to_type, after_cost = self._best_transform_from(after, sink.accepts) except NoConversionError: after_transformer = None if before_transformer is not None and after_transformer is not None: if before_cost < after_cost: before_transform_handlers.add(_SinkHandler(sink, before_to_type, before_transformer)) else: after_transform_handlers.add(_SinkHandler(sink, after_to_type, after_transformer)) elif before_transformer is not None: before_transform_handlers.add(_SinkHandler(sink, before_to_type, before_transformer)) elif after_transformer is not None: after_transform_handlers.add(_SinkHandler(sink, after_to_type, after_transformer)) return before_transform_handlers, after_transform_handlers
def put_many(self, type: Type[T], items: Iterable[T]) -> None: """Puts multiple objects of the same type into the data sink. The objects may be transformed into a new type for insertion if necessary. Args: items: An iterable (e.g. list) of objects to be inserted into the data pipeline. """ LOGGER.info("Getting SinkHandlers for \"{type}\"".format(type=type.__name__)) try: handlers = self._put_types[type] except KeyError: try: LOGGER.info("Building new SinkHandlers for \"{type}\"".format(type=type.__name__)) handlers = self._put_handlers(type) except NoConversionError: handlers = None self._get_types[type] = handlers LOGGER.info("Creating new PipelineContext") context = self._new_context() LOGGER.info("Sending items \"{items}\" to SourceHandlers".format(items=items)) if handlers is not None: items = list(items) for handler in handlers: handler.put_many(items, context)
def _split_text_to_lines_and_columns( cls, text) -> Iterable[Tuple[int, int, str]]: lines = text.splitlines() if len(lines) > cls._MAX_LINES: raise ValueError( 'Max {} specification lines allowed, got {}' .format(cls._MAX_LINES, len(lines))) for line_number, line_text in enumerate(lines, 1): if len(line_text) > cls._MAX_LINE_LENGTH: raise ValueError( 'Specification lines must be max {} chars long, ' 'got {}: {!r}' .format(cls._MAX_LINE_LENGTH, len(line_text), line_text)) yield (line_number, 1, '{:40}'.format(line_text[0:40])) yield (line_number, 2, '{:40}'.format(line_text[40:80]))
def run(cls, command: str, *args: Iterable[str]): """ Runs the registered Command if it exists. :raises KeyError: when the command does not exist :param command: the registry key of the command to be invoked :param args: the arguments to be passed on to the command """ if cls.has_option(command): try: cls.__registry[command](*args) # except Exception as e: # raise e except TypeError: print(cls.__registry[command].usage_notice()) except KeyboardInterrupt: print("Command aborted.") except Exception as e: print(e) print("Please contact support.") else: raise UnknownCommand(command)
def coerce(cls, index, value): if not isinstance(value, cls): if isinstance(value, Iterable): result = cls() # noinspection PyTypeChecker for i in value: item = cls.__item_type__.coerce(index, i) # item.set_parent(result) result.append(item) return result return super().coerce(index, value) else: return value
def send_sms(recipients: Iterable[str], msg: str, username: str, api_key: str, sender: str): data = { 'messages': [], } # type: Dict[str, List] for recipient in recipients: data['messages'].append({ 'source': 'python', 'from': sender, 'body': msg[:140], 'to': recipient, 'schedule': '' }) try: async with aiohttp.ClientSession(headers={'Content-Type': 'application/json'}, auth=aiohttp.BasicAuth(username, api_key)) as session: async with session.post(CLICKSEND_URL, data=json.dumps(data), timeout=30) as resp: if resp.status != 200: log.msg('Error sending clicksend sms notification: http status %s' % (str(resp.status)), 'NOTIFICATION') except aiohttp.ClientError as e: log.msg('Error sending clicksend sms notification: %s' % (str(e)), 'NOTIFICATIONS')
def _get_alerts(self, q: str, q_args: Iterable[Any]) -> List[Dict[str, Any]]: rows = await self.request.app['dbcon'].fetch_all(q, q_args) ret = [] for id, monitor_id, start_ts, end_ts, alert_msg in rows: alert = { 'id': id, 'monitor_id': monitor_id, 'start_ts': start_ts, 'end_ts': end_ts, 'alert_msg': alert_msg, 'monitor_description': '', } monitor = self.request.app['active_monitor_manager'].monitors.get(monitor_id, None) # type: ActiveMonitor if monitor: alert['monitor_description'] = monitor.get_description() ret.append(alert) return ret
def get(self) -> web.Response: dbcon = self.request.app['dbcon'] if 'id' in self.request.rel_url.query: contact_id = require_int(get_request_param(self.request, 'id')) c = await contact.get_contact(dbcon, contact_id) contact_list = [] # type: Iterable[object_models.Contact] if c: contact_list = [c] metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact', contact_id) elif 'meta_key' in self.request.rel_url.query: meta_key = require_str(get_request_param(self.request, 'meta_key')) meta_value = require_str(get_request_param(self.request, 'meta_value')) contact_list = await contact.get_contacts_for_metadata(dbcon, meta_key, meta_value) metadata_list = await metadata.get_metadata_for_object_metadata( dbcon, meta_key, meta_value, 'contact', 'contacts') else: contact_list = await contact.get_all_contacts(dbcon) metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'contact') return web.json_response(apply_metadata_to_model_list(contact_list, metadata_list))
def get(self) -> web.Response: dbcon = self.request.app['dbcon'] if 'id' in self.request.rel_url.query: monitor_group_id = require_int(get_request_param(self.request, 'id')) monitor_group_item = await monitor_group.get_monitor_group(dbcon, monitor_group_id) monitor_group_list = [] # type: Iterable[object_models.MonitorGroup] if monitor_group_item: monitor_group_list = [monitor_group_item] metadata_list = await metadata.get_metadata_for_object(dbcon, 'monitor_group', monitor_group_id) elif 'meta_key' in self.request.rel_url.query: meta_key = require_str(get_request_param(self.request, 'meta_key')) meta_value = require_str(get_request_param(self.request, 'meta_value')) monitor_group_list = await monitor_group.get_monitor_groups_for_metadata(dbcon, meta_key, meta_value) metadata_list = await metadata.get_metadata_for_object_metadata( dbcon, meta_key, meta_value, 'monitor_group', 'monitor_groups') else: monitor_group_list = await monitor_group.get_all_monitor_groups(dbcon) metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'monitor_group') return web.json_response(apply_metadata_to_model_list(monitor_group_list, metadata_list))
def set_active_monitor_contacts(dbcon: DBConnection, contact_ids: Iterable[int], monitor_id: int): """(Re-)set contacts for an active monitor. Delete existing contacts for an active monitor and set the given new contacts. """ async def _run(cur: Cursor) -> None: q = """delete from active_monitor_contacts where active_monitor_id=%s""" await cur.execute(q, (monitor_id,)) for contact_id in contact_ids: q = """insert into active_monitor_contacts (active_monitor_id, contact_id) values (%s, %s)""" q_args = (monitor_id, contact_id) await cur.execute(q, q_args) if not await active_monitor_exists(dbcon, monitor_id): raise errors.InvalidArguments('monitor does not exist') await dbcon.transact(_run)
def set_active_monitor_contact_groups(dbcon: DBConnection, contact_group_ids: Iterable[int], monitor_id: int) -> None: """(Re-)set contact_groups for an active monitor. Delete existing contact groups for an active monitor and set the given new contact groups. """ async def _run(cur: Cursor) -> None: q = """delete from active_monitor_contact_groups where active_monitor_id=%s""" await cur.execute(q, (monitor_id,)) for contact_group_id in contact_group_ids: q = """insert into active_monitor_contact_groups (active_monitor_id, contact_group_id) values (%s, %s)""" q_args = (monitor_id, contact_group_id) await cur.execute(q, q_args) if not await active_monitor_exists(dbcon, monitor_id): raise errors.InvalidArguments('monitor does not exist') await dbcon.transact(_run)
def set_contact_group_contacts(dbcon: DBConnection, contact_group_id: int, contact_ids: Iterable[int]) -> None: """(Re-)set contacts for a contact group. Delete existing contacts for a contact group and set the given new contacts. """ async def _run(cur: Cursor) -> None: q = """delete from contact_group_contacts where contact_group_id=%s""" await cur.execute(q, (contact_group_id,)) for contact_id in contact_ids: q = """insert into contact_group_contacts (contact_group_id, contact_id) values (%s, %s)""" q_args = (contact_group_id, contact_id) await cur.execute(q, q_args) if not await contact_group_exists(dbcon, contact_group_id): raise errors.InvalidArguments('contact group does not exist') await dbcon.transact(_run)
def delete_metadata(dbcon: DBConnection, object_type: str, object_id: int, keys: Optional[Iterable[str]] = None): """Delete metadata for an object. If keys is given, only delete the specified keys, otherwise delete all metadata for the object. """ async def _run(cur: Cursor) -> None: if keys: # noinspection PyTypeChecker for key in keys: q = """delete from object_metadata where object_type=%s and object_id=%s and `key`=%s""" q_args = (object_type, object_id, key) # type: Tuple await cur.execute(q, q_args) else: q = """delete from object_metadata where object_type=%s and object_id=%s""" q_args = (object_type, object_id) await cur.execute(q, q_args) await dbcon.transact(_run)
def splat(f: Callable[..., A]) -> Callable[[Iterable], A]: """Convert a function taking multiple arguments into a function taking a single iterable argument. Args: f: Any function Returns: A function that accepts a single iterable argument. Each element of this iterable argument is passed as an argument to ``f``. Example: $ def f(a, b, c): $ return a + b + c $ $ f(1, 2, 3) # 6 $ g = splat(f) $ g([1, 2, 3]) # 6 """ def splatted(args): return f(*args) return splatted
def unsplat(f: Callable[[Iterable], A]) -> Callable[..., A]: """Convert a function taking a single iterable argument into a function taking multiple arguments. Args: f: Any function taking a single iterable argument Returns: A function that accepts multiple arguments. Each argument of this function is passed as an element of an iterable to ``f``. Example: $ def f(a): $ return a[0] + a[1] + a[2] $ $ f([1, 2, 3]) # 6 $ g = unsplat(f) $ g(1, 2, 3) # 6 """ def unsplatted(*args): return f(args) return unsplatted
def config_per_platform(config: ConfigType, domain: str) -> Iterable[Tuple[Any, Any]]: """Generator to break a component config into different platforms. For example, will find 'switch', 'switch 2', 'switch 3', .. etc """ for config_key in extract_domain_configs(config, domain): platform_config = config[config_key] if not platform_config: continue elif not isinstance(platform_config, list): platform_config = [platform_config] for item in platform_config: try: platform = item.get(CONF_PLATFORM) except AttributeError: platform = None yield platform, item
def train(self, optimizer, training_set: Iterable[Tuple[QASetting, List[Answer]]], batch_size: int, max_epochs=10, hooks=tuple(), l2=0.0, clip=None, clip_op=tf.clip_by_value, summary_writer=None, **kwargs): """ This method trains the reader (and changes its state). Args: optimizer: TF optimizer training_set: the training instances. batch_size: size of training batches max_epochs: maximum number of epochs hooks: TrainingHook implementations that are called after epochs and batches l2: whether to use l2 regularization clip: whether to apply gradient clipping and at which value clip_op: operation to perform for clipping """ batches, loss, min_op, summaries = self._setup_training( batch_size, clip, optimizer, training_set, summary_writer, l2, clip_op, **kwargs) self._train_loop(min_op, loss, batches, hooks, max_epochs, summaries, summary_writer, **kwargs)
def __init__( self, x: Union[int, float, 'Vec', Iterable[float]]=0.0, y: float=0.0, z: float=0.0, ) -> None: """Create a Vector. All values are converted to Floats automatically. If no value is given, that axis will be set to 0. An iterable can be passed in (as the x argument), which will be used for x, y, and z. """ if isinstance(x, (int, float)): self.x = float(x) self.y = float(y) self.z = float(z) else: it = iter(x) self.x = float(next(it, 0.0)) self.y = float(next(it, y)) self.z = float(next(it, z))
def test_urls(self, urls: t.Iterable[str]): self._proxy.clear_flows() for url in urls: # TODO: Handle both of these failing driver_results = self._driver_controller.get_results( url=url, controller_wait=self._proxy_pending_requests_wait, ) proxy_results = self._proxy.get_results() result = SeproxerUrlResult( url=url, driver_results=driver_results, proxy_results=proxy_results, ) self._result_handler.handle(result)
def multimask_images(images: Iterable[SpatialImage], masks: Sequence[np.ndarray], image_type: type = None ) -> Iterable[Sequence[np.ndarray]]: """Mask images with multiple masks. Parameters ---------- images: Images to mask. masks: Masks to apply. image_type: Type to cast images to. Yields ------ Sequence[np.ndarray] For each mask, a masked image. """ for image in images: yield [mask_image(image, mask, image_type) for mask in masks]
def mask_images(images: Iterable[SpatialImage], mask: np.ndarray, image_type: type = None) -> Iterable[np.ndarray]: """Mask images. Parameters ---------- images: Images to mask. mask: Mask to apply. image_type: Type to cast images to. Yields ------ np.ndarray Masked image. """ for images in multimask_images(images, (mask,), image_type): yield images[0]
def mean(data: Iterable[float]) -> float: 'Accurate arithmetic mean' data = list(data) return fsum(data) / len(data)
def transpose(matrix: Iterable[Iterable]) -> Iterable[tuple]: 'Swap rows with columns for a 2-D array' return zip(*matrix)
def assign_data(centroids: Sequence[Centroid], data: Iterable[Point]) -> Dict[Centroid, Sequence[Point]]: 'Assign data the closest centroid' d : DefaultDict[Point, List[Point]] = defaultdict(list) for point in data: centroid: Point = min(centroids, key=partial(dist, point)) d[centroid].append(point) return dict(d)
def compute_centroids(groups: Iterable[Sequence[Point]]) -> List[Centroid]: 'Compute the centroid of each group' return [tuple(map(mean, transpose(group))) for group in groups]
def k_means(data: Iterable[Point], k:int=2, iterations:int=10) -> List[Point]: 'Return k-centroids for the data' data = list(data) centroids = sample(data, k) for i in range(iterations): labeled = assign_data(centroids, data) centroids = compute_centroids(labeled.values()) return centroids
def id_to_work_id(self, ids: Iterable[str]): id_csv = ','.join(ids) endpoint = 'book/id_to_work_id/{}'.format(id_csv) res = self._transport.req(endpoint=endpoint) return res['work-ids']
def isbn_to_id(self, isbns: Iterable[str]): raise NotImplementedError('API always 404s on this endpoint')
def review_counts(self, isbns: Iterable[str]): # This endpoint 406s on non-json content type endpoint = 'book/review_counts.json' params = { 'isbns': ','.join(set(isbns)), } res = self._transport.req(endpoint=endpoint, params=params, transform='json') return res['books']
def render(posts: Iterable[Post], output_filepath: Optional[str] = None): ''' Puts html from posts into output_filepath (.html) and opens that in a specified browser ''' posts = list(posts) if posts: write_and_open_in_browser(render_to_html(posts), output_filepath) else: print('Nothing found')
def has_links(posts: List[Post], *patterns, **kwargs) -> Iterable[Post]: ''' Returns all posts which have matching links in them. Args: post (List[Post]): Posts to filter *patterns (List[str]): Patterns which links should match not (List[str]) : Patterns which links should not match. E.g., not=['google', 'longecity', 'amazon'] ''' return filter(lambda p: (len(match_links(p, *patterns, **kwargs)) > 0), posts)
def grouper(n, iterable, padvalue=None): # type: (int, Iterable[Any], Any) -> Iterable[Iterable[Any]] """grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')""" return izip_longest(*[iter(iterable)] * n, fillvalue=padvalue) # ---------------------------------------------------------------------- # Functions to find an executable in the PATH, from: # http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
def option_make(optionname, # type: AnyStr optiontype, # type: AnyStr configs, # type: Iterable[OptionValue] nestedopts=None # type: Optional[StyleDef] ): # type: (...) -> Tuple[str, str, List[OptionValue], Optional[StyleDef]] configs = [typeconv(c) for c in configs] return unistr(optionname), unistr(optiontype), configs, nestedopts
def styledef_make(options=None): # type: (Union[dict, Iterable[Option], None]) -> StyleDef if isinstance(options, dict): s = styledef_make() for _, option in sorted(options.items()): if isinstance(option, dict): option = styledef_make(option) styledef_add_option(option, s) return s if options is None: options = [] return StyleDef((option_name(o), o) for o in options)
def inclusiverange(start, stop): # type: (int, int) -> Iterable[int] return range(start, stop + 1)
def stylevariants(optionname, values): # type: (str, Iterable[OptionValue]) -> List[Style] return [stylevariant(optionname, v) for v in values]
def variants_for(self, option): # type: (Option) -> List[Style] def kvpairs(vs): # type: (Iterable[OptionValue]) -> List[Style] return stylevariants(stylename, vs) stylename = option_name(option) styletype = option_type(option) configs = option_configs(option) if configs: return kvpairs(configs) if styletype == 'bool': return kvpairs([True, False]) if styletype == 'int': if stylename == 'column_limit': # Here we can get weird results, for example # in bottle_sqlalchemy.py is a constructor with # 8 arguments which are already split between two lines. # We find an optimum column limit of 126 because this # has less diff lines than putting each argument on a new # line. Maybe we should use a different diff metric. return kvpairs(self.column_limit_candidates) elif stylename == 'indent_width': return kvpairs([2, 4, 8]) elif stylename == 'spaces_before_comment': return kvpairs(inclusiverange(1, 4)) elif stylename.startswith('split_penalty'): # We avoid changing large integers whose purpose # is not exactly clear for the moment. pass return [] # ----------------------------------------------------------------------
def variants_for(self, option): # type: (Option) -> List[Style] stylename = option_name(option) styletype = option_type(option) configs = option_configs(option) def kvpairs(vs): # type: (Iterable[OptionValue]) -> List[Style] return stylevariants(stylename, vs) if configs: return kvpairs(configs) if stylename == 'indent': return kvpairs(['yes']) if stylename == 'wrap': return kvpairs([0]) if stylename == 'indent-spaces': return kvpairs(inclusiverange(0, 8)) if styletype == 'AutoBool': return kvpairs(['yes', 'no', 'auto']) if styletype == 'Boolean': return kvpairs(['yes', 'no']) return []
def variants_for(self, option): # type: (Option) -> List[Style] def kvpairs(vs): # type: (Iterable[OptionValue]) -> List[Style] return stylevariants(stylename, vs) def numreplace(configs, numvalues): # type: (List[OptionValue], Iterable[int]) -> List[OptionValue] extconfigs = [] # type: List[OptionValue] for c in configs: if isinstance(c, text_type) and '#' in c: for n in numvalues: num = str(n) nc = c.replace('#', num) extconfigs.append(nc) else: extconfigs.append(c) return extconfigs stylename = option_name(option) configs = option_configs(option) if stylename == self.columnlimitname: candidates = self.column_limit_candidates candidates = [c for c in candidates if 50 <= c <= 200] return kvpairs(candidates) if stylename == 'indent': return kvpairs(numreplace(configs, [2, 4, 8])) if stylename == 'min-conditional-indent': return kvpairs(numreplace(configs, [0, 1, 2, 3])) if stylename == 'max-instatement-indent': return kvpairs(numreplace(configs, inclusiverange(40, 120))) if stylename == 'mode': return [] if configs: return kvpairs(numreplace(configs, [1, 2, 4, 8])) return []