我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用types.TypeType()。
def _start(self, spin):
    """Create the pending subscribers and run ``self.cl`` on a daemon thread.

    Every ``(args, kwargs)`` pair in ``self.subscribers`` is turned into a
    ``rospy.Subscriber`` and appended to ``self.subscribers_init``.  The
    thread target is then picked from the kind of object stored in
    ``self.cl``: a class is handed to ``self.__start_class`` and a plain
    function to ``self.__start_func``.  In both cases the target is called
    with ``self.cl`` followed by ``self.cl_args`` and with ``self.cl_kwargs``
    as keyword arguments.  The thread is stored in ``self.thread``.

    :param spin: when true, call ``rospy.spin()`` after the thread started.
    :returns: ``self``.
    :raises TypeError: if ``self.cl`` is neither a class nor a function.
    """
    for args, kwargs in self.subscribers:
        self.subscribers_init.append(rospy.Subscriber(*args, **kwargs))

    # The builtin ``type`` is the very object Python 2 exposes as
    # ``types.TypeType``, so this is the same check spelled portably.
    if isinstance(self.cl, type):
        targ = self.__start_class
    elif isinstance(self.cl, types.FunctionType):
        targ = self.__start_func
    else:
        # Without this branch ``targ`` stayed unbound and the Thread() call
        # below failed with a confusing UnboundLocalError.
        raise TypeError(
            "expected a class or a function, got %r" % (type(self.cl),))

    self.thread = threading.Thread(target=targ,
                                   args=(self.cl,) + self.cl_args,
                                   kwargs=self.cl_kwargs)
    self.thread.daemon = True
    self.thread.start()
    if spin:
        rospy.spin()
    return self
def addDataSource(self, source=SampleDataSource, name='default',
                  sourceModule=None, sourceArgs=[], sourceKwargs={}):
    """Register a data source under ``name`` and attach waiting curve data.

    ``source`` may be:

    * a string - it is looked up in ``sourceModule`` (itself either a module
      object or a module name passed to ``__import__``);
    * a class - it is instantiated with ``sourceArgs`` / ``sourceKwargs``;
    * anything else - it is used as the data source object as-is.

    The source is stored in ``self._sources[name]`` together with a fresh
    ``weakref.WeakSet``, and ``source.initialSamples()`` is stored in
    ``self._lastSamples[name]``.  Afterwards every entry of
    ``self._lostCurveDatas`` is offered to ``self._tryAddToDataSource``;
    entries that were accepted are removed from ``self._lostCurveDatas``.

    :raises Exception: if a source called ``name`` is already registered.
    """
    # The list/dict defaults are only unpacked below, never mutated, so
    # sharing them between calls is harmless.
    if name in self._sources:
        raise Exception('Data source "%s" already exists!' % name)

    if utils.isstr(source):
        if utils.isstr(sourceModule):
            sourceModule = __import__(sourceModule)
        if sourceModule is not None:
            source = sourceModule.__dict__[source]
        # NOTE(review): an object resolved by name is used as-is and is not
        # instantiated - confirm this is intended for class names.
    elif type(source) in [types.ClassType, types.TypeType]:
        source = source(*sourceArgs, **sourceKwargs)

    cds = weakref.WeakSet()
    self._sources[name] = (source, cds)
    self._lastSamples[name] = source.initialSamples()

    # Iterate over a snapshot because accepted entries are removed.
    for cd in list(self._lostCurveDatas):
        if self._tryAddToDataSource(cd, name):
            # Was ``.remote(cds)``: no such method exists, and ``cds`` is
            # the new source's set rather than the adopted entry.
            self._lostCurveDatas.remove(cd)
def publisher(self, *upper_args, **kwargs):
    """Build a publishing decorator, or delegate to the multi publisher.

    ``queue_size`` defaults to 1 when the caller does not pass it.

    * Called as ``publisher(topic_name, msg_type, **kwargs)`` (first
      positional argument is a ``str``) it returns a decorator.  The
      decorator creates a ``rospy.Publisher`` and wraps the function so that
      every return value is published and then returned to the caller.
    * Called with a type as first positional argument it returns
      ``self.__multi_publisher(that_type, **kwargs)``.
    * For any other first argument it returns ``None``.
    """
    kwargs.setdefault("queue_size", 1)
    first = upper_args[0]

    if isinstance(first, str):
        topic_name, msg_type = upper_args

        def __decorator(func):
            pub = rospy.Publisher(topic_name, msg_type, **kwargs)

            def __inner(*call_args, **call_kwargs):
                msg = func(*call_args, **call_kwargs)
                pub.publish(msg)
                return msg

            return __inner

        return __decorator

    if isinstance(first, types.TypeType):
        return self.__multi_publisher(first, **kwargs)

    return None
