我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用types.DictType()。
def find_modules(self, type_c, container): container = container or self nodes = {} containers = {} mod_type = type(self) if mod_type == type_c: nodes[len(nodes)+1] = self containers[len(containers)] = container # Recurse on nodes with 'modules' if self.modules is not None: if type(self.modules) is DictType: for i in xrange(len(self.modules)): child = self.modules[i] cur_nodes, cur_containers = child.find_modules( type_c, self) # This shouldn't happen if not len(cur_nodes) == len(cur_containers): raise Exception('Internal error: incorrect return length') # add the list items from our child to our list (ie return a # flattened table of the return nodes). for j in xrange(len(cur_nodes)): nodes[len(cur_nodes)+1] = cur_nodes[j] containers[len(containers)+1] = cur_containers[j] return nodes, containers
def append(self, argument, typehint = None): """Appends data to the bundle, creating an OSCMessage to encapsulate the provided argument unless this is already an OSCMessage. Any newly created OSCMessage inherits the OSCBundle's address at the time of creation. If 'argument' is an iterable, its elements will be encapsuated by a single OSCMessage. Finally, 'argument' can be (or contain) a dict, which will be 'converted' to an OSCMessage; - if 'addr' appears in the dict, its value overrides the OSCBundle's address - if 'args' appears in the dict, its value(s) become the OSCMessage's arguments """ if isinstance(argument, OSCMessage): binary = OSCBlob(argument.getBinary()) else: msg = OSCMessage(self.address) if type(argument) == types.DictType: if 'addr' in argument: msg.setAddress(argument['addr']) if 'args' in argument: msg.append(argument['args'], typehint) else: msg.append(argument, typehint) binary = OSCBlob(msg.getBinary()) self.message += binary self.typetags += 'b'
def _setTarget(self, address, prefix=None, filters=None): """Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget. - address ((host, port) tuple): IP-address & UDP-port - prefix (string): The OSC-address prefix prepended to the address of each OSCMessage sent to this OSCTarget (optional) """ if address not in self.targets.keys(): self.targets[address] = ["",{}] if prefix != None: if len(prefix): # make sure prefix starts with ONE '/', and does not end with '/' prefix = '/' + prefix.strip('/') self.targets[address][0] = prefix if filters != None: if type(filters) in types.StringTypes: (_, filters) = parseFilterStr(filters) elif type(filters) != types.DictType: raise TypeError("'filters' argument must be a dict with {addr:bool} entries") self._updateFilters(self.targets[address][1], filters)
def parse(self, dict, header=TRUE): """parse(dict) -> string This method parses the open file object passed, replacing any keys found using the replacement dictionary passed.""" if type(dict) != types.DictType: raise TypeError, "Second argument must be a dictionary" if not self.template: raise OpagMissingPrecondition, "template path is not set" # Open the file if its not already open. If it is, seek to the # beginning of the file. if not self.template_file: self.template_file = open(self.template, "r") else: self.template_file.seek(0) # Instantiate a new bound method to do the replacement. replacer = Replacer(dict).replace # Read in the entire template into memory. I guess we'd better keep # the templates a reasonable size if we're going to keep doing this. buffer = self.template_file.read() replaced = "" if header: replaced = "Content-Type: text/html\n\n" replaced = replaced + re.sub("%%(\w+)%%", replacer, buffer) return replaced
def _get_table_type(self, data): # ?????????table?? if self._table_type is not None: return self._table_type # ??? if data is None or len(data) == 0: return ListTable # ?????????? row = data[0] if isinstance(row, SequenceCollectionType): return ListTable elif isinstance(row, types.DictType): return DictTable else: return ObjectTable
def get_event(self, block=1): """Return an Event instance. Returns None if |block| is false and there is no event pending, otherwise waits for the completion of an event.""" while 1: if self.event_queue: return self.event_queue.pop(0) elif block: pyg_event = pygame.event.wait() else: pyg_event = pygame.event.poll() if pyg_event.type == NOEVENT: return if pyg_event.key in modcolors: continue k, c = self.tr_event(pyg_event) self.cmd_buf += c.encode('ascii', 'replace') self.k = k if not isinstance(k, types.DictType): e = Event(k, self.cmd_buf, []) self.k = self.keymap self.cmd_buf = '' return e
def unwrap(value): t = type(value) if t is types.InstanceType and isinstance(value, DynamicProxy): if pyro_daemon: try: return pyro_daemon.getLocalObject(value.objectID) except KeyError: pass return value elif t is types.ListType: for i in range(len(value)): value[i] = unwrap(value[i]) elif t is types.TupleType: value = list(value) for i in range(len(value)): value[i] = unwrap(value[i]) return tuple(value) elif t is types.DictType: for k, v in value.items(): value[k] = unwrap(v) return value
def __init__(self, initfile): self.__initfile = initfile self.__colordb = None self.__optiondb = {} self.__views = [] self.__red = 0 self.__green = 0 self.__blue = 0 self.__canceled = 0 # read the initialization file fp = None if initfile: try: try: fp = open(initfile) self.__optiondb = marshal.load(fp) if not isinstance(self.__optiondb, DictType): print >> sys.stderr, \ 'Problem reading options from file:', initfile self.__optiondb = {} except (IOError, EOFError, ValueError): pass finally: if fp: fp.close()
def unparse(self,dn,record): """ dn string-representation of distinguished name record Either a dictionary holding the LDAP entry {attrtype:record} or a list with a modify list like for LDAPObject.modify(). """ if not record: # Simply ignore empty records return # Start with line containing the distinguished name self._unparseAttrTypeandValue('dn',dn) # Dispatch to record type specific writers if isinstance(record,types.DictType): self._unparseEntryRecord(record) elif isinstance(record,types.ListType): self._unparseChangeRecord(record) else: raise ValueError, "Argument record must be dictionary or list" # Write empty line separating the records self._output_file.write(self._line_sep) # Count records written self.records_written = self.records_written+1 return # unparse()
def createSortDicDump(dic): """ Create a sorted ASCII representation of a dictionary. The order is sorted according to the dictionary keys. dic: Source dictionary (dictionary). Returns: Sorted dictionary ASCII representation (string). """ if (type(dic) != types.DictType): raise Exception, "Object given is not a dictionary" keys = dic.keys() keys.sort() asciiDic = "" for key in keys: asciiDic += ", '%s': '%s'" % (key, dic[key]) asciiDic = "{" + asciiDic[2:] + "}" return asciiDic
def validate_request(request): if not isinstance(request, dict): fault = Fault( -32600, 'Request must be {}, not %s.' % type(request) ) return fault rpcid = request.get('id', None) version = get_version(request) if not version: fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) return fault request.setdefault('params', []) method = request.get('method', None) params = request.get('params') param_types = (types.ListType, types.DictType, types.TupleType) if not method or type(method) not in types.StringTypes or \ type(params) not in param_types: fault = Fault( -32600, 'Invalid request parameters or method.', rpcid=rpcid ) return fault return True
def QueryToGAP(req): try: qu = req.query except: qu = {} if not types.DictType == type(qu): qu = {} res = cStringIO.StringIO() res.write('GAPQUERY:=rec();;') for k in qu.keys(): res.write('GAPQUERY.("') if len(k) > 1000: res.write(StringToGAP(k[:1000])) else: res.write(StringToGAP(k)) res.write('"):="') res.write(StringToGAP(str(qu[k][0]))) res.write('";;') res.write('\n') s = res.getvalue() res.close() return s
def _escape_json(json): """Escapes all string fields of JSON data. Operates recursively.""" t = type(json) if t == types.StringType or t == types.UnicodeType: return cgi.escape(json) elif t == types.IntType: return json elif t == types.FloatType: return json elif t == types.DictType: result = {} for f in json.keys(): result[f] = _escape_json(json[f]) return result elif t == types.ListType: result = [] for f in json: result.append(_escape_json(f)) return result else: raise RuntimeError, "Unsupported type: %s" % str(t)
def getrefs(i, depth): """Get the i'th object in memory, return objects that reference it""" import sys, gc, types o = sys.getobjects(i)[-1] for d in range(depth): for ref in gc.get_referrers(o): if type(ref) in (types.ListType, types.DictType, types.InstanceType): if type(ref) is types.DictType and ref.has_key('copyright'): continue o = ref break else: print "Max depth ", d return o return o
def main(ipa_bases, ipa_all, dia_defs, sort_order): segments = read_ipa_bases(ipa_bases) assert isinstance(segments, ListType) diacritics, combinations = parse_dia_defs(dia_defs) assert isinstance(diacritics, DictType) assert isinstance(combinations, ListType) all_segments = set(segments) for diacritic in diacritics.values(): assert isinstance(diacritic, Diacritic) for segment in segments: new_seg = diacritic.apply(segment) if new_seg is not None: all_segments.add(new_seg) for combination in combinations: assert isinstance(combination, Combination) for segment in segments: new_seg = combination.apply(segment) if new_seg is not None: all_segments.add(new_seg) write_ipa_all(ipa_bases, ipa_all, all_segments, sort_order)
def _checkNpcNames(self, name): def match(npcName, name = name): name = TextEncoder().encodeWtext(name) name = string.strip(name) return TextEncoder.upper(npcName) == TextEncoder.upper(name) for npcId in NPCList.NPC_LIST.keys(): data = NPCList.NPC_LIST[npcId] if type(data) is types.DictType and HumanDNA.HumanDNA.setName in data: npcName = data[HumanDNA.HumanDNA.setName] if (self.independent or not (self.main.isNPCEditor)) and match(npcName): self.notify.info('name matches NPC name "%s"' % npcName) return OL.NCGeneric match(npcName)
def copy(self): """Return a copy of the inctance.""" if self.is_struct: obj = Structure(set_all_auto=False) elif self.is_traj: obj = Trajectory(set_all_auto=False) # Copy attrs over for name in self.attr_lst: val = getattr(self, name) if val is None: setattr(obj, name, None) # dict.copy() is shallow, use deepcopy instead elif hasattr(val, 'copy') and not isinstance(val, types.DictType): setattr(obj, name, val.copy()) else: setattr(obj, name, copy.deepcopy(val)) return obj
def __init__(self, deviceID, command, routine=False, force=False, source=None, send_event=True, notify=False, speech=False): from core import sendCommand self._deviceID = deviceID self._command = command self._routine = routine self._force = force self._notify = notify self._source = source self._speech = speech self._send_event = send_event self._simple = not isinstance( command, types.DictType) if command is not None else False if self._simple: self._command = {command: ''} if self._command is not None: if self._command.get('isSpeech'): self._speech = True self._result = sendCommand(self)
def remove_optional_keys(self): """ Removes values for keys that are not required from object's data. :raises: **InvalidRequest** - in case object's data is not a dictionary. """ if self.OPTIONAL_KEYS_ALLOWED: return if type(self.data) != types.DictType: raise InvalidRequest('Data object is not a dictionary: %s.' % str(self.data)) removed_keys = [] for (key,value) in self.data.items(): if not self.get_required_data_defaults().has_key(key): if key not in self.USER_PROVIDED_KEYS and not key.startswith('#'): removed_keys.append(key) for key in removed_keys: del self.data[key]
def py_to_uge(self, key, value): for (uge_value, py_value) in self.UGE_PYTHON_OBJECT_MAP.items(): if value == py_value and type(value) == type(py_value): if self.UGE_CASE_SENSITIVE_KEYS.has_key(key): return self.UGE_CASE_SENSITIVE_KEYS[key](uge_value) return uge_value if type(value) == types.ListType: delimiter = self.LIST_KEY_MAP.get(key, self.DEFAULT_LIST_DELIMITER) return delimiter.join(value) elif type(value) == types.DictType: delimiter = self.DICT_KEY_MAP.get(key, self.DEFAULT_DICT_DELIMITER) dict_tokens = [] for (item_key,item_value) in value.items(): dict_tokens.append('%s%s%s' % (item_key, self.DICT_VALUE_DELIMITER, item_value)) return delimiter.join(dict_tokens) return value
def test_generate_queue_from_json(): queue = API.get_queue(QUEUE_NAME) json = queue.to_json() queue2 = API.generate_object(json) assert(queue2.__class__.__name__ == queue.__class__.__name__) for key in queue.data.keys(): v = queue.data[key] v2 = queue2.data[key] if type(v) == types.ListType: assert(len(v) == len(v2)) for s in v: assert(v2.count(s) == 1) elif type(v) == types.DictType: for key in v.keys(): assert(str(v[key]) == str(v2[key])) else: assert(str(v) == str(v2))
def test_generate_sconf_from_json(): sconf = API.get_sconf() json = sconf.to_json() sconf2 = API.generate_object(json) assert(sconf2.__class__.__name__ == sconf.__class__.__name__) for key in sconf.data.keys(): v = sconf.data[key] v2 = sconf2.data[key] if type(v) == types.ListType: assert(len(v) == len(v2)) for s in v: assert(v2.count(s) == 1) elif type(v) == types.DictType: for key in v.keys(): assert(str(v[key]) == str(v2[key])) else: assert(str(v) == str(v2))
def test_generate_stree_from_json(): stree = API.get_stree() json = stree.to_json() stree2 = API.generate_object(json) assert(stree2.__class__.__name__ == stree.__class__.__name__) assert(len(stree.data) == len(stree2.data)) for i in range(0,len(stree.data)): v = stree.data[i] v2 = stree2.data[i] assert(type(v) == types.DictType) assert(type(v2) == types.DictType) for key in v.keys(): x = v[key] x2 = v2[key] if type(x) == types.ListType: assert(len(x) == len(x2)) for y in x: assert(x2.count(y) == 1) else: assert(str(x) == str(x2))
def test_generate_user_from_json(): user = API.get_user(USER_NAME) json = user.to_json() user2 = API.generate_object(json) assert(user2.__class__.__name__ == user.__class__.__name__) for key in user.data.keys(): v = user.data[key] v2 = user2.data[key] if type(v) == types.ListType: assert(len(v) == len(v2)) for s in v: assert(v2.count(s) == 1) elif type(v) == types.DictType: for key in v.keys(): assert(str(v[key]) == str(v2[key])) else: assert(str(v) == str(v2))
def test_generate_prj_from_json(): prj = API.get_prj(PROJECT_NAME) json = prj.to_json() prj2 = API.generate_object(json) assert(prj2.__class__.__name__ == prj.__class__.__name__) for key in prj.data.keys(): v = prj.data[key] v2 = prj2.data[key] if type(v) == types.ListType: assert(len(v) == len(v2)) for s in v: assert(v2.count(s) == 1) elif type(v) == types.DictType: for key in v.keys(): assert(str(v[key]) == str(v2[key])) else: assert(str(v) == str(v2))
def test_generate_jc_from_json(): jc = API.get_jc(JC_NAME) json = jc.to_json() jc2 = API.generate_object(json) assert(jc2.__class__.__name__ == jc.__class__.__name__) for key in jc.data.keys(): v = jc.data[key] v2 = jc2.data[key] if type(v) == types.ListType: assert(len(v) == len(v2)) for s in v: assert(v2.count(s) == 1) elif type(v) == types.DictType: for key in v.keys(): assert(str(v[key]) == str(v2[key])) else: assert(str(v) == str(v2))
def test_generate_rqs_from_json(): rqs = API.get_rqs(RQS_NAME) json = rqs.to_json() rqs2 = API.generate_object(json) assert(rqs2.__class__.__name__ == rqs.__class__.__name__) for key in rqs.data.keys(): v = rqs.data[key] v2 = rqs2.data[key] if type(v) == types.ListType: assert(len(v) == len(v2)) for s in v: assert(v2.count(s) == 1) elif type(v) == types.DictType: for key in v.keys(): assert(str(v[key]) == str(v2[key])) else: assert(str(v) == str(v2))
def test_generate_conf_from_json(): conf = API.get_conf(name='global') json = conf.to_json() conf2 = API.generate_object(json) assert(conf2.__class__.__name__ == conf.__class__.__name__) for key in conf.data.keys(): v = conf.data[key] v2 = conf2.data[key] if type(v) == types.ListType: assert(len(v) == len(v2)) for s in v: assert(v2.count(s) == 1) elif type(v) == types.DictType: for key in v.keys(): assert(str(v[key]) == str(v2[key])) else: assert(str(v) == str(v2))
def append(self, argument, typehint=None): """Appends data to the message, updating the typetags based on the argument's type. If the argument is a blob (counted string) pass in 'b' as typehint. 'argument' may also be a list or tuple, in which case its elements will get appended one-by-one, all using the provided typehint """ if type(argument) == types.DictType: argument = argument.items() elif isinstance(argument, OSCMessage): raise TypeError("Can only append 'OSCMessage' to 'OSCBundle'") if hasattr(argument, '__iter__'): for arg in argument: self.append(arg, typehint) return if typehint == 'b': binary = OSCBlob(argument) tag = 'b' elif typehint == 't': binary = OSCTimeTag(argument) tag = 't' else: tag, binary = OSCArgument(argument, typehint) self.typetags += tag self.message += binary
def _check_callback(self): if self.action == "callback": if not hasattr(self.callback, '__call__'): raise OptionError( "callback not callable: %r" % self.callback, self) if (self.callback_args is not None and type(self.callback_args) is not types.TupleType): raise OptionError( "callback_args, if supplied, must be a tuple: not %r" % self.callback_args, self) if (self.callback_kwargs is not None and type(self.callback_kwargs) is not types.DictType): raise OptionError( "callback_kwargs, if supplied, must be a dict: not %r" % self.callback_kwargs, self) else: if self.callback is not None: raise OptionError( "callback supplied (%r) for non-callback option" % self.callback, self) if self.callback_args is not None: raise OptionError( "callback_args supplied for non-callback option", self) if self.callback_kwargs is not None: raise OptionError( "callback_kwargs supplied for non-callback option", self)
def __cmp__(self, other): if isinstance(other, Values): return cmp(self.__dict__, other.__dict__) elif isinstance(other, types.DictType): return cmp(self.__dict__, other) else: return -1
def schedule(scheduler, runbook, target, config, dbc, logger): ''' Setup schedule for new runbooks and targets ''' # Default schedule (every minute) task_schedule = { 'second' : 0, 'minute' : '*', 'hour' : '*', 'day' : '*', 'month' : '*', 'day_of_week' : '*' } # If schedule is present override default if 'schedule' in target['runbooks'][runbook].keys(): if type(target['runbooks'][runbook]['schedule']) == types.DictType: for key in target['runbooks'][runbook]['schedule'].keys(): task_schedule[key] = target['runbooks'][runbook]['schedule'][key] elif type(target['runbooks'][runbook]['schedule']) == types.StringType: breakdown = target['runbooks'][runbook]['schedule'].split(" ") task_schedule = { 'second' : 0, 'minute' : breakdown[0], 'hour' : breakdown[1], 'day' : breakdown[2], 'month' : breakdown[3], 'day_of_week' : breakdown[4] } cron = CronTrigger( second=task_schedule['second'], minute=task_schedule['minute'], hour=task_schedule['hour'], day=task_schedule['day'], month=task_schedule['month'], day_of_week=task_schedule['day_of_week'], ) return scheduler.add_job( monitor, trigger=cron, args=[runbook, target, config, dbc, logger] )
def validate_request(request): if not isinstance(request, types.DictType): return Fault(-32600, 'Request must be {}, not %s.' % type(request)) rpcid = request.get('id', None) version = get_version(request) if not version: return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) request.setdefault('params', []) method = request.get('method', None) params = request.get('params') param_types = (types.ListType, types.DictType, types.TupleType) if not method or type(method) not in types.StringTypes or type(params) not in param_types: return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid) return True
def _get_key_and_headers(keys, rows): if keys is None: if len(rows) == 0: keys = [] else: r0 = rows[0] if type(r0) == types.DictType: keys = r0.keys() keys.sort() elif type(r0) in listtype: keys = [i for i in range(len(r0))] else: keys = [''] _keys = [] column_headers = [] for k in keys: if type(k) not in listtype: k = [k, k] _keys.append(k[0]) column_headers.append(str(k[1])) return _keys, column_headers
def __call__(self, query_result): if not query_result: return {} row_0 = query_result[0] if isinstance(row_0, (types.ListType, types.TupleType, types.DictType)): return {row[self._key_index]: row for row in query_result} return {getattr(row, self._key_index): row for row in query_result}
def __init__(self, msg): """ :param msg ???????????????key?locale??? """ if isinstance(msg, types.DictType): country_code, _ = locale.getlocale(locale.LC_ALL) msg = msg[country_code] if isinstance(msg, unicode): super(GirlFriendException, self).__init__(msg.encode("utf-8")) self.msg = msg elif isinstance(msg, str): super(GirlFriendException, self).__init__(msg) self.msg = msg.decode("utf-8") else: raise TypeError
def append(self, row): if not isinstance(row, types.DictType): raise InvalidTypeException(u"DictTable????????????") if len(row) != self.column_num: raise InvalidSizeException( u"??????{0}???????{1}??????".format(len(row), self.column_num) ) self._data.append(row)
def _execute(self, context, template_args): args = self._get_runtime_args(context, template_args) # ??????? executable = self._get_executable(context) result = None if args is None: result = executable(context) elif isinstance(args, SequenceCollectionType): result = executable(context, *args) elif isinstance(args, types.DictType): result = executable(context, **args) return result
def _execute(self, context, sub_args): executable = self._get_executable(context) results = [] for args in sub_args: if self._error_break: # ?????????? return try: result = None if args is None: result = executable(context) elif isinstance(args, SequenceCollectionType): result = executable(context, *args) elif isinstance(args, types.DictType): result = executable(context, **args) except Exception as e: context.logger.exception( u"????'{}'???????????????'{}'".format( self.name, self._error_action)) if self._error_action == "stop": # rethrow???????? self._error_break = True # ??????? raise e elif self._error_action == "continue": # ???????error_handler if self._error_handler is not None: exc_type, exc_value, tb = sys.exc_info() self._error_handler(context, exc_type, exc_value, tb) # ??????????None??????? results.append(self._error_default_value) else: results.append(result) if self._sub_join is not None: return self._sub_join(context, results) return results
def getSource(self): #XXX make state be foo=bar instead of a dict. if self.stateIsDict: stateDict = self.state elif isinstance(self.state, Ref) and isinstance(self.state.obj, types.DictType): stateDict = self.state.obj else: stateDict = None if stateDict is not None: try: return "Instance(%r, %s)" % (self.klass, dictToKW(stateDict)) except NonFormattableDict: return "Instance(%r, %s)" % (self.klass, prettify(stateDict)) return "Instance(%r, %s)" % (self.klass, prettify(self.state))
def prettify(obj): if hasattr(obj, 'getSource'): return obj.getSource() else: #basic type t = type(obj) if t in _SIMPLE_BUILTINS: return repr(obj) elif t is types.DictType: out = ['{'] for k,v in obj.items(): out.append('\n\0%s: %s,' % (prettify(k), prettify(v))) out.append(len(obj) and '\n\0}' or '}') return string.join(out, '') elif t is types.ListType: out = ["["] for x in obj: out.append('\n\0%s,' % prettify(x)) out.append(len(obj) and '\n\0]' or ']') return string.join(out, '') elif t is types.TupleType: out = ["("] for x in obj: out.append('\n\0%s,' % prettify(x)) out.append(len(obj) and '\n\0)' or ')') return string.join(out, '') else: raise TypeError("Unsupported type %s when trying to prettify %s." % (t, obj))
def list_modules(self): def tinsert(to, _from): if type(_from) == DictType: for i in xrange(len(_from)): tinsert(to, _from[i]) else: to.update(_from) modules = self if self.modules: for i in xrange(len(self.modules)): modulas = self.modules[i].list_modules() if modulas: tinsert(modules, modulas) return modules
def merge(x,y): # store a copy of x, but overwrite with y's values where applicable merged = dict(x,**y) xkeys = x.keys() # if the value of merged[key] was overwritten with y[key]'s value # then we need to put back any missing x[key] values for key in xkeys: # if this key is a dictionary, recurse if type(x[key]) is types.DictType and y.has_key(key): merged[key] = merge(x[key],y[key]) return merged