我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.utils.six.itervalues()。
def keep_lazy(*resultclasses): """ A decorator that allows a function to be called with one or more lazy arguments. If none of the args are lazy, the function is evaluated immediately, otherwise a __proxy__ is returned that will evaluate the function when needed. """ if not resultclasses: raise TypeError("You must pass at least one argument to keep_lazy().") def decorator(func): lazy_func = lazy(func, *resultclasses) @wraps(func) def wrapper(*args, **kwargs): for arg in list(args) + list(six.itervalues(kwargs)): if isinstance(arg, Promise): break else: return func(*args, **kwargs) return lazy_func(*args, **kwargs) return wrapper return decorator
def _attach_traffic_load(graph, time_interval={'start': '-10min', 'end': 'now'}): """ Inspects a topology graph and adds the required traffic-load data to it. Should be run _after_ the topology graph has been simplified. :type graph: nav.web.geomap.graph.Graph """ subedges = (edge for combo_edge in itervalues(graph.edges) for edge in (combo_edge.source_data['subedges'], combo_edge.target_data['subedges'])) needs_traffic_data = {(d['local_sysname'], d['local_interface']): d for edges in subedges for d in edges} get_cached_multiple_link_load(needs_traffic_data, time_interval)
def _get_temp_sensors_params(self, temp_sensors): """ Collect all temperature sensors and corresponding parameters""" sensors = [] for temp_sensor in itervalues(temp_sensors): temp_sensor_avail = temp_sensor.get('tempSensorAvail', None) if temp_sensor_avail: temp_oid = temp_sensor.get(0, None) serial = temp_sensor.get('tempSensorSerial', 0) name = temp_sensor.get('tempSensorName', None) sensors.append(self._make_result_dict( temp_oid, self._get_oid_for_sensor('tempSensorTempC'), serial, 'tempSensorTempC', u_o_m=Sensor.UNIT_CELSIUS, name=name)) return sensors
def _get_millivolt_sensors_params(self, millivolt_sensors): sensors = [] for millivolt_sensor in itervalues(millivolt_sensors): millivolt_avail = millivolt_sensor.get('millivoltMonitorAvail', None) if millivolt_avail: millivolt_oid = millivolt_sensor.get(0, None) serial = millivolt_sensor.get('millivoltMonitorSerial', None) name = millivolt_sensor.get('millivoltMonitorName', None) sensors.append(self._make_result_dict( millivolt_oid, self._get_oid_for_sensor('millivoltMonitorMV'), serial, 'millivoltMonitorMV', u_o_m=Sensor.UNIT_VOLTS_DC, scale='milli', name=name)) return sensors
def _get_outlet_params(self, outlet_sensors): sensors = [] for outlet_sensor in itervalues(outlet_sensors): outlet_avail = outlet_sensor.get('outletAvail', None) if outlet_avail: outlet_oid = outlet_sensor.get(0, None) serial = outlet_sensor.get('outletSerial', None) name = outlet_sensor.get('outletName', None) sensors.append(self._make_result_dict( outlet_oid, self._get_oid_for_sensor('outlet1Status'), serial, 'outlet1Status', name=name)) sensors.append(self._make_result_dict( outlet_oid, self._get_oid_for_sensor('outlet2Status'), serial, 'outlet2Status', name=name)) return sensors
def _get_outlet_params(self, outlets): sensors = [] for outlet in itervalues(outlets): outlet_avail = outlet.get('outletAvail', None) if outlet_avail: outlet_oid = outlet.get(0, None) serial = outlet.get('outletSerial', None) name = outlet.get('outletName', None) sensors.append(self._make_result_dict( outlet_oid, self._get_oid_for_sensor('outlet1Status'), serial, 'outlet1Status', name=name)) sensors.append(self._make_result_dict( outlet_oid, self._get_oid_for_sensor('outlet2Status'), serial, 'outlet2Status', name=name)) return sensors
def _get_ctrl_grp_amps_params(self, ctrl_grp_amps): sensors = [] for ctrl_grp_amp in itervalues(ctrl_grp_amps): ctrl_grp_amp_avail = ctrl_grp_amp.get('ctrlGrpAmpsAvail', None) if ctrl_grp_amp_avail: ctrl_grp_amp_oid = ctrl_grp_amp.get(0, None) serial = ctrl_grp_amp.get('ctrlGrpAmpsSerial', None) name = ctrl_grp_amp_avail.get('ctrlGrpAmpsName', None) postfixes = [chr(i) for i in range(ord('A'), ord('I'))] for pfix in postfixes: sensors.append(self._make_result_dict( ctrl_grp_amp_oid, self._get_oid_for_sensor('ctrlGrpAmps' + pfix), serial, 'ctrlGrpAmps' + pfix, u_o_m=Sensor.UNIT_AMPERES, name=name)) sensors.append(self._make_result_dict( ctrl_grp_amp_oid, self._get_oid_for_sensor('ctrlGrpAmps' + pfix + 'Volts'), serial, 'ctrlGrpAmps' + pfix + 'AVolts', name=name)) return sensors
def _get_climate_relays_params(self, climate_relays): sensors = [] for climate_relay in itervalues(climate_relays): climate_relay_avail = climate_relay.get('climateRelayAvail', None) if climate_relay_avail: climate_relay_oid = climate_relay.get(0, None) serial = climate_relay.get('climateRelaySerial', None) name = climate_relay.get('climateRelayName', None) sensors.append(self._make_result_dict( climate_relay_oid, self._get_oid_for_sensor('climateRelayTempC'), serial, 'climateRelayTempC', u_o_m=Sensor.UNIT_CELSIUS, name=name)) for i in range(1, 7): sensors.append(self._make_result_dict( climate_relay_oid, self._get_oid_for_sensor('climateRelayIO' + str(i)), serial, 'climateRelayIO' + str(i), name=name)) return sensors
def _get_ctrl_relays_params(self, ctrl_relays): sensors = [] for ctrl_relay in itervalues(ctrl_relays): ctrl_relay_oid = ctrl_relay.get(0, None) serial = ctrl_relay.get('ctrlRelayIndex', None) name = ctrl_relay.get('ctrlRelayName', None) sensors.append(self._make_result_dict( ctrl_relay_oid, self._get_oid_for_sensor('ctrlRelayState'), serial, 'ctrlRelayState', name=name)) sensors.append(self._make_result_dict( ctrl_relay_oid, self._get_oid_for_sensor('ctrlRelayLatchingMode'), serial, 'ctrlRelayLatchingMode', name=name)) sensors.append(self._make_result_dict( ctrl_relay_oid, self._get_oid_for_sensor('ctrlRelayOverride'), serial, 'ctrlRelayOverride', name=name)) sensors.append(self._make_result_dict( ctrl_relay_oid, self._get_oid_for_sensor('ctrlRelayAcknowledge'), serial, 'ctrlRelayAcknowledge', name=name)) return sensors
def _get_power_dms_params(self, power_dms): sensors = [] for power_dm in itervalues(power_dms): power_dm_avail = power_dm.get('powerDMAvail', None) if power_dm_avail: power_dm_oid = power_dm.get(0, None) serial = power_dm.get('powerDMSerial', None) name = power_dm.get('powerDMName', None) aux_count = power_dm.get('powerDMUnitInfoAuxCount', None) for i in range(1, (aux_count + 1)): aux_numb = str(i) aux_name = (name + ' ' + power_dm.get('powerDMChannelGroup' + aux_numb, None)) aux_name += ': ' + power_dm_oid.get('powerDMChannelName' + aux_numb, None) aux_name += (' - ' + power_dm_oid.get('powerDMChannelFriendly' + aux_numb, None)) sensors.append(self._make_result_dict( power_dm_oid, self._get_oid_for_sensor('powerDMDeciAmps' + aux_numb), serial, 'powerDMDeciAmps' + aux_numb, u_o_m=Sensor.UNIT_AMPERES, name=aux_name)) return sensors
def allow_lazy(func, *resultclasses): """ A decorator that allows a function to be called with one or more lazy arguments. If none of the args are lazy, the function is evaluated immediately, otherwise a __proxy__ is returned that will evaluate the function when needed. """ lazy_func = lazy(func, *resultclasses) @wraps(func) def wrapper(*args, **kwargs): for arg in list(args) + list(six.itervalues(kwargs)): if isinstance(arg, Promise): break else: return func(*args, **kwargs) return lazy_func(*args, **kwargs) return wrapper
def check_generic_foreign_keys(**kwargs): from .fields import GenericForeignKey errors = [] fields = (obj for cls in apps.get_models() for obj in six.itervalues(vars(cls)) if isinstance(obj, GenericForeignKey)) for field in fields: errors.extend(field.check()) return errors
def list(self, ignore_patterns): """ List all files in all app storages. """ for storage in six.itervalues(self.storages): if storage.exists(''): # check if storage location exists for path in utils.get_files(storage, ignore_patterns): yield path, storage
def get_action_choices(self, request, default_choices=BLANK_CHOICE_DASH): """ Return a list of choices for use in a form object. Each choice is a tuple (name, description). """ choices = [] + default_choices for func, name, description in six.itervalues(self.get_actions(request)): choice = (name, description % model_format_dict(self.opts)) choices.append(choice) return choices
def _prepare(self, model): if self.order_with_respect_to: # The app registry will not be ready at this point, so we cannot # use get_field(). query = self.order_with_respect_to try: self.order_with_respect_to = next( f for f in self._get_fields(reverse=False) if f.name == query or f.attname == query ) except StopIteration: raise FieldDoesNotExist('%s has no field named %r' % (self.object_name, query)) self.ordering = ('_order',) if not any(isinstance(field, OrderWrt) for field in model._meta.local_fields): model.add_to_class('_order', OrderWrt()) else: self.order_with_respect_to = None if self.pk is None: if self.parents: # Promote the first parent link in lieu of adding yet another # field. field = next(six.itervalues(self.parents)) # Look for a local field with the same name as the # first parent link. If a local field has already been # created, use it instead of promoting the parent already_created = [fld for fld in self.local_fields if fld.name == field.name] if already_created: field = already_created[0] field.primary_key = True self.setup_pk(field) else: auto = AutoField(verbose_name='ID', primary_key=True, auto_created=True) model.add_to_class('id', auto)
def check_generic_foreign_keys(app_configs=None, **kwargs): from .fields import GenericForeignKey if app_configs is None: models = apps.get_models() else: models = chain.from_iterable(app_config.get_models() for app_config in app_configs) errors = [] fields = ( obj for model in models for obj in six.itervalues(vars(model)) if isinstance(obj, GenericForeignKey) ) for field in fields: errors.extend(field.check()) return errors
def test_itervalues(self): self.session['x'] = 1 self.session.modified = False self.session.accessed = False i = six.itervalues(self.session) self.assertTrue(hasattr(i, '__iter__')) self.assertTrue(self.session.accessed) self.assertFalse(self.session.modified) self.assertEqual(list(i), [1])
def reset(self): # Store unused temporary file names in order to delete them # at the end of the response cycle through a callback attached in # `update_response`. wizard_files = self.data[self.step_files_key] for step_files in six.itervalues(wizard_files): for step_file in six.itervalues(step_files): self._tmp_files.append(step_file['tmp_name']) self.init_data()
def sitemap(request, sitemaps, section=None, template_name='sitemap.xml', content_type='application/xml'): req_protocol = request.scheme req_site = get_current_site(request) if section is not None: if section not in sitemaps: raise Http404("No sitemap available for section: %r" % section) maps = [sitemaps[section]] else: maps = list(six.itervalues(sitemaps)) page = request.GET.get("p", 1) urls = [] for site in maps: try: if callable(site): site = site() urls.extend(site.get_urls(page=page, site=req_site, protocol=req_protocol)) except EmptyPage: raise Http404("Page %s empty" % page) except PageNotAnInteger: raise Http404("No page '%s'" % page) response = TemplateResponse(request, template_name, {'urlset': urls}, content_type=content_type) if hasattr(site, 'latest_lastmod'): # if latest_lastmod is defined for site, set header so as # ConditionalGetMiddleware is able to send 304 NOT MODIFIED lastmod = site.latest_lastmod response['Last-Modified'] = http_date( timegm( lastmod.utctimetuple() if isinstance(lastmod, datetime.datetime) else lastmod.timetuple() ) ) return response
def __init__(self, form, inline_formsets): super(AdminErrorList, self).__init__() if form.is_bound: self.extend(list(six.itervalues(form.errors))) for inline_formset in inline_formsets: self.extend(inline_formset.non_form_errors()) for errors_in_inline_form in inline_formset.errors: self.extend(list(six.itervalues(errors_in_inline_form)))