def inline_args(f):
    """Wrap ``f`` so it takes ``(self, args)`` and unpacks ``args`` itself.

    The returned wrapper always has the signature ``wrapper(self, args)``;
    how ``f`` is invoked depends on what ``f`` is:

    * plain function: ``f(self, *args)``
    * class or builtin function: ``f(*args)`` (``self`` is ignored)
    * bound/unbound method: ``f.__func__(self, *args)``
    * any other callable object: ``f.__call__.__func__(self, *args)``

    The wrapper carries the metadata of the underlying callable via
    ``functools.wraps``.
    """
    if isinstance(f, types.FunctionType):
        def call_function(self, args):
            return f(self, *args)
        return functools.wraps(f)(call_function)

    if isinstance(f, (type, types.BuiltinFunctionType)):
        def call_builtin(_self, args):
            return f(*args)
        return functools.wraps(f)(call_builtin)

    if isinstance(f, types.MethodType):
        def call_method(self, args):
            return f.__func__(self, *args)
        return functools.wraps(f.__func__)(call_method)

    def call_object(self, args):
        return f.__call__.__func__(self, *args)
    return functools.wraps(f.__call__.__func__)(call_object)
def gen_tag(cls, tag, spec):
    """Generate the readers and writers for ``tag``.

    The generator is chosen from ``spec[0]`` (the tag type):

    * a type registered in ``autoxml.basic_cons_map`` -> ``gen_anon_basic``
      with text reader/writer helpers;
    * a list -> ``gen_list_tag``;
    * ``LocalText`` -> ``gen_insetclass_tag``;
    * an ``autoxml`` class or any other type -> ``gen_class_tag``.

    Raises ``Error`` for any other tag type.
    """
    tag_type = spec[0]
    kind = type(tag_type)

    if kind is types.TypeType and tag_type in autoxml.basic_cons_map:
        def read_text(node, tagpath):
            return getNodeText(node, tagpath)

        def write_text(node, tagpath, text):
            # Text is written out UTF-8 encoded.
            addText(node, tagpath, text.encode('utf8'))

        return cls.gen_anon_basic(tag, spec, read_text, write_text)

    if kind is types.ListType:
        return cls.gen_list_tag(tag, spec)

    if tag_type is LocalText:
        return cls.gen_insetclass_tag(tag, spec)

    if kind is autoxml or kind is types.TypeType:
        return cls.gen_class_tag(tag, spec)

    raise Error(_('gen_tag: unrecognized tag type %s in spec') %
                str(tag_type))
def _check_type(self):
    """Normalise and validate ``self.type``.

    * No type given: actions listed in ``ALWAYS_TYPED_ACTIONS`` get
      ``"choice"`` when ``self.choices`` is set and ``"string"`` otherwise;
      every other action keeps ``None``.
    * Type given as a type object or builtin conversion function (``int``,
      ``str``, ...): it is replaced by its ``__name__``; ``"str"`` is then
      mapped to ``"string"``.

    Raises ``OptionError`` when the resulting name is not in ``self.TYPES``
    or when the action is not in ``self.TYPED_ACTIONS``.
    """
    if self.type is None:
        if self.action in self.ALWAYS_TYPED_ACTIONS:
            # The "choices" attribute implies "choice" type; with no type
            # at all, "string" is the most sensible default.
            self.type = "choice" if self.choices is not None else "string"
        return

    # Allow type objects or builtin type conversion functions (int, str,
    # etc.) as an alternative to their names.  (The complicated check of
    # __builtin__ is only necessary for Python 2.1 and earlier, and is
    # short-circuited by the first check on modern Pythons.)
    import __builtin__
    given = self.type
    given_as_object = type(given) is types.TypeType or (
        hasattr(given, "__name__") and
        getattr(__builtin__, given.__name__, None) is given)
    if given_as_object:
        self.type = given.__name__

    if self.type == "str":
        self.type = "string"

    if self.type not in self.TYPES:
        raise OptionError("invalid option type: %r" % self.type, self)
    if self.action not in self.TYPED_ACTIONS:
        raise OptionError(
            "must not supply a type for action %r" % self.action, self)
