我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用functools.partial()。
def add(self, categorize):
    """Add given method to categorize messages.

    When a message is received, each of the added methods (most recently
    added method first) is called with the message. The method should
    return a category (any hashable object) or None (in which case next
    recently added method is called with the same message). If all the
    methods return None for a given message, the message is queued with
    category=None, so that 'receive' method here works just as
    Task.receive.

    A plain function is accepted only if it takes exactly one positional
    argument; any other object is accepted only if it is a 'partial_func'
    instance. Anything else is ignored and a warning is logged.
    """
    if inspect.isfunction(categorize):
        # 'inspect.getargspec' was removed in Python 3.11; use
        # 'getfullargspec' when it exists and fall back otherwise.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        if len(getspec(categorize).args) != 1:
            categorize = None
    elif not isinstance(categorize, partial_func):
        categorize = None
    if categorize:
        # most recently added method is consulted first
        self._categorize.insert(0, categorize)
    else:
        logger.warning('invalid categorize function ignored')
def _build_multipart(cls, data): """ Build up the MIME payload for the POST data """ boundary = b'--------------GHSKFJDLGDS7543FJKLFHRE75642756743254' sep_boundary = b'\n--' + boundary end_boundary = sep_boundary + b'--' end_items = end_boundary, b"\n", builder = functools.partial( cls._build_part, sep_boundary=sep_boundary, ) part_groups = map(builder, data.items()) parts = itertools.chain.from_iterable(part_groups) body_items = itertools.chain(parts, end_items) content_type = 'multipart/form-data; boundary=%s' % boundary.decode('ascii') return b''.join(body_items), content_type
def main():
    """Consume a subunit stream from stdin and report on it.

    Returns 1 when no tests were counted or the run was not successful,
    and 0 otherwise.
    """
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    # per-test reporter with the output stream and CLI flags pre-bound
    report_outcome = functools.partial(
        show_outcome, sys.stdout,
        print_failures=args.print_failures,
        failonly=args.failonly)
    outcomes = testtools.StreamToDict(report_outcome)
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])

    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    if summary.wasSuccessful():
        return 0
    return 1
async def add_local_charm_dir(self, charm_dir, series):
    """Upload a local charm to the model.

    This will automatically generate an archive from the charm dir.

    Declared ``async`` because the body awaits the executor future; a
    plain ``def`` containing ``await`` does not compile.

    :param charm_dir: Path to the charm directory
    :param series: Charm series
    :return: The value returned by ``self.add_local_charm``
    """
    # Create the temp file inside the ``with`` so it is closed even when
    # archive generation raises.
    with tempfile.NamedTemporaryFile() as fh:
        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
        func = partial(
            self.add_local_charm, fh, series, os.stat(fh.name).st_size)
        # run the upload in the loop's default executor
        charm_url = await self._connector.loop.run_in_executor(None, func)

    log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
    return charm_url
def __getattr__(self, name):
    """
    Wrap method calls in coroutines that use run_in_executor to make
    them async.

    Non-callable attributes of ``self._cs`` are exposed as a zero-argument
    callable that re-reads the attribute. Callable attributes are wrapped
    in a coroutine function that runs the call in the default executor of
    ``self.loop``, retrying on ``theblues.errors.ServerError`` (three
    attempts in total, one second apart). The wrapper is stored on the
    instance under ``name`` and returned.
    """
    attr = getattr(self._cs, name)
    if not callable(attr):
        wrapper = partial(getattr, self._cs, name)
        setattr(self, name, wrapper)
    else:
        async def coro(*args, **kwargs):
            method = partial(attr, *args, **kwargs)
            for attempt in range(1, 4):
                try:
                    return await self.loop.run_in_executor(None, method)
                except theblues.errors.ServerError:
                    if attempt == 3:
                        raise
                    # The 'loop' argument of asyncio.sleep was removed in
                    # Python 3.10; sleep on the running loop instead.
                    await asyncio.sleep(1)
        setattr(self, name, coro)
        wrapper = coro
    return wrapper
def _ireduce_linalg(arrays, func, **kwargs): """ Yield the cumulative reduction of a linag algebra function """ arrays = iter(arrays) first = next(arrays) second = next(arrays) func = partial(func, **kwargs) accumulator = func(first, second) yield accumulator for array in arrays: # For some reason, np.dot(..., out = accumulator) did not produce results # that were equal to numpy.linalg.multi_dot func(accumulator, array, out = accumulator) yield accumulator
def iload(files, load_func, **kwargs):
    """
    Lazily load items from files, one file at a time.

    Parameters
    ----------
    files : iterable of str or str
        Either an iterable of filenames or a glob-like pattern str.
    load_func : callable
        Function taking a filename as its first argument.
    kwargs
        Keyword arguments are passed to ``load_func``.

    Yields
    ------
    arr
        The value returned by ``load_func`` for each file, in order.
    """
    if isinstance(files, str):
        files = iglob(files)
    loader = partial(load_func, **kwargs)
    for filename in iter(files):
        yield loader(filename)


