我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.iteritems()。
def sync_helpers(include, src, dest, options=None): if not os.path.isdir(dest): os.makedirs(dest) global_options = parse_sync_options(options) for inc in include: if isinstance(inc, str): inc, opts = extract_options(inc, global_options) sync(src, dest, inc, opts) elif isinstance(inc, dict): # could also do nested dicts here. for k, v in six.iteritems(inc): if isinstance(v, list): for m in v: inc, opts = extract_options(m, global_options) sync(src, dest, '%s.%s' % (k, inc), opts)
def serialize_safe(cls, themap, protocol_version): key_type, value_type = cls.subtypes pack = int32_pack if protocol_version >= 3 else uint16_pack buf = io.BytesIO() buf.write(pack(len(themap))) try: items = six.iteritems(themap) except AttributeError: raise TypeError("Got a non-map object for a map value") inner_proto = max(3, protocol_version) for key, val in items: keybytes = key_type.to_binary(key, inner_proto) valbytes = value_type.to_binary(val, inner_proto) buf.write(pack(len(keybytes))) buf.write(keybytes) buf.write(pack(len(valbytes))) buf.write(valbytes) return buf.getvalue()
def get_hacluster_config(exclude_keys=None): ''' Obtains all relevant configuration from charm configuration required for initiating a relation to hacluster: ha-bindiface, ha-mcastport, vip param: exclude_keys: list of setting key(s) to be excluded. returns: dict: A dict containing settings keyed by setting name. raises: HAIncompleteConfig if settings are missing. ''' settings = ['ha-bindiface', 'ha-mcastport', 'vip'] conf = {} for setting in settings: if exclude_keys and setting in exclude_keys: continue conf[setting] = config_get(setting) missing = [] [missing.append(s) for s, v in six.iteritems(conf) if v is None] if missing: log('Insufficient config data to configure hacluster.', level=ERROR) raise HAIncompleteConfig return conf
def ensure_loopback_device(path, size): ''' Ensure a loopback device exists for a given backing file path and size. If it a loopback device is not mapped to file, a new one will be created. TODO: Confirm size of found loopback device. :returns: str: Full path to the ensured loopback device (eg, /dev/loop0) ''' for d, f in six.iteritems(loopback_devices()): if f == path: return d if not os.path.exists(path): cmd = ['truncate', '--size', size, path] check_call(cmd) return create_loopback(path)
def context_complete(self, ctxt): """Check for missing data for the required context data. Set self.missing_data if it exists and return False. Set self.complete if no missing data and return True. """ # Fresh start self.complete = False self.missing_data = [] for k, v in six.iteritems(ctxt): if v is None or v == '': if k not in self.missing_data: self.missing_data.append(k) if self.missing_data: self.complete = False log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO) else: self.complete = True return self.complete
def __call__(self): ports = config('data-port') if ports: # Map of {port/mac:bridge} portmap = parse_data_port_mappings(ports) ports = portmap.keys() # Resolve provided ports or mac addresses and filter out those # already attached to a bridge. resolved = self.resolve_ports(ports) # FIXME: is this necessary? normalized = {get_nic_hwaddr(port): port for port in resolved if port not in ports} normalized.update({port: port for port in resolved if port in ports}) if resolved: return {normalized[port]: bridge for port, bridge in six.iteritems(portmap) if port in normalized.keys()} return None
def get_os_version_package(pkg, fatal=True): '''Derive OpenStack version number from an installed package.''' codename = get_os_codename_package(pkg, fatal=fatal) if not codename: return None if 'swift' in pkg: vers_map = SWIFT_CODENAMES for cname, version in six.iteritems(vers_map): if cname == codename: return version[-1] else: vers_map = OPENSTACK_CODENAMES for version, cname in six.iteritems(vers_map): if cname == codename: return version # e = "Could not determine OpenStack version for package: %s" % pkg # error_out(e)
def save_script_rc(script_path="scripts/scriptrc", **env_vars): """ Write an rc file in the charm-delivered directory containing exported environment variables provided by env_vars. Any charm scripts run outside the juju hook environment can source this scriptrc to obtain updated config information necessary to perform health checks or service changes. """ juju_rc_path = "%s/%s" % (charm_dir(), script_path) if not os.path.exists(os.path.dirname(juju_rc_path)): os.mkdir(os.path.dirname(juju_rc_path)) with open(juju_rc_path, 'wb') as rc_script: rc_script.write( "#!/bin/bash\n") [rc_script.write('export %s=%s\n' % (u, p)) for u, p in six.iteritems(env_vars) if u != "script_path"]
def _validate_dict_data(self, expected, actual): """Validate dictionary data. Compare expected dictionary data vs actual dictionary data. The values in the 'expected' dictionary can be strings, bools, ints, longs, or can be a function that evaluates a variable and returns a bool. """ self.log.debug('actual: {}'.format(repr(actual))) self.log.debug('expected: {}'.format(repr(expected))) for k, v in six.iteritems(expected): if k in actual: if (isinstance(v, six.string_types) or isinstance(v, bool) or isinstance(v, six.integer_types)): # handle explicit values if v != actual[k]: return "{}:{}".format(k, actual[k]) # handle function pointers, such as not_null or valid_ip elif not v(actual[k]): return "{}:{}".format(k, actual[k]) else: return "key '{}' does not exist".format(k) return None
def run_action(self, unit_sentry, action, _check_output=subprocess.check_output, params=None): """Run the named action on a given unit sentry. params a dict of parameters to use _check_output parameter is used for dependency injection. @return action_id. """ unit_id = unit_sentry.info["unit_name"] command = ["juju", "action", "do", "--format=json", unit_id, action] if params is not None: for key, value in params.iteritems(): command.append("{}={}".format(key, value)) self.log.info("Running command: %s\n" % " ".join(command)) output = _check_output(command, universal_newlines=True) data = json.loads(output) action_id = data[u'Action queued with id'] return action_id
def _get_top_data(topfile): ''' Helper method to retrieve and parse the nova topfile ''' topfile = os.path.join(_hubble_dir()[1], topfile) try: with open(topfile) as handle: topdata = yaml.safe_load(handle) except Exception as e: raise CommandExecutionError('Could not load topfile: {0}'.format(e)) if not isinstance(topdata, dict) or 'nova' not in topdata or \ not(isinstance(topdata['nova'], dict)): raise CommandExecutionError('Nova topfile not formatted correctly') topdata = topdata['nova'] ret = [] for match, data in topdata.iteritems(): if __salt__['match.compound'](match): ret.extend(data) return ret
def __init__(self, start_from, **kwargs): self.flags = Flags() if start_from == 0 or start_from == 0xFFFFFFFF: self.timestamp = start_from else: try: datetime.fromtimestamp(start_from) except TypeError as exc: raise_from(InvalidTimestampError('Timestamp invalid (0, 0xFFFFFFFF, or Unix Timestamp'), exc) else: self.timestamp = start_from for k,v in iteritems(kwargs): try: getattr(self.flags.flag, k) setattr(self.flags.flag, k, int(v)) except AttributeError as exc: raise_from(InvalidFlagError('Invalid flag: {}'.format(k)), exc) # save the timestamp and flags for reuse (if needed) Struct.set_ts(self.timestamp) Struct.set_flags(self.flags.from_bytes) # build the request self.event_request = EventRequest(timestamp=self.timestamp,flags=self.flags.from_bytes) self.message_header = MessageHeader(type=2, data=self.event_request.pack()) self.record = self.message_header.pack()
def _params_to_urlencoded(params): """ Returns a application/x-www-form-urlencoded ``str`` representing the key/value pairs in ``params``. Keys are values are ``str()``'d before calling ``urllib.urlencode``, with the exception of unicode objects which are utf8-encoded. """ def encode(o): if isinstance(o, six.binary_type): return o else: if isinstance(o, six.text_type): return o.encode('utf-8') else: return str(o).encode('utf-8') utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)} return url_encode(utf8_params)
def save_changes(self): new_projects = [] removed_projects = [] for project_id, project in six.iteritems(self.projects_by_id): if project_id in self.cleaned_data['projects']: if enable_plugin_for_tenant(project, self.tenant): new_projects.append(project) else: if disable_plugin_for_tenant(project, self.tenant): removed_projects.append(project) if new_projects or removed_projects: with Context.for_tenant(self.tenant) as ctx: ctx.send_notification( **make_subscription_update_notification(new_projects, removed_projects) ) if removed_projects: mentions.clear_project_mentions(self.tenant, removed_projects) ctx.push_recent_events_glance()
def service(action, service_name, **kwargs): """Control a system service. :param action: the action to take on the service :param service_name: the name of the service to perform th action on :param **kwargs: additional params to be passed to the service command in the form of key=value. """ if init_is_systemd(): cmd = ['systemctl', action, service_name] else: cmd = ['service', service_name, action] for key, value in six.iteritems(kwargs): parameter = '%s=%s' % (key, value) cmd.append(parameter) return subprocess.call(cmd) == 0
def get_os_version_package(pkg, fatal=True): '''Derive OpenStack version number from an installed package.''' codename = get_os_codename_package(pkg, fatal=fatal) if not codename: return None if 'swift' in pkg: vers_map = SWIFT_CODENAMES for cname, version in six.iteritems(vers_map): if cname == codename: return version[-1] else: vers_map = OPENSTACK_CODENAMES for version, cname in six.iteritems(vers_map): if cname == codename: return version # e = "Could not determine OpenStack version for package: %s" % pkg # error_out(e) # Module local cache variable for the os_release.
def save_script_rc(script_path="scripts/scriptrc", **env_vars): """ Write an rc file in the charm-delivered directory containing exported environment variables provided by env_vars. Any charm scripts run outside the juju hook environment can source this scriptrc to obtain updated config information necessary to perform health checks or service changes. """ juju_rc_path = "%s/%s" % (charm_dir(), script_path) if not os.path.exists(os.path.dirname(juju_rc_path)): os.mkdir(os.path.dirname(juju_rc_path)) with open(juju_rc_path, 'wt') as rc_script: rc_script.write( "#!/bin/bash\n") [rc_script.write('export %s=%s\n' % (u, p)) for u, p in six.iteritems(env_vars) if u != "script_path"]
def validate_svc_catalog_endpoint_data(self, expected, actual): """Validate service catalog endpoint data. Validate a list of actual service catalog endpoints vs a list of expected service catalog endpoints. """ self.log.debug('Validating service catalog endpoint data...') self.log.debug('actual: {}'.format(repr(actual))) for k, v in six.iteritems(expected): if k in actual: ret = self._validate_dict_data(expected[k][0], actual[k][0]) if ret: return self.endpoint_error(k, ret) else: return "endpoint {} does not exist".format(k) return ret
def get_unit_process_ids(self, unit_processes, expect_success=True): """Construct a dict containing unit sentries, process names, and process IDs. :param unit_processes: A dictionary of Amulet sentry instance to list of process names. :param expect_success: if False expect the processes to not be running, raise if they are. :returns: Dictionary of Amulet sentry instance to dictionary of process names to PIDs. """ pid_dict = {} for sentry_unit, process_list in six.iteritems(unit_processes): pid_dict[sentry_unit] = {} for process in process_list: pids = self.get_process_id_list( sentry_unit, process, expect_success=expect_success) pid_dict[sentry_unit].update({process: pids}) return pid_dict
def _apply_overrides(settings, overrides, schema): """Get overrides config overlayed onto modules defaults. :param modules: require stack modules config. :returns: dictionary of modules config with user overrides applied. """ if overrides: for k, v in six.iteritems(overrides): if k in schema: if schema[k] is None: settings[k] = v elif type(schema[k]) is dict: settings[k] = _apply_overrides(settings[k], overrides[k], schema[k]) else: raise Exception("Unexpected type found in schema '%s'" % type(schema[k]), level=ERROR) else: log("Unknown override key '%s' - ignoring" % (k), level=INFO) return settings
def parse_vlan_range_mappings(mappings): """Parse vlan range mappings. Mappings must be a space-delimited list of provider:start:end mappings. The start:end range is optional and may be omitted. Returns dict of the form {provider: (start, end)}. """ _mappings = parse_mappings(mappings) if not _mappings: return {} mappings = {} for p, r in six.iteritems(_mappings): mappings[p] = tuple(r.split(':')) return mappings
def merge_default_with_oplog(graph, op_log=None, run_meta=None): """Monkeypatch. There currently is a bug in tfprof_logger that prevents it from being used with Python 3. So we override the method manually until the fix comes in. """ tmp_op_log = tfprof_log_pb2.OpLog() # pylint: disable=W0212 logged_ops = tfprof_logger._get_logged_ops(graph, run_meta) if not op_log: tmp_op_log.log_entries.extend(logged_ops.values()) else: all_ops = dict() for entry in op_log.log_entries: all_ops[entry.name] = entry for op_name, entry in six.iteritems(logged_ops): if op_name in all_ops: all_ops[op_name].types.extend(entry.types) if entry.float_ops > 0 and all_ops[op_name].float_ops == 0: all_ops[op_name].float_ops = entry.float_ops else: all_ops[op_name] = entry tmp_op_log.log_entries.extend(all_ops.values()) return tmp_op_log
def __init__(self, *args, **kwargs): # The super class also validates the connector super(PostgresSAConnector, self).__init__(*args, **kwargs) # dbparams can include values in http://www.postgresql.org/docs/ # current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS self.databaseOperators = PostgresOperators # Get a list of types and their classes so that we can cast using # sqlalchemy self.types = KnownTypes KnownTypes.update({ type: getattr(dialect, type) for type in dir(dialect) if isinstance(getattr(dialect, type), sqlalchemy.sql.visitors.VisitableType)}) # Include types that were added to the ischema_names table (this is # done, for instance, by the geoalchemy module). for ikey, itype in six.iteritems(dialect.base.ischema_names): key = getattr(itype, '__visit_name__', None) if key and key not in KnownTypes: KnownTypes[key] = itype
def __getstate__(self): state_dict = {k: v for k, v in iteritems(self.__dict__) if not k.startswith('_')} state_dict['_portfolio_store'] = self._portfolio_store state_dict['_account_store'] = self._account_store state_dict['processed_transactions'] = \ dict(self.processed_transactions) state_dict['orders_by_id'] = \ dict(self.orders_by_id) state_dict['orders_by_modified'] = \ dict(self.orders_by_modified) state_dict['_payout_last_sale_prices'] = \ self._payout_last_sale_prices STATE_VERSION = 3 state_dict[VERSION_LABEL] = STATE_VERSION return state_dict
def get_positions(self): positions = self._positions_store for sid, pos in iteritems(self.positions): if pos.amount == 0: # Clear out the position if it has become empty since the last # time get_positions was called. Catching the KeyError is # faster than checking `if sid in positions`, and this can be # potentially called in a tight inner loop. try: del positions[sid] except KeyError: pass continue # Note that this will create a position if we don't currently have # an entry position = positions[sid] position.amount = pos.amount position.cost_basis = pos.cost_basis position.last_sale_price = pos.last_sale_price return positions
def __init__(self, constants, dates, sids): loaders = {} for column, const in iteritems(constants): frame = DataFrame( const, index=dates, columns=sids, dtype=column.dtype, ) loaders[column] = DataFrameLoader( column=column, baseline=frame, adjustments=None, ) self._loaders = loaders
def test_completeness(self): """ Tests that all rules are being tested. """ if not self.class_: return # This is the base class testing, it is always complete. dem = { k for k, v in iteritems(vars(zipline.utils.events)) if isinstance(v, type) and issubclass(v, self.class_) and v is not self.class_ } ds = { k[5:] for k in dir(self) if k.startswith('test') and k[5:] in dem } self.assertTrue( dem <= ds, msg='This suite is missing tests for the following classes:\n' + '\n'.join(map(repr, dem - ds)), )
def create_bar_reader(cls, tempdir): resources = { cls.AAPL: join(TEST_RESOURCE_PATH, 'AAPL.csv'), cls.MSFT: join(TEST_RESOURCE_PATH, 'MSFT.csv'), cls.BRK_A: join(TEST_RESOURCE_PATH, 'BRK-A.csv'), } raw_data = { asset: read_csv(path, parse_dates=['day']).set_index('day') for asset, path in iteritems(resources) } # Add 'price' column as an alias because all kinds of stuff in zipline # depends on it being present. :/ for frame in raw_data.values(): frame['price'] = frame['close'] writer = DailyBarWriterFromCSVs(resources) data_path = tempdir.getpath('testdata.bcolz') table = writer.write(data_path, trading_days, cls.assets) return raw_data, BcolzDailyBarReader(table)
def with_defaults(**default_funcs): """ Decorator for providing dynamic default values for a method. Usages: @with_defaults(foo=lambda self: self.x + self.y) def func(self, foo): ... If a value is passed for `foo`, it will be used. Otherwise the function supplied to `with_defaults` will be called with `self` as an argument. """ def decorator(f): @wraps(f) def method(self, *args, **kwargs): for name, func in iteritems(default_funcs): if name not in kwargs: kwargs[name] = func(self) return f(self, *args, **kwargs) return method return decorator
def pipeline_event_loader_args(self, dates): _, mapping = super( BlazeShareBuybackAuthLoaderTestCase, self, ).pipeline_event_loader_args(dates) return (bz.data(pd.concat( pd.DataFrame({ BUYBACK_ANNOUNCEMENT_FIELD_NAME: frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME], SHARE_COUNT_FIELD_NAME: frame[SHARE_COUNT_FIELD_NAME], TS_FIELD_NAME: frame[TS_FIELD_NAME], SID_FIELD_NAME: sid, }) for sid, frame in iteritems(mapping) ).reset_index(drop=True)),)
def insert(self, **kwargs): """ Insert commands at the beginning of the sequence. This is provided because certain commands have to come first (such as user creation), but may be need to beadded after other commands have already been specified. Later calls to insert put their commands before those in the earlier calls. Also, since the order of iterated kwargs is not guaranteed (in Python 2.x), you should really only call insert with one keyword at a time. See the doc of append for more details. :param kwargs: the key/value pair to append first :return: the action, so you can append Action(...).insert(...).append(...) """ for k, v in six.iteritems(kwargs): self.commands.insert(0, {k: v}) return self
def do(self, **kwargs): """ Here for compatibility with legacy clients only - DO NOT USE!!! This is sort of mix of "append" and "insert": it puts commands in the list, with some half smarts about which commands go at the front or back. If you add multiple commands to the back in one call, they will get added sorted by command name. :param kwargs: the commands in key=val format :return: the Action, so you can do Action(...).do(...).do(...) """ # add "create" / "add" / "removeFrom" first for k, v in list(six.iteritems(kwargs)): if k.startswith("create") or k.startswith("addAdobe") or k.startswith("removeFrom"): self.commands.append({k: v}) del kwargs[k] # now do the other actions, in a canonical order (to avoid py2/py3 variations) for k, v in sorted(six.iteritems(kwargs)): if k in ['add', 'remove']: self.commands.append({k: {"product": v}}) else: self.commands.append({k: v}) return self
def update(self, email=None, username=None, first_name=None, last_name=None, country=None): """ Update values on an existing user. See the API docs for what kinds of update are possible. :param email: new email for this user :param username: new username for this user :param first_name: new first name for this user :param last_name: new last name for this user :param country: new country for this user :return: the User, so you can do User(...).update(...).add_to_groups(...) """ if email: self._validate(email=email) if username: self._validate(username=username) updates = {} for k, v in six.iteritems(dict(email=email, username=username, firstname=first_name, lastname=last_name, country=country)): if v: updates[k] = v return self.append(update=updates)
def clean_headers(headers): """Forces header keys and values to be strings, i.e not unicode. The httplib module just concats the header keys and values in a way that may make the message header a unicode string, which, if it then tries to contatenate to a binary request body may result in a unicode decode error. Args: headers: dict, A dictionary of headers. Returns: The same dictionary but with all the keys converted to strings. """ clean = {} try: for k, v in six.iteritems(headers): if not isinstance(k, six.binary_type): k = str(k) if not isinstance(v, six.binary_type): v = str(v) clean[_to_bytes(k)] = _to_bytes(v) except UnicodeEncodeError: raise NonAsciiHeaderError(k, ': ', v) return clean
def _add_next_methods(self, resourceDesc, schema): # Add _next() methods # Look for response bodies in schema that contain nextPageToken, and methods # that take a pageToken parameter. if 'methods' in resourceDesc: for methodName, methodDesc in six.iteritems(resourceDesc['methods']): if 'response' in methodDesc: responseSchema = methodDesc['response'] if '$ref' in responseSchema: responseSchema = schema.get(responseSchema['$ref']) hasNextPageToken = 'nextPageToken' in responseSchema.get('properties', {}) hasPageToken = 'pageToken' in methodDesc.get('parameters', {}) if hasNextPageToken and hasPageToken: fixedMethodName, method = createNextMethod(methodName + '_next') self._set_dynamic_attr(fixedMethodName, method.__get__(self, self.__class__))
def _build_query(self, params): """Builds a query string. Args: params: dict, the query parameters Returns: The query parameters properly encoded into an HTTP URI query string. """ if self.alt_param is not None: params.update({'alt': self.alt_param}) astuples = [] for key, value in six.iteritems(params): if type(value) == type([]): for x in value: x = x.encode('utf-8') astuples.append((key, x)) else: if isinstance(value, six.text_type) and callable(value.encode): value = value.encode('utf-8') astuples.append((key, value)) return '?' + urlencode(astuples)
def test_init_with_values(self): value = self.faker.text() node = caduc.config.Node(someAttribute=value) dict(node).should.be.eql({'someAttribute':value}) dic = self.faker.pydict(nb_elements=10, variable_nb_elements=True) node = caduc.config.Node(**dic) for key, val in six.iteritems(node): val.should.be.eql(dic[key]) dict(node).should.be.eql(dic) # ensure Node initializer does not alter source dict dic = self.faker.pydict(nb_elements=10, variable_nb_elements=True) orig = dict(dic) caduc.config.Node(**dic) dic.should.be.eql(orig)
def get_grace_times(self, names): labels = self.details['Config']['Labels'] if labels and labels.get("com.caduc.image.grace_time"): return set([labels.get('com.caduc.image.grace_time', None)]) grace_config = self.config.get("images") grace_times = set() if grace_config: for name in names: for pattern, kv in six.iteritems(grace_config): if fnmatch.fnmatch(name, pattern): grace_time = kv['grace_time'] if grace_time is None or grace_time==-1: grace_times.add(float('inf')) else: grace_times.add(kv['grace_time']) if grace_times: return grace_times return set([self.grace_time])
def update(self, other): for k, v in six.iteritems(other): try: oldv = self[k] except KeyError: if isinstance(v, dict): node = Node() node.update(v) self[k] = node else: self[k] = v else: if isinstance(oldv, dict): if not isinstance(v, dict): raise ValueError("Can't update uncoherent values for key %s, old value: %r, new value: %r" % (k, oldv, v)) oldv.update(v) else: if isinstance(v, dict): raise ValueError("Can't update uncoherent values for key %s, old value: %r, new value: %r" % (k, oldv, v)) self[k] = v
def _add_table_metadata(self, table_metadata): old_indexes = {} old_meta = self.tables.get(table_metadata.name, None) if old_meta: # views are not queried with table, so they must be transferred to new table_metadata.views = old_meta.views # indexes will be updated with what is on the new metadata old_indexes = old_meta.indexes # note the intentional order of add before remove # this makes sure the maps are never absent something that existed before this update for index_name, index_metadata in six.iteritems(table_metadata.indexes): self.indexes[index_name] = index_metadata for index_name in (n for n in old_indexes if n not in table_metadata.indexes): self.indexes.pop(index_name, None) self.tables[table_metadata.name] = table_metadata
def _get_schema_mismatches(self, peers_result, local_result, local_address): peers_result = dict_factory(*peers_result.results) versions = defaultdict(set) if local_result.results: local_row = dict_factory(*local_result.results)[0] if local_row.get("schema_version"): versions[local_row.get("schema_version")].add(local_address) for row in peers_result: schema_ver = row.get('schema_version') if not schema_ver: continue addr = self._rpc_from_peer_row(row) peer = self._cluster.metadata.get_host(addr) if peer and peer.is_up is not False: versions[schema_ver].add(addr) if len(versions) == 1: log.debug("[control connection] Schemas match") return None return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
def __init__(self, *args, **kwargs): if len(args) > 1: raise TypeError('expected at most 1 arguments, got %d' % len(args)) self._items = [] self._index = {} if args: e = args[0] if callable(getattr(e, 'keys', None)): for k in e.keys(): self._insert(k, e[k]) else: for k, v in e: self._insert(k, v) for k, v in six.iteritems(kwargs): self._insert(k, v)