def register_entry_points_plugins(entry_point="girlfriend.plugin"):
    """Register every extension found under an entry-point namespace.

    Each extension of ``extension.ExtensionManager(namespace=entry_point)``
    is converted to a ``Plugin`` and handed to the module-level
    ``plugin_manager``.

    :param entry_point: name of the entry-point namespace to scan
    :raises InvalidPluginException: if an extension object is neither a
        ``Plugin``, a module, a function nor a class
    """
    global plugin_manager

    def _to_plugin(ext):
        """Turn the object behind one extension into a ``Plugin``."""
        candidate = ext.plugin
        if isinstance(candidate, Plugin):
            # Already a plugin object.
            return candidate
        if isinstance(candidate, types.ModuleType):
            # A whole module.
            return Plugin.wrap_module(candidate)
        if isinstance(candidate, types.FunctionType):
            # A plain function; its docstring is passed along.
            return Plugin.wrap_function(
                ext.name, candidate.__doc__, candidate)
        if isinstance(candidate, (types.ClassType, types.TypeType)):
            # A class (old-style or new-style).
            return Plugin.wrap_class(candidate)
        # NOTE(review): this message looks mis-encoded in the source; it is
        # kept byte-for-byte because it is runtime text.
        raise InvalidPluginException(
            u"?? '{}' ??? '{}'????".format(
                ext.name, type(candidate).__name__)
        )

    third_party_plugin_mgr = extension.ExtensionManager(
        namespace=entry_point, invoke_on_load=False)
    for ext in third_party_plugin_mgr:
        plugin_manager.register(_to_plugin(ext))
def sort_list(l, key, reverse=False):
    """Sort ``l`` in place by ``key`` and return the result of ``l.sort``.

    For a builtin ``list`` that result is ``None``; the ordering is visible
    through ``l`` itself.
    """
    outcome = l.sort(key=key, reverse=reverse)
    return outcome


# In Python 3.x, all objects are "new style" objects descended from 'type', and
# thus types.ClassType and types.TypeType don't exist anymore. For
# compatibility, we make sure they still work.
def _python_type(t):
    """Return the ``allTypes`` entry registered for a Python type.

    ``t`` may be a type object or any other value; for a non-type value
    its ``type(t)`` is used.  Types missing from ``_python_types`` fall
    back to the ``'object_'`` entry.
    """
    py_type = t if isinstance(t, _types.TypeType) else type(t)
    type_name = _python_types.get(py_type, 'object_')
    return allTypes[type_name]
def _ensureCompatLoaded():
    """Populate ``_versionsMap`` on first use.

    Collects every class of this module whose name matches
    ``MxCompatibility[0-9a-z]*``, walks their class tree from the root
    down and stores one instance per class under the value returned by
    its ``version()``.  Does nothing when ``_versionsMap`` is already
    filled.
    """
    if _versionsMap:
        return

    def walk(tree):
        # ``tree`` has the nested-list shape returned by
        # inspect.getclasstree: an entry whose first item is the class,
        # optionally followed by the list for its subclasses.
        head = tree[0][0]
        assert isinstance(head, types.TypeType), head
        yield head
        if len(tree) > 1:
            assert len(tree) == 2
            tail = tree[1]
            assert isinstance(tail, types.ListType), tail
            for sub in walk(tail):
                yield sub

    pattern = re.compile(r'^MxCompatibility[0-9a-z]*$')
    members = inspect.getmembers(sys.modules[__name__], inspect.isclass)
    compat_classes = [klass for name, klass in members if pattern.match(name)]

    last_version = None
    for klass in walk(inspect.getclasstree(compat_classes)):
        if klass == object:
            continue
        # Versions must strictly increase along the class tree.
        assert last_version is None or last_version < klass.version()
        last_version = klass.version()
        _versionsMap[last_version] = klass()