# pmap does not support local functions
def __init__(self, params):
    """Configure optional UNK replacement and post-processing.

    Reads the ``unk_mapping``, ``unk_replace`` and ``postproc_fn`` entries
    of ``self.params``.

    Raises:
      ValueError: if ``postproc_fn`` is set but ``locate`` cannot find it.
    """
    super(DecodeText, self).__init__(params)
    self._unk_mapping = None
    self._unk_replace_fn = None

    mapping_source = self.params["unk_mapping"]
    if mapping_source is not None:
        self._unk_mapping = _get_unk_mapping(mapping_source)
    if self.params["unk_replace"]:
        # the mapping may still be None here; it is bound as-is
        self._unk_replace_fn = functools.partial(
            _unk_replace, mapping=self._unk_mapping)

    self._postproc_fn = None
    postproc_name = self.params["postproc_fn"]
    if postproc_name:
        self._postproc_fn = locate(postproc_name)
        if self._postproc_fn is None:
            raise ValueError(
                "postproc_fn not found: {}".format(postproc_name))
def loads_with_persistent_ids(str, env):
    """
    Unpickle ``str``, resolving persistent ids against ``env``.

    Persistent ids met while unpickling are resolved by calling
    ``_persistent_load`` with ``env`` bound as a keyword argument.

    Parameters
    ----------
    str : String
        The pickled bytes of the object to be unpickled.
    env : TradingEnvironment
        The TradingEnvironment to be inserted to the unpickled object.

    Returns
    -------
    obj
       An unpickled object formed from the parameter 'str'.
    """
    # NOTE(review): unpickling can execute arbitrary code; only feed this
    # function data from a trusted source.
    stream = BytesIO(str)
    loader = pickle.Unpickler(stream)
    loader.persistent_load = partial(_persistent_load, env=env)
    return loader.load()
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.

    ``format`` must be 'svg', 'jpeg' or 'png'. The graph is rendered into
    an in-memory buffer by ``_render`` and wrapped in the matching
    IPython display object.

    Raises NoIPython if IPython cannot be imported, and ValueError for an
    unsupported ``format`` (previously this surfaced as an
    UnboundLocalError after rendering).
    """
    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed. Can't display graph.")

    if format == 'svg':
        display_cls = display.SVG
    elif format in ("jpeg", "png"):
        display_cls = partial(display.Image, format=format, embed=True)
    else:
        raise ValueError(
            "Unsupported display format {!r}; expected 'svg', 'jpeg' or "
            "'png'.".format(format))

    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
def __init__(self, min_, max_, float_=False):
    """Build a spin box whose text is handled by a validated line edit.

    :param min_: stored as ``self._minimum``
    :param max_: stored as ``self._maximum``
    :param float_: when true, values are converted with ``float`` and a
        fractional part is allowed by the validator; otherwise ``int``.
    """
    super(QwordSpinBox, self).__init__()
    self._minimum = min_
    self._maximum = max_
    self.int_ = float if float_ else int
    # Raw strings: '\d' and '\.' are invalid escape sequences in ordinary
    # string literals. The pattern text itself is unchanged.
    rx = QRegExp(r'-?\d{0,20}(?:\.\d{0,20})?' if float_ else r'-?\d{0,20}')
    validator = QRegExpValidator(rx, self)

    self._lineEdit = QLineEdit(self)
    self._lineEdit.setText(str(self.int_(0)))
    self._lineEdit.setValidator(validator)
    # NOTE(review): this path passes 'change=' while the handler below
    # passes 'update=' -- confirm setValue accepts both keywords.
    self._lineEdit.textEdited.connect(partial(self.setValue, change=False))
    self.editingFinished.connect(lambda: self.setValue(self.value(), update=False) or True)
    self.setLineEdit(self._lineEdit)
def handle_event(self, event):
    """Spawn a worker for a message event, optionally filtered by pattern.

    Events whose type is not ``EVENT_TYPE_MESSAGE`` are ignored. When
    ``self.message_pattern`` is set, the event text must match it: named
    groups become keyword arguments, otherwise positional groups are
    appended to the ``(event, text)`` arguments. Non-matching messages
    are ignored.
    """
    if event.get('type') != EVENT_TYPE_MESSAGE:
        return

    extra_args = ()
    kwargs = {}
    if self.message_pattern:
        match = self.message_pattern.match(event.get('text', ''))
        if not match:
            return
        kwargs = match.groupdict()
        if not kwargs:
            # no named groups: fall back to positional groups
            extra_args = match.groups()

    args = (event, event.get('text')) + extra_args
    self.container.spawn_worker(
        self, args, kwargs,
        context_data={},
        handle_result=partial(self.handle_result, event))
def gl_init(self):
    """Create the GL-side state for this model.

    Sets up memoised factories for shaders, programs and textures,
    initialises shapes, materials and textures, allocates the joint and
    matrix storage, and builds the list of draw objects sorted by
    ``material.unknown0``.
    """
    # each call of the decorator wraps its argument with its own cache
    memoize = functools.lru_cache(maxsize=None)
    self.gl_vertex_shader_factory = memoize(functools.partial(gl.Shader, GL_VERTEX_SHADER))
    self.gl_fragment_shader_factory = memoize(functools.partial(gl.Shader, GL_FRAGMENT_SHADER))
    self.gl_program_factory = memoize(GLProgram)
    self.gl_texture_factory = memoize(gx.texture.GLTexture)

    array_table = {gx.VA_PTNMTXIDX: GLMatrixIndexArray()}
    for attribute, array in self.array_table.items():
        array_table[attribute] = array.gl_convert()

    for shape in self.shapes:
        shape.gl_init(array_table)
    for material in self.materials:
        material.gl_init()
    for texture in self.textures:
        texture.gl_init(self.gl_texture_factory)

    joint_count = len(self.joints)
    self.gl_joints = [copy.copy(joint) for joint in self.joints]
    self.gl_joint_matrices = numpy.empty((joint_count, 3, 4), numpy.float32)
    self.gl_matrix_table = gl.TextureBuffer(
        GL_DYNAMIC_DRAW, GL_RGBA32F,
        (len(self.matrix_descriptors), 3, 4), numpy.float32)
    self.gl_update_matrix_table()

    self.gl_draw_objects = list(self.gl_generate_draw_objects(self.scene_graph))
    self.gl_draw_objects.sort(key=lambda obj: obj.material.unknown0)
def __getattr__(self, name):
    """\
    Hook for external serializers.

    For a name of the form ``to_<name>`` the ``segno.plugin.converter``
    entry point group is searched for a plugin called ``<name>``. The
    first one found is loaded and returned with this object bound as its
    first argument. What the returned callable does depends on the plugin.

    Raises AttributeError for any other name, or if no plugin matches.
    """
    if name.startswith('to_'):
        from pkg_resources import iter_entry_points
        from functools import partial
        candidates = iter_entry_points(group='segno.plugin.converter',
                                       name=name[3:])
        entry_point = next(iter(candidates), None)
        if entry_point is not None:
            return partial(entry_point.load(), self)
    raise AttributeError('{0} object has no attribute {1}'
                         .format(self.__class__, name))
def files_exist(self, file_paths):
    """
    Check every path for existence, using worker threads when present.

    file_paths: (list) file paths to test for existence

    With threads, one task per path is queued through ``self.put``;
    without, the check runs inline on ``self._interface``. ``self.wait``
    is called before the results are returned.

    Returns: { filepath: bool }
    """
    found = {}

    def record_existence(path, interface):
        found[path] = interface.exists(path)

    for path in file_paths:
        if len(self._threads):
            # the interface argument is supplied by whoever runs the task
            self.put(partial(record_existence, path))
        else:
            record_existence(path, self._interface)

    self.wait('Existence Testing' if self.progress else None)
    return found
def list_files(self, prefix="", flat=False):
    """
    Generate the file paths in the layer that start with ``prefix``.

    flat means only generate one level of a directory,
    while non-flat means generate all file paths with that
    prefix.

    Here's how flat=True handles different scenarios:
      1. partial directory name prefix = 'bigarr'
        - lists the '' directory and filters on key 'bigarr'
      2. full directory name prefix = 'bigarray'
        - Same as (1), but using key 'bigarray'
      3. full directory name + "/" prefix = 'bigarray/'
        - Lists the 'bigarray' directory
      4. partial file name prefix = 'bigarray/chunk_'
        - Lists the 'bigarray/' directory and filters on 'chunk_'

    Return: generated sequence of file paths relative to layer_path
    """
    # pure pass-through to the storage interface
    listing = self._interface.list_files(prefix, flat)
    for path in listing:
        yield path
def wait_for_opacity(self, selector, opacity, **kwargs):
    '''
    Block until an element's CSS opacity equals the requested value.

    The comparison is made on the string forms of both values.

    Parameters
    ----------
    selector: str
        A CSS selector to search for. This can be any valid CSS selector.

    opacity: float
        The opacity to wait for.

    kwargs:
        Passed on to _wait_for
    '''

    def _wait_for_opacity(self, browser):
        element = self.get_element(selector)
        current = element.value_of_css_property('opacity')
        return str(current) == str(opacity)

    condition = partial(_wait_for_opacity, self)
    self._wait_for(condition, **kwargs)
def decompile(self, data, ttFont):
    """Parse the binary table ``data`` into ``self.attributes``.

    The header is unpacked first; the entry decoder is then chosen by
    ``self.version``. For version 3.0 and above the data is decompressed
    and the header is unpacked again with the version 3 format. Entry
    boundaries are taken from consecutive offsets of the font's 'Gloc'
    table, and results are keyed by glyph name.
    """
    sstruct.unpack2(Glat_format_0, data, self)
    version = self.version
    if version <= 1.9:
        decoder = partial(self.decompileAttributes12, fmt=Glat_format_1_entry)
    elif version <= 2.9:
        decoder = partial(self.decompileAttributes12, fmt=Glat_format_23_entry)
    elif version >= 3.0:
        (data, self.scheme) = grUtils.decompress(data)
        sstruct.unpack2(Glat_format_3, data, self)
        self.hasOctaboxes = (self.compression & 1) == 1
        decoder = self.decompileAttributes3

    gloc = ttFont['Gloc']
    self.attributes = {}
    # consecutive Gloc offsets delimit one glyph's entry each
    for glyph_index, (start, end) in enumerate(zip(gloc, gloc[1:])):
        self.attributes[ttFont.getGlyphName(glyph_index)] = decoder(data[start:end])
def compile(self, ttFont):
    """Serialise this table to bytes and update the font's 'Gloc' table.

    The header is packed first, then one entry per glyph (in glyph order)
    is appended using an encoder chosen by ``self.version``. The offset
    of every entry, plus the final length, is stored through
    ``ttFont['Gloc'].set``. For version 3.0 and above the result is
    compressed with ``self.scheme``.

    Returns the compiled table data.
    """
    data = sstruct.pack(Glat_format_0, self)
    if self.version <= 1.9:
        encoder = partial(self.compileAttributes12, fmt=Glat_format_1_entry)
    elif self.version <= 2.9:
        # Must mirror decompile(), which reads 2.x entries with
        # Glat_format_23_entry; writing them with the version 1 entry
        # format would not round-trip.
        encoder = partial(self.compileAttributes12, fmt=Glat_format_23_entry)
    elif self.version >= 3.0:
        self.compression = (self.scheme << 27) + (1 if self.hasOctaboxes else 0)
        data = sstruct.pack(Glat_format_3, self)
        encoder = self.compileAttributes3

    glocs = []
    for n in range(len(self.attributes)):
        glocs.append(len(data))
        data += encoder(self.attributes[ttFont.getGlyphName(n)])
    # trailing offset marks the end of the last entry
    glocs.append(len(data))
    ttFont['Gloc'].set(glocs)

    if self.version >= 3.0:
        data = grUtils.compress(self.scheme, data)
    return data
def do_alto_post(self, endpoint, data, callback):
    """ALTO post to the given endpoint with given data

    The HTTP POST is run in the default executor of ``self._loop``. On
    OSError the failure is logged and ``callback`` is not invoked;
    otherwise ``callback`` receives the processed response.
    """
    # Make HTTP POST to ALTO
    url = self._alto_url + endpoint
    try:
        post = functools.partial(requests.post, url, json=data)
        alto_resp = yield from self._loop.run_in_executor(None, post)
    except OSError:
        logging.info('Consumed OSError while connecting to ALTO server')
        return

    # Process peers, then return results to swarm
    callback(self._process_alto_response(alto_resp))
def doctable(ctx):
    """Write ./docs/test.docx: a full-width picture plus a formatted table.

    Reads ./docs/flight-options.csv, starts from the
    ./docs/style-reference.docx document, and saves the result under
    ./docs/test.docx. ``ctx`` is not used.
    """
    df = pd.read_csv('./docs/flight-options.csv')

    # open an existing document
    doc = docx.Document('./docs/style-reference.docx')

    as_int = partial(format_decimal, format='#')
    as_usd = partial(format_currency, currency='USD')

    # picture spans the first section's page width minus both margins
    section = doc.sections[0]
    usable_width = section.page_width - section.left_margin - section.right_margin
    doc.add_picture('./docs/diagrams_002.png', width=usable_width)

    yes_no = {0: 'No', 1: 'Yes'}
    formatters = {
        'ticket_price': as_usd,
        'total_hours': as_int,
        'trip': as_int,
        'airline': partial(shorten_long_name, width=20),
        'selected': compose(yes_no.get, int)
    }
    add_table(df, doc, table_style='Plain Table 3', formatters=formatters)

    # save the doc
    doc.save('./docs/test.docx')
def fix_tickets(
        self, ticket_frame: pd.DataFrame, path_fixes) -> pd.DataFrame:
    """Clean a ticket frame and normalise its changed-file paths.

    The 'Total changed lines' column of ``ticket_frame`` is renamed to
    'ChangedLines' in place (the caller's frame is modified). Rows with
    100000 or more changed lines are dropped, 'Changed files' is replaced
    by a 'ChangedFiles' column produced by ``self.fix_path_prefixes``,
    rows are sorted by 'CommitDate' with a fresh index, and missing
    'Found' values become ''.
    """
    ticket_frame.rename(
        columns={'Total changed lines': 'ChangedLines'}, inplace=True)

    small_changes = ticket_frame[ticket_frame.ChangedLines < 100000]
    fix_paths = partial(self.fix_path_prefixes, path_fixes)
    with_fixed_paths = small_changes.assign(
        ChangedFiles=small_changes['Changed files'].apply(fix_paths))

    fixed_frame = with_fixed_paths.drop('Changed files', axis=1)
    fixed_frame = fixed_frame.sort_values(by='CommitDate')
    fixed_frame = fixed_frame.reset_index(drop=True)
    fixed_frame.fillna(value={'Found': ''}, axis=0, inplace=True)
    return fixed_frame

# prj1 specific methods
def getter(self, proxy_into = None, no_idmap = False):
    """Return a function that unpacks the item at a given position.

    The returned callable takes an integer position and calls
    ``schema.unpack_from`` with this object's buffer and the offset
    ``index[pos]``. Attributes are read once, when the getter is built.

    proxy_into: forwarded to ``unpack_from`` unchanged.
    no_idmap: when true, None is passed in place of ``self.idmap``.
    """
    schema = self.schema
    index = self.index
    buf = self.buf
    idmap = None if no_idmap else self.idmap

    proxy_class = self.proxy_class
    proxy_class_new = None
    if proxy_class is not None:
        proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)

    @cython.locals(pos=int)
    def getter(pos):
        return schema.unpack_from(buf, index[pos], idmap, proxy_class_new, proxy_into)

    return getter
def iter_fast(self):
    """Yield every item, unpacking each one into a single shared proxy.

    One ``schema.Proxy()`` is created up front and passed as the
    ``proxy_into`` argument of every ``unpack_from`` call.
    """
    # getter inlined
    schema = self.schema
    index = self.index
    idmap = self.idmap
    buf = self.buf

    proxy_class = self.proxy_class
    proxy_class_new = None
    if proxy_class is not None:
        proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)

    shared_proxy = schema.Proxy()
    for pos in xrange(len(self)):
        yield schema.unpack_from(buf, index[pos], idmap, proxy_class_new, shared_proxy)
def ext_pillar(minion_id, pillar, *args, **kwargs):
    """Build the pillar stack from the configured stack config files.

    Positional arguments are config file paths. Each keyword argument is
    named ``<source>:<key>`` where ``<source>`` is 'pillar', 'grains' or
    'opts'; the value looked up under ``<key>`` in that source selects
    further config file(s) from the keyword's mapping. Files that do not
    exist are skipped with a warning.

    Raises Exception for an unknown ``<source>``.
    """
    import salt.utils
    stack = {}
    stack_config_files = list(args)
    sources = (
        ('pillar', pillar),
        ('grains', __grains__),
        ('opts', __opts__),
    )
    traverse = {}
    for source_name, source in sources:
        traverse[source_name] = partial(
            salt.utils.traverse_dict_and_list, source)

    for matcher, matchs in kwargs.iteritems():
        t, matcher = matcher.split(':', 1)
        if t not in traverse:
            raise Exception('Unknown traverse option "{0}", '
                            'should be one of {1}'.format(t, traverse.keys()))
        cfgs = matchs.get(traverse[t](matcher, None), [])
        # a single config path is allowed in place of a list
        stack_config_files += cfgs if isinstance(cfgs, list) else [cfgs]

    for cfg in stack_config_files:
        if os.path.isfile(cfg):
            stack = _process_stack_cfg(cfg, stack, minion_id, pillar)
        else:
            log.warning('Ignoring pillar stack cfg "{0}": '
                        'file does not exist'.format(cfg))
    return stack
def test_single_connection(self):
    """
    Issue ten queries one after another over a single connection.

    Each response callback sends the next request; after the tenth
    response the connection is closed and the waiting test is released.
    """
    conn = self.get_connection()
    query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
    event = Event()

    def send(completed):
        conn.send_msg(
            QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
            request_id=0,
            cb=partial(cb, completed))

    def cb(count, *args, **kwargs):
        count += 1
        if count < 10:
            send(count)
            return
        conn.close()
        event.set()

    send(0)
    event.wait()
def test_single_connection_pipelined_requests(self):
    """
    Send 100 requests on one connection without waiting in between.

    Every callback marks its own slot; once all slots are marked the
    connection is closed and the waiting test is released.
    """
    conn = self.get_connection()
    query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
    responses = [False] * 100
    event = Event()

    def cb(response_list, request_num, *args, **kwargs):
        response_list[request_num] = True
        if not all(response_list):
            return
        conn.close()
        event.set()

    for request_num in range(100):
        message = QueryMessage(
            query=query, consistency_level=ConsistencyLevel.ONE)
        conn.send_msg(
            message,
            request_id=request_num,
            cb=partial(cb, responses, request_num))

    event.wait()
def _set_final_result(self, response): self._cancel_timer() if self._metrics is not None: self._metrics.request_timer.addValue(time.time() - self._start_time) with self._callback_lock: self._final_result = response # save off current callbacks inside lock for execution outside it # -- prevents case where _final_result is set, then a callback is # added and executed on the spot, then executed again as a # registered callback to_call = tuple( partial(fn, response, *args, **kwargs) for (fn, args, kwargs) in self._callbacks ) self._event.set() # apply each callback for callback_partial in to_call: callback_partial()
def test_create_action_plan(self):
    """An audit on the dummy goal yields a RECOMMENDED action plan."""
    _, goal = self.client.show_goal("dummy")
    _, audit_template = self.create_audit_template(goal['uuid'])
    _, audit = self.create_audit(audit_template['uuid'])
    audit_uuid = audit['uuid']

    # poll for up to 30 seconds until the audit has finished
    audit_finished = functools.partial(self.has_audit_finished, audit_uuid)
    self.assertTrue(test_utils.call_until_true(
        func=audit_finished,
        duration=30,
        sleep_for=.5
    ))

    _, listing = self.client.list_action_plans(audit_uuid=audit_uuid)
    first_plan_uuid = listing['action_plans'][0]['uuid']
    _, action_plan = self.client.show_action_plan(first_plan_uuid)

    self.assertEqual(audit_uuid, action_plan['audit_uuid'])
    self.assertEqual('RECOMMENDED', action_plan['state'])
def test_delete_action_plan(self):
    """A deleted action plan can no longer be shown (NotFound)."""
    _, goal = self.client.show_goal("dummy")
    _, audit_template = self.create_audit_template(goal['uuid'])
    _, audit = self.create_audit(audit_template['uuid'])
    audit_uuid = audit['uuid']

    # poll for up to 30 seconds until the audit has finished
    audit_finished = functools.partial(self.has_audit_finished, audit_uuid)
    self.assertTrue(test_utils.call_until_true(
        func=audit_finished,
        duration=30,
        sleep_for=.5
    ))

    _, listing = self.client.list_action_plans(audit_uuid=audit_uuid)
    first_plan_uuid = listing['action_plans'][0]['uuid']
    _, action_plan = self.client.show_action_plan(first_plan_uuid)
    plan_uuid = action_plan['uuid']

    self.client.delete_action_plan(plan_uuid)

    self.assertRaises(exceptions.NotFound, self.client.show_action_plan,
                      plan_uuid)
def create_action_plan(cls, audit_template_uuid, **audit_kwargs):
    """Wrapper utility for creating a test action plan

    Creates an audit, waits (up to 30 seconds) for it to finish and
    returns the first action plan listed for it.

    :param audit_template_uuid: Audit template UUID to use
    :param audit_kwargs: Dict of audit properties to set
    :return: The action plan as dict, or None if the audit has none
    """
    _, audit = cls.create_audit(audit_template_uuid, **audit_kwargs)
    audit_uuid = audit['uuid']

    audit_finished = functools.partial(cls.has_audit_finished, audit_uuid)
    assert test_utils.call_until_true(
        func=audit_finished,
        duration=30,
        sleep_for=.5
    )

    _, action_plans = cls.client.list_action_plans(audit_uuid=audit_uuid)
    plans = action_plans['action_plans']
    if len(plans) == 0:
        return
    return plans[0]
def _augment_module_post(net: nn.Module, callback_dict: dict) -> (dict, list):
    """Register a backward hook on every sub-module of ``net``.

    ``nn.Sequential`` containers and ``net`` itself are skipped. Every
    hook is ``_backward_hook`` bound to the module's name,
    ``callback_dict`` and a shared parameter dict.

    Returns ``(vis_param_dict, remove_handles)``. The second element is a
    zero-argument function that removes all registered hooks.
    NOTE(review): the return annotation says ``list`` but a function is
    returned.
    """
    vis_param_dict = {
        'layer': None,
        'index': None,
        'method': GradType.NAIVE,
    }
    hook_handles = []

    for module_name, module in net.named_modules():
        if isinstance(module, nn.Sequential) or module is net:
            continue
        # hook every layer, in case any of them is needed later
        hook = partial(_backward_hook, module_name=module_name,
                       callback_dict=callback_dict,
                       vis_param_dict=vis_param_dict)
        hook_handles.append(module.register_backward_hook(hook))

    def remove_handles():
        for handle in hook_handles:
            handle.remove()

    return vis_param_dict, remove_handles
def test_cascade(self):
    """Exit functions run in reverse order of registration.

    A child interpreter (started through ``pyrun``) loads this very file
    as module ``mod`` and registers two exit functions that append b'1'
    and b'2' to TESTFN. The file must end up containing b"21".
    """
    # Register 2 functions and make sure the last registered
    # function is executed first.
    ret = pyrun(textwrap.dedent(
        """
        import functools, os, imp
        mod = imp.load_source("mod", r"{}")

        def foo(s):
            with open(r"{}", "ab") as f:
                f.write(s)

        mod.register_exit_fun(functools.partial(foo, b'1'))
        mod.register_exit_fun(functools.partial(foo, b'2'))
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    # the child process must exit cleanly
    self.assertEqual(ret, 0)
    with open(TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"21")
def __build_buttons(self):
    """Create the Reset button and the grid of '?' cell buttons.

    The Reset button occupies grid row 0; cell (x, y) is placed at
    column x, row y + 1 and calls ``self.__push(x, y)`` when pressed.
    """
    reset = tkinter.Button(self)
    reset['text'] = 'Reset'
    reset['command'] = self.__reset
    reset.grid(column=0, row=0, columnspan=10, sticky=tkinter.EW)
    self.__reset_button = reset

    def make_cell(x, y):
        cell = tkinter.Button(self)
        cell.grid(column=x, row=y+1)
        cell['text'] = '?'
        cell['command'] = functools.partial(self.__push, x, y)
        return cell

    self.__buttons = [
        [make_cell(x, y) for x in range(self.__width)]
        for y in range(self.__height)
    ]
def __build_buttons(self):
    """Create the Reset button and the grid of '?' cell buttons.

    The Reset button occupies grid row 1; cell (x, y) is placed at
    column x, row y + 2 and calls ``self.__push(x, y)`` when pressed.
    """
    reset = tkinter.Button(self)
    reset['text'] = 'Reset'
    reset['command'] = self.__reset
    reset.grid(column=0, row=1, columnspan=10, sticky=tkinter.EW)
    self.__reset_button = reset

    def make_cell(x, y):
        cell = tkinter.Button(self, width=2, height=1, text='?')
        cell.grid(column=x, row=y+2)
        cell['command'] = functools.partial(self.__push, x, y)
        return cell

    self.__buttons = [
        [make_cell(x, y) for x in range(self.__width)]
        for y in range(self.__height)
    ]
def data_parallel(f, input, params, stats, mode, device_ids, output_device=None):
    """Apply the functional model ``f`` across several devices.

    With a single device, ``f(input, params, stats, mode)`` is called
    directly. Otherwise ``params`` and ``stats`` are broadcast to one
    dict per device, the input is scattered, the per-device replicas are
    applied in parallel and the outputs are gathered on
    ``output_device`` (default: the first entry of ``device_ids``).
    """
    if output_device is None:
        output_device = device_ids[0]

    if len(device_ids) == 1:
        return f(input, params, stats, mode)

    def replicate(param_dict, g):
        # one dict per device; g(value) yields that value's per-device copies
        per_device = [{} for _ in device_ids]
        for key, value in param_dict.items():
            for device_index, copied in enumerate(g(value)):
                per_device[device_index][key] = copied
        return per_device

    params_replicas = replicate(params, lambda x: Broadcast(device_ids)(x))
    stats_replicas = replicate(stats, lambda x: comm.broadcast(x, device_ids))

    replicas = []
    for p, s in zip(params_replicas, stats_replicas):
        replicas.append(partial(f, params=p, stats=s, mode=mode))

    inputs = scatter([input], device_ids)
    outputs = parallel_apply(replicas, inputs)
    return gather(outputs, output_device)
def __init__(self, *args, **kwargs):
    """Initialise the session and add three ``wait_for`` shortcuts.

    ``wait_for_success``, ``wait_for_error`` and ``wait_for_canceled``
    are ``self.wait_for`` with the ``predicate`` and ``on_timeout``
    arguments pre-bound for the corresponding job status.
    """
    super(GirderSession, self).__init__(*args, **kwargs)

    def is_success(j):
        return j['status'] == JobStatus.SUCCESS

    def is_error(j):
        return j['status'] == JobStatus.ERROR

    def is_canceled(j):
        return j['status'] == JobStatus.CANCELED

    def success_timeout(j):
        return 'Timed out waiting for job/%s to move into success state' % j['_id']

    def error_timeout(j):
        return 'Timed out waiting for job/%s to move into error state' % j['_id']

    def canceled_timeout(j):
        return 'Timed out waiting for job/%s to move into canceled state' % j['_id']

    self.wait_for_success = functools.partial(
        self.wait_for, predicate=is_success, on_timeout=success_timeout)
    self.wait_for_error = functools.partial(
        self.wait_for, predicate=is_error, on_timeout=error_timeout)
    self.wait_for_canceled = functools.partial(
        self.wait_for, predicate=is_canceled, on_timeout=canceled_timeout)
def executemany(self, query, args):
    """Must be used with 'yield' as 'n = yield cursor.executemany(stmt)'.

    Acquires the semaphore, then hands the cursor's 'executemany' call
    (with 'query' and 'args' bound) to the thread pool.
    """
    yield self._sem.acquire()
    run_async = self._thread_pool.async_task
    run_async(self._exec_task,
              partial_func(self._cursor.executemany, query, args))
def callproc(self, proc, args=()):
    """Must be used with 'yield' as 'yield cursor.callproc(proc)'.

    Acquires the semaphore, then hands the cursor's 'callproc' call
    (with 'proc' and 'args' bound) to the thread pool.
    """
    yield self._sem.acquire()
    run_async = self._thread_pool.async_task
    run_async(self._exec_task,
              partial_func(self._cursor.callproc, proc, args))
def execute(self, query, args=None):
    """Must be used with 'yield' as 'n = yield cursor.execute(stmt)'.

    Acquires the semaphore, then hands the cursor's 'execute' call
    (with 'query' and 'args' bound) to the thread pool.
    """
    yield self._sem.acquire()
    run_async = self._thread_pool.async_task
    run_async(self._exec_task,
              partial_func(self._cursor.execute, query, args))
def lazycache(filename, module_globals):
    """Seed the cache for filename with module_globals.

    The module loader will be asked for the source only when getlines is
    called, not immediately.

    If there is an entry in the cache already, it is not altered.

    :return: True if a lazy load is registered in the cache,
        otherwise False. To register such a load a module loader with a
        get_source method must be found, the filename must be a cachable
        filename, and the filename must not be already cached.
    """
    if filename in cache:
        # a one-element entry is a lazy loader registered earlier
        return len(cache[filename]) == 1

    if not filename or (filename.startswith('<') and filename.endswith('>')):
        return False

    # Try for a __loader__, if available
    if not module_globals or '__loader__' not in module_globals:
        return False

    name = module_globals.get('__name__')
    get_source = getattr(module_globals['__loader__'], 'get_source', None)
    if not (name and get_source):
        return False

    cache[filename] = (functools.partial(get_source, name),)
    return True
def parsers(self):
    """Metadata item name to parser function mapping."""
    as_bool = self._parse_bool
    as_dict = self._parse_dict
    as_list = self._parse_list
    # same list parser, but splitting on ';'
    as_semicolon_list = partial(self._parse_list, separator=';')

    return {
        'zip_safe': as_bool,
        'use_2to3': as_bool,
        'include_package_data': as_bool,
        'package_dir': as_dict,
        'use_2to3_fixers': as_list,
        'use_2to3_exclude_fixers': as_list,
        'convert_2to3_doctests': as_list,
        'scripts': as_list,
        'eager_resources': as_list,
        'dependency_links': as_list,
        'namespace_packages': as_list,
        'install_requires': as_semicolon_list,
        'setup_requires': as_semicolon_list,
        'tests_require': as_semicolon_list,
        'packages': self._parse_packages,
        'entry_points': self._parse_file,
        'py_modules': as_list,
    }
def parse_section_extras_require(self, section_options):
    """Parses `extras_require` configuration file section.

    Every value of the section is parsed as a ';'-separated list and the
    resulting dict is stored under the 'extras_require' key.

    :param dict section_options:
    """
    semicolon_list = partial(self._parse_list, separator=';')
    extras = self._parse_section_to_dict(section_options, semicolon_list)
    self['extras_require'] = extras