def _build_query(self, cls, filters, limit, order_by):
    """Build the query string for ``cls`` from filters and an ordering.

    :param cls: model class; ``cls.properties(hidden=False)`` must return
        objects with a ``name`` attribute.
    :param filters: sequence of ``("<name> <op>", value)`` pairs, at most
        four.  A ``list`` value produces one ``OR``-joined predicate per
        item; any other value produces a single predicate.  Values are
        passed through ``self.encode_value(property, value)``.
    :param limit: accepted for interface compatibility; not used here.
    :param order_by: property name to sort on, prefixed with ``-`` for
        descending order; falsy for no ordering.
    :returns: the predicates joined with ``' intersection '``.
    :raises Exception: for more than four filters or an unknown field name.
    """
    if len(filters) > 4:
        raise Exception('Too many filters, max is 4')

    parts = []
    properties = cls.properties(hidden=False)
    for filter_expr, value in filters:
        name, op = filter_expr.strip().split()
        found = False
        for prop in properties:
            if prop.name != name:
                continue
            found = True
            # The locals used to shadow the builtins ``filter``, ``property``
            # and ``type``, which forced the obscure ``types.TypeType(value)``
            # spelling.  isinstance() also accepts list subclasses.
            if isinstance(value, list):
                filter_parts = [
                    "'%s' %s '%s'" % (name, op,
                                      self.encode_value(prop, val))
                    for val in value
                ]
                parts.append("[%s]" % " OR ".join(filter_parts))
            else:
                value = self.encode_value(prop, value)
                parts.append("['%s' %s '%s']" % (name, op, value))
        if not found:
            raise Exception('%s is not a valid field' % name)

    if order_by:
        if order_by.startswith("-"):
            key = order_by[1:]
            direction = "desc"
        else:
            key = order_by
            direction = "asc"
        parts.append("['%s' starts-with ''] sort '%s' %s"
                     % (key, key, direction))

    return ' intersection '.join(parts)
def _setup_composite_listener():
    """Install the ``mapper_configured`` hook for mutable composites.

    The hook walks the properties of a configured mapper and, for each one
    whose ``composite_class`` is a class derived from ``MutableComposite``,
    calls ``composite_class._listen_on_attribute`` with the mapped class
    attribute.  The hook is only registered when
    ``Mapper.dispatch.mapper_configured._contains`` reports it is missing.
    """
    import types
    class_kinds = (types.ClassType, types.TypeType)

    def _listen_for_type(mapper, class_):
        for prop in mapper.iterate_properties:
            if not hasattr(prop, 'composite_class'):
                continue
            composite = prop.composite_class
            # Only real classes can be handed to issubclass() below.
            if type(composite) not in class_kinds:
                continue
            if not issubclass(composite, MutableComposite):
                continue
            composite._listen_on_attribute(
                getattr(class_, prop.key), False, class_)

    already_installed = Mapper.dispatch.mapper_configured._contains(
        Mapper, _listen_for_type)
    if not already_installed:
        event.listen(Mapper, 'mapper_configured', _listen_for_type)
def reconstructor(cls, n):
    """Return a callable that rebuilds an object of ``n``'s kind from ``f``.

    The returned one-argument callable depends on what ``n`` is:

    * function: returns ``f`` unchanged;
    * method: binds ``f`` to ``n``'s instance and class;
    * ``staticmethod`` / ``classmethod``: wraps ``f`` in the same descriptor;
    * old-style instance: new instance of ``n``'s class with a copy of
      ``f.__dict__``;
    * class (new- or old-style): new class with ``n``'s name and bases and
      a copy of ``f.__dict__``.

    :raises NotImplementedError: for any other kind of object.
    """
    if isinstance(n, types.FunctionType):
        return lambda f: f
    if isinstance(n, types.MethodType):
        return lambda f: types.MethodType(f, n.im_self, n.im_class)
    if isinstance(n, (staticmethod, classmethod)):
        return lambda f: type(n)(f)
    if isinstance(n, types.InstanceType):
        # type(n) of an old-style instance is InstanceType itself, which the
        # InstanceType constructor rejects; it needs the instance's class.
        return lambda f: types.InstanceType(n.__class__, dict(f.__dict__))
    if isinstance(n, (types.TypeType, types.ClassType)):
        return lambda f: type(n)(n.__name__, n.__bases__, dict(f.__dict__))
    # Was ``type(func)``: ``func`` is undefined here, so callers got a
    # NameError instead of the intended NotImplementedError.
    raise NotImplementedError(type(n))
def load(self, testname):
    """Load the test cases identified by ``testname``.

    ``testname`` may carry a data key after a slash (``"name/datakey"``);
    only test cases whose ``casedataname`` equals that key are then kept.
    The name part is resolved with ``self._load`` and may yield a package,
    a module or a class.  Test cases are de-duplicated by ``test_name``
    (the last one wins).

    :param testname: test name, optionally followed by ``/<data key>``
    :type testname: string
    :returns: the values of a dict mapping ``test_name`` to test case
    """
    self._module_errs = {}
    if settings.DATA_DRIVE:
        self._dataset = TestDataLoader().load()

    # Split off the optional data key; an absent key becomes None.
    testname, _sep, datakeyname = testname.partition('/')
    if not _sep:
        datakeyname = None

    obj = self._load(testname)
    if isinstance(obj, types.ModuleType):
        # Packages are modules that carry a __path__ attribute.
        if hasattr(obj, '__path__'):
            testcases = self._load_from_package(obj)
        else:
            testcases = self._load_from_module(obj)
    elif isinstance(obj, types.TypeType):
        testcases = self._load_from_class(obj)
    else:
        testcases = []

    # Drop duplicates and cases that do not match the requested data key.
    unique_cases = {}
    for testcase in testcases:
        if datakeyname and str(testcase.casedataname) != datakeyname:
            continue
        unique_cases[testcase.test_name] = testcase
    return unique_cases.values()
def _is_testcase_class(self, obj):
    """Tell whether ``obj`` is a runnable test case class.

    ``obj`` qualifies when it is a type, a subclass of ``TestCase``, has a
    ``runTest`` attribute and a truthy ``priority`` attribute.

    :returns: ``False`` when one of the first three checks fails,
        otherwise the value of ``obj.priority`` (``None`` when missing)
    """
    if not isinstance(obj, types.TypeType):
        return False
    if not issubclass(obj, TestCase):
        return False
    if not hasattr(obj, "runTest"):
        return False
    return getattr(obj, "priority", None)
def Instance2Str(o, d):
    """Convert an instance to its string representation using ``d``.

    ``d`` maps classes to converter callables taking ``(o, d)``.  If the
    exact class of ``o`` has a converter it is used.  Otherwise the first
    key of ``d`` that ``o`` is an instance of is used (old-style classes
    first, then types whose converter is not this function), and that
    converter is cached in ``d`` under the exact class.  When nothing
    matches, the converter registered for ``types.StringType`` is used.

    If ``__str__()`` already produces acceptable output there is no need
    to add the class to the conversions.
    """
    klass = o.__class__
    if klass in d:
        return d[klass](o, d)

    candidates = [key for key in d.keys()
                  if type(key) is types.ClassType and isinstance(o, key)]
    if not candidates and hasattr(types, 'ObjectType'):
        candidates = [key for key in d.keys()
                      if type(key) is types.TypeType
                      and isinstance(o, key)
                      and d[key] is not Instance2Str]
    if not candidates:
        return d[types.StringType](o, d)

    # Remember the match so the next lookup hits the exact-class path.
    converter = d[candidates[0]]
    d[o.__class__] = converter
    return d[candidates[0]](o, d)
def get_classes(mod, metaclass=None):
    """Yield the callables of ``mod`` that are instances of ``metaclass``.

    :param mod: object passed to ``get_callables`` to enumerate candidates
    :param metaclass: a class or tuple of classes for ``isinstance``;
        defaults to new-style and old-style classes
        (``types.TypeType`` and ``types.ClassType``)
    """
    # Identity test: ``== None`` would invoke a custom ``__eq__``.
    if metaclass is None:
        metaclass = (types.TypeType, types.ClassType)
    for candidate in get_callables(mod):
        if isinstance(candidate, metaclass):
            yield candidate
def __clean_value(self, value):
    """Recursively pass the translatable leaves of ``value`` to ``ugettext``.

    * list / tuple: returns a new list with every item cleaned;
    * dict / OrderedDict: cleans the values in place, returns the mapping;
    * int, long, bool, type objects and ``None``: returned unchanged;
    * anything else: returns ``ugettext(value)``.
    """
    if isinstance(value, (list, tuple,)):
        return [self.__clean_value(item) for item in value]

    if isinstance(value, (dict, OrderedDict,)):
        # Only values are replaced, so the key set stays stable while
        # iterating.
        for key in value:
            value[key] = self.__clean_value(value.get(key))
        return value

    if value is None or isinstance(value, (int, long, bool,
                                           types.TypeType,)):
        return value

    return ugettext(value)
def _IsTestCaseClass(test_class):
    """Return True for proper subclasses of ``HostDrivenTestCase``.

    ``test_class`` must be a plain type (its ``type()`` is exactly
    ``types.TypeType``), derive from ``test_case.HostDrivenTestCase`` and
    not be that base class itself.
    """
    if type(test_class) is not types.TypeType:
        return False
    if not issubclass(test_class, test_case.HostDrivenTestCase):
        return False
    return test_class is not test_case.HostDrivenTestCase
def is_probably_safe(x):
    '''
    Heuristically decide whether ``x`` is "safe", i.e. unlikely to change.

    An object is considered safe when ``is_immutable(x)`` holds, when its
    exact type is one of the listed function/module-like types, or when it
    is callable.  Functions and modules can of course change, but in
    practice that is likely fine.  Still, risky!
    '''
    if is_immutable(x):
        return True

    if sys.version_info > (3,):
        probably_fine = (
            types.LambdaType,
            types.BuiltinMethodType,
            types.BuiltinFunctionType,
            types.FunctionType,
            types.ModuleType,
            types.MethodType,
        )
    else:
        # Python 2 exposes many more named types in the ``types`` module.
        probably_fine = (
            types.LambdaType,
            types.InstanceType,
            types.NoneType,
            types.NotImplementedType,
            types.TypeType,
            types.UnicodeType,
            types.ComplexType,
            types.ClassType,
            types.BuiltinMethodType,
            types.BuiltinFunctionType,
            types.FunctionType,
            types.ModuleType,
            types.MethodType,
        )

    return type(x) in probably_fine or hasattr(x, '__call__')
def isClass(var):
    """Return True when ``var`` is a class (new-style or old-style).

    Works on Python 2 and Python 3: old-style classes only exist on
    Python 2, where they are instances of ``types.ClassType``.
    """
    # ``types.TypeType`` was merely another name for ``type``, so it adds
    # nothing; ``types.ClassType`` is looked up defensively because the
    # attribute is gone on Python 3.
    class_kinds = (type, getattr(types, 'ClassType', type))
    return isinstance(var, class_kinds)
def spawn(self, aid, klass, *param, **kparam):
    '''
    Create an actor attached to this host.

    The actor is an instance of *klass*, gets *aid* as its ``id`` and the
    URL ``<transport>://<host netloc>/<aid>`` as its ``url``.
    This method can be called remotely synchronously.

    :param str. aid: identifier for the spawning actor. Unique within
        the host.
    :param class klass: class of the spawning actor, or a string of the
        form ``'module/Class'``: the part before the slash is imported
        with ``__import__`` and the class is read from that module.
    :param param-kparam: arguments for the init function of the
        spawning actor class.
    :return: :class:`~.Proxy` of the actor spawned.
    :raises: :class:`AlreadyExistsError`, if the ID specified is already
        in use.
    :raises: :class:`HostDownError` if the host is not initiated.
    :raises: :class:`TypeError` if *klass* is neither a string nor a
        class.
    '''
    # ``*param`` is always a tuple, so no None check is needed here.
    if not self.alive:
        raise HostDownError()

    if isinstance(klass, basestring):
        module, klass = klass.split('/')
        module_ = __import__(module, globals(), locals(), [klass], -1)
        klass_ = getattr(module_, klass)
    elif isinstance(klass, (types.TypeType, types.ClassType)):
        klass_ = klass
    else:
        # ``klass_`` used to stay unbound here, surfacing later as an
        # UnboundLocalError at instantiation time.
        raise TypeError("klass must be a class or a 'module/Class' "
                        "string, got %r" % (klass,))

    url = '%s://%s/%s' % (self.transport, self.host_url.netloc, aid)
    if url in self.actors:
        raise AlreadyExistsError(url)

    obj = klass_(*param, **kparam)
    obj.id = aid
    obj.url = url
    if self.running:
        obj.host = self.proxy

    if hasattr(klass_, '_parallel') and klass_._parallel:
        new_actor = parallels.ActorParallel(url, klass_, obj)
        # Keep the parallel actor's lock reachable per URL.
        self.locks[url] = new_actor.get_lock()
    else:
        new_actor = actor.Actor(url, klass_, obj)

    obj.proxy = Proxy(new_actor)
    self.launch_actor(url, new_actor)
    return Proxy(new_actor)
def lookup_url(self, url, klass, module=None):
    '''
    Get a proxy reference to the actor identified by *url*.

    For a local URL the actor registered in this host is wrapped; for any
    other URL a remote reference is built on the channel of the
    dispatcher stored under the URL scheme.
    This method can be called remotely synchronously.

    :param str. url: address that identifies an actor.
    :param class klass: the class of the actor; the class *name* (a
        string) when *module* is given.
    :param str. module: name of the module to import *klass* from, if
        the actor class is not in the calling module.
    :return: :class:`~.Proxy` of the actor requested.
    :raises: :class:`NotFoundError`, if the URL is local but does not
        correspond to any actor in the host.
    :raises: :class:`HostDownError` if the host is down.
    :raises: :class:`HostError` if there is an error looking for the
        actor in another server.
    '''
    if not self.alive:
        raise HostDownError()

    aurl = urlparse(url)
    if self.is_local(aurl):
        if url not in self.actors.keys():
            raise NotFoundError(url)
        return Proxy(self.actors[url])

    try:
        dispatcher = self.actors[aurl.scheme]
        if module is not None:
            try:
                module_ = __import__(module, globals(), locals(),
                                     [klass], -1)
                klass_ = getattr(module_, klass)
            except Exception as e:
                raise HostError("At lookup_url: " +
                                "Import failed for module " + module +
                                ", class " + klass +
                                ". Check this values for the lookup." +
                                " ERROR: " + str(e))
        elif isinstance(klass, (types.TypeType, types.ClassType)):
            klass_ = klass
        else:
            raise HostError("The class specified to look up is" +
                            " not a class.")
        remote_actor = actor.ActorRef(url, klass_, dispatcher.channel)
        return Proxy(remote_actor)
    except HostError:
        # Already descriptive: let it through untouched.
        raise
    except Exception as e:
        raise HostError("ERROR looking for the actor on another " +
                        "server. Hosts must " +
                        "be in http to work properly. " + str(e))
def get_arg_text(ob):
    """Get a string describing the arguments for the given object,
    only if it is callable.

    The result is ``""`` for ``None`` and for non-callables.  For a
    callable it is the parenthesised argument list (when the underlying
    object is a Python-level function), optionally followed on a new line
    by the first line of the object's docstring, cut to at most 70
    characters.
    """
    arg_text = ""
    if ob is not None and hasattr(ob, '__call__'):
        # Number of leading parameters to hide from the displayed list.
        arg_offset = 0
        if type(ob) in (types.ClassType, types.TypeType):
            # Look for the highest __init__ in the class chain.
            fob = _find_constructor(ob)
            if fob is None:
                # No constructor found: describe an argument-less callable.
                fob = lambda: None
            else:
                # Skip the constructor's first parameter.
                arg_offset = 1
        elif type(ob)==types.MethodType:
            # bit of a hack for methods - turn it into a function
            # but we drop the "self" param.
            fob = ob.im_func
            arg_offset = 1
        else:
            fob = ob
        # Try to build one for Python defined functions
        if type(fob) in [types.FunctionType, types.LambdaType]:
            argcount = fob.func_code.co_argcount
            real_args = fob.func_code.co_varnames[arg_offset:argcount]
            defaults = fob.func_defaults or []
            # Render each default as "=<repr>" ...
            defaults = list(map(lambda name: "=%s" % repr(name), defaults))
            # ... and left-pad with "" so defaults line up with the
            # trailing parameters they belong to.
            defaults = [""] * (len(real_args) - len(defaults)) + defaults
            items = map(lambda arg, dflt: arg + dflt, real_args, defaults)
            # Code-object flag 0x4 is CO_VARARGS (function takes *args).
            if fob.func_code.co_flags & 0x4:
                items.append("...")
            # Code-object flag 0x8 is CO_VARKEYWORDS (function takes **kw).
            if fob.func_code.co_flags & 0x8:
                items.append("***")
            arg_text = ", ".join(items)
            # Names of the form ".<digits>" are shown as "<tuple>"
            # (presumably the implicit names of tuple-unpacking
            # parameters - TODO confirm).
            arg_text = "(%s)" % re.sub("\.\d+", "<tuple>", arg_text)
        # See if we can use the docstring
        doc = getattr(ob, "__doc__", "")
        if doc:
            doc = doc.lstrip()
            # Keep only the first line, capped at 70 characters.
            pos = doc.find("\n")
            if pos < 0 or pos > 70:
                pos = 70
            if arg_text:
                arg_text += "\n"
            arg_text += doc[:pos]
    return arg_text

#################################################
#
# Test code
#