我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 types.ClassType。
def struct_fields(value):
    """Return the property descriptors that describe a struct.

    ``value`` may be a struct class or an instance of one.  The result
    starts with the class attribute ``__fields`` when it is present;
    otherwise it is built from the ``simple_property`` entries of the
    class dictionary followed by the ``simpleseq_property`` entries.
    Any property found via ``inspect.getmembers(value)`` whose ``id_``
    is not already listed is added at the end.
    """
    # Old-style classes and anything exposing ``__bases__`` are treated as
    # the class itself; for everything else the type of the value is used.
    looks_like_class = (isinstance(value, types.ClassType)
                        or hasattr(value, '__bases__'))
    clazz = value if looks_like_class else type(value)

    # Prefer the explicit field ordering when the class provides one;
    # otherwise fall back to scanning the class dictionary.
    fields = getattr(clazz, '__fields', None)
    if fields is None:
        attrs = clazz.__dict__
        fields = [a for a in attrs.itervalues()
                  if isinstance(a, simple_property)]
        fields += [a for a in attrs.itervalues()
                   if isinstance(a, simpleseq_property)]

    for _name, member in inspect.getmembers(value):
        if not isinstance(member, (simple_property, simpleseq_property)):
            continue
        # Fields are matched by their ``id_`` attribute, not by identity.
        if not any(member.id_ == known.id_ for known in fields):
            fields += [member]
    return fields
def runTests(self):
    """Run ``self.test`` with the configured runner and store the result.

    ``self.testRunner`` may be ``None`` (``runner.TextTestRunner`` is
    used), a runner class (instantiated here) or anything else, which is
    used as-is as the runner.  The outcome is stored in ``self.result``;
    when ``self.exit`` is true the process exits with a status derived
    from ``wasSuccessful()``.
    """
    if self.catchbreak:
        installHandler()
    if self.testRunner is None:
        self.testRunner = runner.TextTestRunner

    configured = self.testRunner
    if not isinstance(configured, (type, types.ClassType)):
        # Not a class: assumed to be a ready-made TestRunner instance.
        active_runner = configured
    else:
        try:
            active_runner = configured(verbosity=self.verbosity,
                                       failfast=self.failfast,
                                       buffer=self.buffer)
        except TypeError:
            # The class did not accept the verbosity, buffer or failfast
            # arguments; fall back to a bare construction.
            active_runner = configured()

    self.result = active_runner.run(self.test)
    if self.exit:
        sys.exit(not self.result.wasSuccessful())
def latestVersionOf(self, anObject):
    """Get the latest version of an object.

    Dispatches on the exact type of ``anObject``: functions go through
    ``latestFunction``, methods are looked up again by name on their
    instance (or on their class when unbound), old-style instances are
    poked with a ``getattr`` and returned, and old-style classes go
    through ``latestClass``.  Anything else is logged and returned
    unchanged.
    """
    kind = type(anObject)

    if kind == types.FunctionType:
        return latestFunction(anObject)

    if kind == types.MethodType:
        # Unbound methods have no instance, so resolve against the class.
        owner = anObject.im_self
        if owner is None:
            owner = anObject.im_class
        return getattr(owner, anObject.__name__)

    if kind == types.InstanceType:
        # Kick it, if it's out of date.
        getattr(anObject, 'nothing', None)
        return anObject

    if kind == types.ClassType:
        return latestClass(anObject)

    log.msg('warning returning anObject!')
    return anObject
def loadAnything(self, thing, recurse=False):
    """
    Load whatever tests can be found in a Python object.

    @param thing: A Python object. A module, method, class or package.
    @param recurse: Whether or not to look in subpackages of packages.
        Only passed on when C{thing} is a package. Defaults to False.

    @return: Whatever the matching C{loadPackage}, C{loadModule},
        C{loadClass} or C{loadMethod} call returns.
    @raise TypeError: If C{thing} is none of the supported kinds.
    """
    if isinstance(thing, types.ModuleType):
        if not isPackage(thing):
            return self.loadModule(thing)
        return self.loadPackage(thing, recurse)
    # Old-style and new-style classes are loaded the same way.
    if isinstance(thing, (types.ClassType, type)):
        return self.loadClass(thing)
    if isinstance(thing, types.MethodType):
        return self.loadMethod(thing)
    raise TypeError("No loader for %r. Unrecognized type" % (thing,))
def minimalBases(classes):
    """Reduce a list of base classes to its ordered minimum equivalent.

    ``ClassType`` itself is dropped, as is every class that has a proper
    subclass elsewhere in the list.  A class that survives more than once
    keeps only its last position.
    """
    pool = [c for c in classes if c is not ClassType]
    result = []

    for base in pool:
        shadowed = any(issubclass(other, base) and base is not other
                       for other in pool)
        if shadowed:
            continue
        # 'base' has no subclasses in the pool; if it was seen before,
        # move it so that it ends up later in the list.
        if base in result:
            result.remove(base)
        result.append(base)

    return result
def get_formatted_content(self, pyobj):
    """Delegate formatting of ``pyobj`` to the serializer registered for it.

    Lookup order:
      1. ``pyobj.typecode`` when the object carries one;
      2. ``Any.serialmap`` keyed by the object's class;
      3. ``Any.serialmap`` keyed by ``(types.ClassType, <class name>)``;
      4. a ``gDateTime`` typecode for ``time.struct_time`` values.

    Returns whatever the selected serializer's ``get_formatted_content``
    returns.  Raises ``EvaluateException`` when no serializer is found.
    """
    tc = type(pyobj)
    # NOTE(review): isinstance(x, object) is true for every value, so the
    # ``else`` branch below looks unreachable -- confirm before removing.
    if isinstance(pyobj, object):
        tc = pyobj.__class__
        if hasattr(pyobj, 'typecode'):
            serializer = pyobj.typecode
        else:
            serializer = Any.serialmap.get(tc)
        if not serializer:
            tc = (types.ClassType, pyobj.__class__.__name__)
            serializer = Any.serialmap.get(tc)
    else:
        serializer = Any.serialmap.get(tc)

    if not serializer and isinstance(pyobj, time.struct_time):
        from pysphere.ZSI.TCtimes import gDateTime
        serializer = gDateTime()

    if serializer:
        return serializer.get_formatted_content(pyobj)

    # Wrap the value in a 1-tuple: a bare tuple-like ``pyobj`` would be
    # unpacked by ``%`` and raise TypeError instead of EvaluateException.
    raise EvaluateException(
        'Failed to find serializer for pyobj %s' % (pyobj,))
def load_component(self, class_name):
    """
    Load a component module and add its component to the editor.

    A component already present in ``self.loaded_components`` is returned
    directly.  Otherwise the module ``scc.gui.ae.<class_name>`` is
    imported and the first name in its ``__all__`` that is a proper
    subclass of ``AEComponent`` is instantiated with ``(self.app, self)``,
    recorded and returned.  Returns None when no such class is exported.
    """
    if class_name in self.loaded_components:
        return self.loaded_components[class_name]

    module = importlib.import_module("scc.gui.ae.%s" % (class_name,))
    for exported in module.__all__:
        candidate = getattr(module, exported)
        if not isinstance(candidate, (type, types.ClassType)):
            continue
        if not issubclass(candidate, AEComponent):
            continue
        if candidate is AEComponent:
            # The base class itself is not a usable component.
            continue
        component = candidate(self.app, self)
        self.loaded_components[class_name] = component
        self.components.append(component)
        return component
def register(cls, action):
    """Register ``action`` on ``cls`` under its name.

    Functions and ``staticmethod`` objects are stored as given.  Callable
    classes (old-style, or new-style via the ``type`` check) are
    instantiated first and stored under the class name.  Other callable
    objects are stored under the name of their class.  The action is put
    into ``cls._actions`` under the upper-cased name and also set as an
    attribute of ``cls``.

    Raises ``UnknownAction`` for anything that matches none of the above.
    """
    action_logger.info("Registering action :%s" % action)

    if isinstance(action, (types.FunctionType, staticmethod)):
        name = action.__name__
    elif isinstance(action, types.ClassType) and hasattr(action, "__call__"):
        name = action.__name__
        action = action()
    elif (isinstance(action, (types.InstanceType, types.ObjectType))
          and hasattr(action, "__call__")):
        if isinstance(action, type):
            name = action.__name__
            action = action()
        else:
            name = action.__class__.__name__
    else:
        name = str(action)
        action_logger.error("Error registering action :%s" % action)
        raise UnknownAction("unable to register action %s!!" %name)

    # Every accepted kind of action is stored the same way.
    cls._actions[name.upper()] = action
    setattr(cls, name, action)
def register(action_klass):
    """
    Build a decorator that registers a chaos action with ``action_klass``.

    Args:
        action_klass: class that receives the action as an attribute and
            whose ``_actions`` mapping is updated.

    Returns:
        A decorator.  For an object whose ``enabled`` attribute is true it
        instantiates the object if it is a class, stores it on
        ``action_klass`` under the original ``__name__`` and in
        ``_actions`` under the upper-cased name, and returns the stored
        object.  Disabled objects are returned untouched.
    """
    def _register(chaos_klass):
        if not chaos_klass.enabled:
            return chaos_klass
        # Capture the name first: instances may not carry ``__name__``.
        name = chaos_klass.__name__
        if isinstance(chaos_klass, (type, types.ClassType)):
            chaos_klass = chaos_klass()
        setattr(action_klass, name, chaos_klass)
        action_klass._actions.update([(name.upper(), chaos_klass)])
        return chaos_klass

    return _register
def register(cls, event):
    """Register ``event`` on ``cls`` under its name.

    Functions and ``staticmethod`` objects are stored as given.  Callable
    classes (old-style, or new-style via the ``type`` check) are
    instantiated first and stored under the class name.  Other callable
    objects are stored under the name of their class.  The event is put
    into ``cls._chaos_events`` under the upper-cased name and also set as
    an attribute of ``cls``.

    Raises ``UnknownChaosEvent`` for anything that matches none of the
    above.
    """
    exe_logger.info("Registering event :%s" % event)

    if isinstance(event, (types.FunctionType, staticmethod)):
        name = event.__name__
    elif isinstance(event, types.ClassType) and hasattr(event, "__call__"):
        name = event.__name__
        event = event()
    elif (isinstance(event, (types.InstanceType, types.ObjectType))
          and hasattr(event, "__call__")):
        if isinstance(event, type):
            name = event.__name__
            event = event()
        else:
            name = event.__class__.__name__
    else:
        name = str(event)
        exe_logger.error("Error registering event :%s" % event)
        raise UnknownChaosEvent("unable to register event %s!!" % name)

    # Every accepted kind of event is stored the same way.
    cls._chaos_events[name.upper()] = event
    setattr(cls, name, event)
def register(executor_klass):
    """
    Build a decorator that registers a chaos event with ``executor_klass``.

    Args:
        executor_klass: class that receives the event as an attribute and
            whose ``update_events`` is called with the new entry.

    Returns:
        A decorator.  For an object whose ``enabled`` attribute is true it
        instantiates the object if it is a class, stores it on
        ``executor_klass`` under the original ``__name__``, passes
        ``[(NAME_UPPERCASED, obj)]`` to ``executor_klass.update_events``
        and returns the stored object.  Disabled objects are returned
        untouched.
    """
    def _register(chaos_klass):
        if not chaos_klass.enabled:
            return chaos_klass
        # Capture the name first: instances may not carry ``__name__``.
        name = chaos_klass.__name__
        if isinstance(chaos_klass, (type, types.ClassType)):
            chaos_klass = chaos_klass()
        setattr(executor_klass, name, chaos_klass)
        executor_klass.update_events([(name.upper(), chaos_klass)])
        return chaos_klass

    return _register
def simplefilter(self, action, category=Warning, lineno=0, append=0, caller_id = 0, severity=0):
    """Insert a simple entry into ``self.filters`` while holding ``warn_lock``.

    The entry is ``(action, None, category, None, lineno, caller_id,
    severity)``.  An equal entry already in the list is removed first; the
    new entry then goes to the end when ``append`` is true and to the
    front otherwise.  Arguments are validated with ``assert``.
    """
    with self.warn_lock:
        allowed_actions = ("error", "ignore", "always", "default", "module",
                           "once")
        assert action in allowed_actions, "invalid action: %r" % (action,)
        assert isinstance(category, (type, types.ClassType)), \
               "category must be a class"
        assert issubclass(category, Warning), "category must be a Warning subclass"
        assert isinstance(lineno, int) and lineno >= 0, \
               "lineno must be an int >= 0"

        entry = (action, None, category, None, lineno, caller_id, severity)
        registry = self.filters
        if entry in registry:
            registry.remove(entry)
        if not append:
            registry.insert(0, entry)
        else:
            registry.append(entry)
def filterwarnings(self, action, message="", category=Warning, module="", lineno=0, append=0, caller_id = 0, severity=0):
    """Insert a regex-based entry into ``self.filters`` while holding ``warn_lock``.

    The entry is ``(action, compiled message regex (case-insensitive),
    category, compiled module regex, lineno, caller_id, severity)``.  An
    equal entry already in the list is removed first; the new entry then
    goes to the end when ``append`` is true and to the front otherwise.
    Arguments are validated with ``assert``.
    """
    with self.warn_lock:
        allowed_actions = ("error", "ignore", "always", "default", "module",
                           "once")
        assert action in allowed_actions, "invalid action: %r" % (action,)
        assert isinstance(message, basestring), "message must be a string"
        assert isinstance(category, (type, types.ClassType)), \
               "category must be a class"
        assert issubclass(category, Warning), "category must be a Warning subclass"
        assert isinstance(module, basestring), "module must be a string"
        assert isinstance(lineno, int) and lineno >= 0, \
               "lineno must be an int >= 0"

        message_re = re.compile(message, re.I)
        module_re = re.compile(module)
        entry = (action, message_re, category, module_re, lineno, caller_id,
                 severity)
        registry = self.filters
        if entry in registry:
            registry.remove(entry)
        if not append:
            registry.insert(0, entry)
        else:
            registry.append(entry)
# end _Warnings()
def call_async(self, fn, *args, **kwargs):
    """Send a CALL_FUNCTION message describing ``fn(*args, **kwargs)``.

    The pickled payload is ``(module, class name or None, function name,
    args, kwargs)``.  The class name is only filled in when ``fn`` is a
    method whose ``im_self`` is itself a class.  Returns the object
    produced by ``self.send_async`` after setting its
    ``raise_channelerror`` attribute to False.
    """
    LOG.debug('%r.call_async(%r, *%r, **%r)', self, fn, args, kwargs)

    klass = None
    if isinstance(fn, types.MethodType):
        if isinstance(fn.im_self, (type, types.ClassType)):
            klass = fn.im_self.__name__

    call_spec = (fn.__module__, klass, fn.__name__, args, kwargs)
    message = mitogen.core.Message.pickled(
        call_spec,
        handle=mitogen.core.CALL_FUNCTION,
    )
    recv = self.send_async(message)
    recv.raise_channelerror = False
    return recv
def addDataSource(self, source=SampleDataSource, name='default', sourceModule=None, sourceArgs=[], sourceKwargs={}):
    """Register a data source under ``name``.

    ``source`` may be:
      * a string -- looked up in ``sourceModule`` (a module object, or a
        module name that is imported with ``__import__``) and stored as
        found, without being instantiated;
      * a class -- instantiated with ``*sourceArgs, **sourceKwargs``;
      * anything else -- stored as given.

    The source is stored together with a fresh ``weakref.WeakSet``, its
    ``initialSamples()`` result is cached in ``self._lastSamples`` and
    every entry of ``self._lostCurveDatas`` that
    ``self._tryAddToDataSource`` accepts for this source is removed from
    that collection.

    Raises ``Exception`` when ``name`` is already registered.
    """
    # NOTE: the list/dict defaults are only unpacked, never mutated, so
    # sharing them between calls is harmless here.
    if name in self._sources:
        raise Exception('Data source "%s" already exists!' % name)

    if utils.isstr(source):
        if utils.isstr(sourceModule):
            sourceModule = __import__(sourceModule)
        if sourceModule is not None:
            source = sourceModule.__dict__[source]
    elif type(source) in [types.ClassType, types.TypeType]:
        source = source(*sourceArgs, **sourceKwargs)

    cds = weakref.WeakSet()
    self._sources[name] = (source, cds)
    self._lastSamples[name] = source.initialSamples()

    # Iterate over a copy because matching entries are removed on the way.
    for cd in list(self._lostCurveDatas):
        if self._tryAddToDataSource(cd, name):
            # Fixed: was ``.remote(cds)`` -- a misspelled method name that
            # was also given the WeakSet instead of the matched entry.
            self._lostCurveDatas.remove(cd)
def skip(reason):
    """Unconditionally skip a test.

    Compatibility version: ``reason`` is accepted but not recorded.  The
    decorated item is replaced by a wrapper that accepts any arguments,
    does nothing and returns None.
    """
    def decorator(test_item):
        # In older versions of Python, tracking skipped tests is
        # problematic since the test loader and runner handle this
        # internally.  For this reason, this compatibility wrapper
        # simply wraps skipped tests in a function that passes
        # silently instead of raising SkipTest.
        @functools.wraps(test_item)
        def skip_wrapper(*args, **kwargs):
            pass
        return skip_wrapper
    return decorator
def validate_type(arg_name, arg, valid_types):
    """Check that ``arg`` is an instance of ``valid_types``.

    ``valid_types`` may be a single type, an old-style class, or a list or
    tuple of types.  When ``long_exists`` is true, ``int`` is widened to
    also accept ``long``.  Returns None on success.

    Raises ``STLTypeError`` when ``arg`` has the wrong type and
    ``STLError`` when ``valid_types`` itself is not usable.
    """
    if long_exists:
        if valid_types is int:
            valid_types = (int, long)
        elif type(valid_types) is list and int in valid_types and long not in valid_types:
            # Build a new list instead of appending in place, so the
            # caller's list is never modified.
            valid_types = valid_types + [long]

    if type(valid_types) is list:
        valid_types = tuple(valid_types)

    # Look ClassType up defensively: interpreters without old-style
    # classes do not define it, and a bad ``valid_types`` must still end
    # in STLError rather than AttributeError.
    old_style_class = getattr(types, 'ClassType', None)
    kind = type(valid_types)
    if (kind is type or   # single type, not array of types
            kind is tuple or   # several valid types as tuple
            (old_style_class is not None and kind is old_style_class)):   # old style class
        if isinstance(arg, valid_types):
            return
        raise STLTypeError(arg_name, type(arg), valid_types)
    else:
        raise STLError('validate_type: valid_types should be type or list or tuple of types')

# throws STLError if not exactly one argument is present
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is popped from ``config['()']``.  When it is not callable,
    the interpreter has ``types.ClassType`` and the factory is not an
    old-style class, it is passed through ``self.resolve`` first.  The
    factory is called with the remaining entries whose keys pass
    ``valid_ident``; the optional ``config['.']`` mapping is then applied
    to the result with ``setattr``.  Both special keys are removed from
    ``config``.
    """
    factory = config.pop('()')
    must_resolve = (not hasattr(factory, '__call__')
                    and hasattr(types, 'ClassType')
                    and type(factory) != types.ClassType)
    if must_resolve:
        factory = self.resolve(factory)

    props = config.pop('.', None)

    # Only keys that are valid identifiers can be used as keyword names.
    kwargs = {}
    for key in config:
        if valid_ident(key):
            kwargs[key] = config[key]

    result = factory(**kwargs)
    if props:
        for attr_name, attr_value in props.items():
            setattr(result, attr_name, attr_value)
    return result
def loadTestsFromName(self, name, module=None):
    """Return a suite of all tests cases given a string specifier.

    The name may resolve either to a module, a test case class, a
    test method within a test case class, or a callable object which
    returns a TestCase or TestSuite instance.

    The method optionally resolves the names relative to a given module.

    Raises TypeError when the resolved object, or the value returned by
    calling it, is not something a test can be made from.
    """
    parent, obj = self._resolveName(name, module)

    if type(obj) == types.ModuleType:
        return self.loadTestsFromModule(obj)

    class_kinds = (type, types.ClassType)
    if isinstance(obj, class_kinds) and issubclass(obj, unittest.TestCase):
        return self.loadTestsFromTestCase(obj)

    if (type(obj) == types.UnboundMethodType
            and isinstance(parent, class_kinds)
            and issubclass(parent, unittest.TestCase)):
        return self.loadSpecificTestFromTestCase(parent, obj.__name__)

    if isinstance(obj, unittest.TestSuite):
        return obj

    if not hasattr(obj, '__call__'):
        raise TypeError("don't know how to make test from: %s" % obj)

    # A callable is expected to produce a suite or a single test case.
    test = obj()
    if isinstance(test, unittest.TestSuite):
        return test
    if isinstance(test, unittest.TestCase):
        return self.suiteClass([test])
    raise TypeError("calling %s returned %s, not a test" %
                    (obj, test))
def __init__(self, id_, structdef, name=None, mode="readwrite", configurationkind=("configure","property"), description=None, fget=None, fset=None, fval=None):
    """Construct a property of type struct.

    @param structdef    the structure definition, which is any class with
                        simple_property attributes

    Raises ValueError when structdef is an old-style class.
    """
    # Struct properties support multiple kinds, like simple and
    # simplesequence properties.  For backwards compatibility a single
    # string is converted into a one-element tuple.
    if isinstance(configurationkind, str):
        configurationkind = (configurationkind,)
    _property.__init__(self, id_, None, name, None, mode, "external",
                       configurationkind, description, fget, fset, fval)

    if type(structdef) is types.ClassType:
        raise ValueError("structdef must be a new-style python class (i.e. inherits from object)")
    self.structdef = structdef

    # Map each field id to its (attribute name, property) pair.  Only
    # exact simple_property / simpleseq_property types are considered.
    self.fields = {}
    field_map = self.fields
    for attr_name, attr in self.structdef.__dict__.items():
        attr_type = type(attr)
        if attr_type is simple_property or attr_type is simpleseq_property:
            field_map[attr.id_] = (attr_name, attr)
def __init__(self, id_, structdef, name=None, defvalue=None, mode="readwrite", configurationkind=("configure","property"), description=None, fget=None, fset=None, fval=None):
    """Construct a property that is a sequence of structs.

    @param structdef    the structure definition class; must not be an
                        old-style class
    @param defvalue     stored unchanged as self.defvalue

    Raises ValueError when structdef is an old-style class.
    """
    # Structsequence properties support multiple kinds, like simple and
    # simplesequence properties.  For backwards compatibility a single
    # string is converted into a one-element tuple.
    if isinstance(configurationkind, str):
        configurationkind = (configurationkind,)
    _sequence_property.__init__(self, id_, None, name, None, mode,
                                "external", configurationkind, description,
                                fget, fset, fval)

    if type(structdef) is types.ClassType:
        raise ValueError("structdef must be a new-style python class (i.e. inherits from object)")
    self.structdef = structdef

    # Map each field id to its (attribute name, property) pair.  Only
    # exact simple_property / simpleseq_property types are considered.
    self.fields = {}
    field_map = self.fields
    for attr_name, attr in self.structdef.__dict__.items():
        attr_type = type(attr)
        if attr_type is simple_property or attr_type is simpleseq_property:
            field_map[attr.id_] = (attr_name, attr)

    self.defvalue = defvalue
def str_to_class(s):
    """Return the old-style class bound to the global name ``s``.

    Returns None when the name is not a module global or when it is bound
    to anything other than an instance of ``types.ClassType``.
    """
    namespace = globals()
    if s not in namespace:
        return None
    candidate = namespace[s]
    if isinstance(candidate, types.ClassType):
        return candidate
    return None
def filterwarnings(action, message="", category=Warning, module="", lineno=0, append=0):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    import re

    allowed_actions = ("error", "ignore", "always", "default", "module",
                       "once")
    assert action in allowed_actions, "invalid action: %r" % (action,)
    assert isinstance(message, basestring), "message must be a string"
    assert isinstance(category, (type, types.ClassType)), \
           "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, basestring), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"

    # The message regex is case-insensitive; the module regex is not.
    message_re = re.compile(message, re.I)
    module_re = re.compile(module)
    entry = (action, message_re, category, module_re, lineno)
    if not append:
        filters.insert(0, entry)
    else:
        filters.append(entry)
def build_opener(*handlers):
    """Create an opener object from a list of handlers.

    The opener will use several default handlers, including support
    for HTTP, FTP and when applicable, HTTPS.

    If any of the handlers passed as arguments are subclasses of the
    default handlers, the default handlers will not be used.
    """
    import types

    def isclass(obj):
        return isinstance(obj, (types.ClassType, type))

    opener = OpenerDirector()
    default_classes = [ProxyHandler, UnknownHandler, HTTPHandler,
                       HTTPDefaultErrorHandler, HTTPRedirectHandler,
                       FTPHandler, FileHandler, HTTPErrorProcessor]
    if hasattr(httplib, 'HTTPS'):
        default_classes.append(HTTPSHandler)

    # A default is overridden when the caller passed a subclass of it or
    # an instance of it.
    overridden = set()
    for default in default_classes:
        for supplied in handlers:
            if isclass(supplied):
                if issubclass(supplied, default):
                    overridden.add(default)
            elif isinstance(supplied, default):
                overridden.add(default)

    for default in default_classes:
        if default not in overridden:
            opener.add_handler(default())

    # Caller-supplied classes are instantiated; instances are used as-is.
    for supplied in handlers:
        if isclass(supplied):
            supplied = supplied()
        opener.add_handler(supplied)
    return opener
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is popped from ``config['()']``.  When it is not callable,
    the interpreter has ``types.ClassType`` and the factory is not an
    old-style class, it is passed through ``self.resolve`` first.  The
    factory is called with the remaining entries whose keys pass
    ``valid_ident``; the optional ``config['.']`` mapping is then applied
    to the result with ``setattr``.  Both special keys are removed from
    ``config``.
    """
    factory = config.pop('()')
    if not hasattr(factory, '__call__'):
        if hasattr(types, 'ClassType') and type(factory) != types.ClassType:
            factory = self.resolve(factory)

    props = config.pop('.', None)

    # Only keys that are valid identifiers can be used as keyword names.
    kwargs = {}
    for key in config:
        if valid_ident(key):
            kwargs[key] = config[key]

    result = factory(**kwargs)
    if props:
        for attr_name, attr_value in props.items():
            setattr(result, attr_name, attr_value)
    return result
def __str__(self):
    """Describe the problem as ``problem in <filename> - <exc>: <value>``.

    When ``self.exc`` is an old-style class its ``__name__`` is shown;
    otherwise ``self.exc`` is formatted as it is.
    """
    shown = self.exc
    if type(shown) is types.ClassType:
        shown = shown.__name__
    parts = (self.filename, shown, self.value)
    return 'problem in %s - %s: %s' % parts
def register(cls, subclass):
    """Register a virtual subclass of an ABC.

    Does nothing when ``subclass`` already is a subclass of ``cls``.
    Raises TypeError when ``subclass`` is not a class and RuntimeError
    when registering it would create an inheritance cycle.
    """
    is_class = isinstance(subclass, (type, types.ClassType))
    if not is_class:
        raise TypeError("Can only register classes")

    already_registered = issubclass(subclass, cls)
    if already_registered:
        return  # Already a subclass

    # Subtle: the cycle test comes *after* the "already a subclass" test,
    # so X.register(X) is allowed and interpreted as a no-op.
    would_cycle = issubclass(cls, subclass)
    if would_cycle:
        # This would create a cycle, which is bad for the subclass checks.
        raise RuntimeError("Refusing to create an inheritance cycle")

    cls._abc_registry.add(subclass)
    ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
def pickle(ob_type, pickle_function, constructor_ob=None):
    """Register ``pickle_function`` as the reducer for ``ob_type``.

    The function is stored in ``dispatch_table`` under ``ob_type``.
    Raises TypeError when ``ob_type`` is an old-style class or when
    ``pickle_function`` is not callable.
    """
    given_old_style_class = type(ob_type) is _ClassType
    if given_old_style_class:
        raise TypeError("copy_reg is not intended for use with classes")

    reducer_is_callable = hasattr(pickle_function, '__call__')
    if not reducer_is_callable:
        raise TypeError("reduction functions must be callable")

    dispatch_table[ob_type] = pickle_function

    # The constructor_ob argument is a vestige of "safe for unpickling".
    # There is no reason for the caller to pass it anymore, but it is
    # still forwarded to constructor() when given.
    if constructor_ob is None:
        return
    constructor(constructor_ob)
def _get_service_func(self, method, params):
    """
    Return the callable that serves C{method} on this service.

    When C{self.service} is a class it is instantiated first. With
    C{method} set to C{None} the service object itself is returned,
    provided it is callable. C{params} is not used here.

    @raise InvalidServiceMethodError: Calls to private methods are not
        allowed.
    @raise UnknownServiceMethodError: Unknown method.
    @raise InvalidServiceMethodError: Service method must be callable.
    """
    target = self.service
    if isinstance(target, (type, types.ClassType)):
        service = target()
    else:
        service = target

    if method is None:
        # No method name: the service itself has to be the callable.
        if python.callable(service):
            return service
        raise UnknownServiceMethodError(
            "Unknown method %s" % str(self.service))

    method = str(method)
    if method.startswith('_'):
        raise InvalidServiceMethodError(
            "Calls to private methods are not allowed")

    try:
        func = getattr(service, method)
    except AttributeError:
        raise UnknownServiceMethodError(
            "Unknown method %s" % str(method))

    if python.callable(func):
        return func
    raise InvalidServiceMethodError(
        "Service method %s must be callable" % str(method))
def register_entry_points_plugins(entry_point="girlfriend.plugin"):
    """Register every plugin published under an entry point namespace.

    Each extension found by ``extension.ExtensionManager`` is turned into
    a ``Plugin`` and handed to ``plugin_manager.register``.

    :param entry_point: entry point namespace to scan
    :raises InvalidPluginException: when an extension is not a Plugin,
        module, function or class
    """
    global plugin_manager

    found_extensions = extension.ExtensionManager(
        namespace=entry_point, invoke_on_load=False)

    for ext in found_extensions:
        target = ext.plugin
        plugin = None

        if isinstance(target, Plugin):
            # Already a plugin object: use it directly.
            plugin = target
        elif isinstance(target, types.ModuleType):
            # A module is wrapped as a plugin.
            plugin = Plugin.wrap_module(target)
        elif isinstance(target, types.FunctionType):
            # A function is wrapped using the extension name and its
            # docstring.
            plugin = Plugin.wrap_function(ext.name, target.__doc__, target)
        elif isinstance(target, (types.ClassType, types.TypeType)):
            # A class (old- or new-style) is wrapped as a plugin.
            plugin = Plugin.wrap_class(target)
        else:
            # Anything else is rejected.
            raise InvalidPluginException(
                u"?? '{}' ??? '{}'????".format(
                    ext.name, type(target).__name__)
            )

        plugin_manager.register(plugin)
def skip(reason):
    """
    Unconditionally skip a test.

    Classes are marked in place.  Any other test item is replaced by a
    wrapper that raises ``SkipTest(reason)`` when called.  In both cases
    the returned object carries ``__unittest_skip__ = True`` and
    ``__unittest_skip_why__ = reason``.
    """
    def decorator(test_item):
        is_class = isinstance(test_item, (type, types.ClassType))
        if is_class:
            marked = test_item
        else:
            @functools.wraps(test_item)
            def skip_wrapper(*args, **kwargs):
                raise SkipTest(reason)
            marked = skip_wrapper

        marked.__unittest_skip__ = True
        marked.__unittest_skip_why__ = reason
        return marked
    return decorator
def isinst(inst,clazz):
    """Instance check that follows ``getcurrent`` for old-style classes.

    Unless ``inst`` is an old-style instance and ``clazz`` an old-style
    class, this is plain ``isinstance``.  Otherwise both classes are
    mapped through ``getcurrent``; the result is ``ISNT`` when the current
    instance class is not a subclass of the current ``clazz``, ``WAS``
    when the instance class was already current, and ``IS`` after
    switching ``inst.__class__`` to the current class.
    """
    old_style_pair = (type(inst) == types.InstanceType
                      and type(clazz) == types.ClassType)
    if not old_style_pair:
        return isinstance(inst,clazz)

    original_class = inst.__class__
    current_class = getcurrent(original_class)
    clazz = getcurrent(clazz)

    if not issubclass(current_class, clazz):
        return ISNT
    if original_class == current_class:
        return WAS
    # The instance still points at a stale class: move it to the current one.
    inst.__class__ = current_class
    return IS
def __call__(self, request, node, model, viewName):
    """Get a name from my namespace.

    Returns ``widgets.DefaultWidget(model)`` for the view name "None",
    ``vc(model)`` when the namespace attribute ``viewName`` is a Widget
    subclass, and None otherwise.  ``request`` and ``node`` are not used.
    """
    from twisted.web.woven import widgets

    if viewName == "None":
        return widgets.DefaultWidget(model)

    vc = getattr(self.namespace, viewName, None)
    # Don't call modules and other arbitrary objects found in the
    # namespace -- only widget classes.
    if not vc:
        return None
    if not isinstance(vc, (type, ClassType)):
        return None
    if not issubclass(vc, widgets.Widget):
        return None
    return vc(model)
def loadClass(self, klass):
    """
    Build the tests contained in a test case class.

    One case is made per name returned by C{getTestCaseNames}, using
    C{methodPrefix + name}; the cases are passed through C{self.sort} and
    then to C{self.suiteFactory}, whose result is returned.

    @raise TypeError: If C{klass} is not a class.
    @raise ValueError: If C{klass} is not a test case.
    """
    if not isinstance(klass, (type, types.ClassType)):
        raise TypeError("%r is not a class" % (klass,))
    if not isTestCase(klass):
        raise ValueError("%r is not a test case" % (klass,))

    cases = []
    for name in self.getTestCaseNames(klass):
        cases.append(self._makeCase(klass, self.methodPrefix+name))
    ordered = self.sort(cases)
    return self.suiteFactory(ordered)
def instance(klass, d):
    """Create an instance of ``klass`` without calling its ``__init__``.

    Old-style classes are built with ``new.instance(klass, d)``.  For
    new-style classes the object is allocated with ``object.__new__`` and
    ``d`` becomes its ``__dict__`` (the same dict object, not a copy).

    Raises TypeError when ``klass`` is not a class.
    """
    if isinstance(klass, types.ClassType):
        return new.instance(klass, d)
    elif isinstance(klass, type):
        o = object.__new__(klass)
        o.__dict__ = d
        return o
    else:
        # Wrap the value in a 1-tuple so that tuple arguments are shown in
        # the message instead of being unpacked by the ``%`` operator.
        raise TypeError("%s is not a class" % (klass,))
def _maybeClass(classnamep):
    """Return ``qual(classnamep)`` for classes, ``classnamep`` otherwise.

    Both old-style classes and, where the interpreter has ``object``,
    new-style classes count as classes.
    """
    # Interpreters that do not define ``object`` have no new-style
    # classes to test for.
    try:
        object
    except NameError:
        isObject = 0
    else:
        isObject = isinstance(classnamep, type)

    if not (isinstance(classnamep, ClassType) or isObject):
        return classnamep
    return qual(classnamep)
def setUnjellyableForClassTree(module, baseClass, prefix=None):
    """
    Set all classes in a module derived from C{baseClass} as copiers for
    a corresponding remote class.

    When you have a hierarchy of Copyable (or Cacheable) classes on one
    side, and a mirror structure of Copied (or RemoteCache) classes on the
    other, use this to register all your Copieds for the Copyables.

    Each copy tag is formed by adding a prefix to the Copied's class name.
    The prefix defaults to module.__name__. If you wish the copy tag to
    consist of solely the classname, pass the empty string \'\'.

    Only old-style classes (exact type C{types.ClassType}) are considered.

    @param module: a module object from which to pull the Copied classes.
        (passing sys.modules[__name__] might be useful)

    @param baseClass: the base class from which all your Copied classes
        derive.

    @param prefix: the string prefixed to classnames to form the
        unjellyableRegistry.
    """
    if prefix is None:
        prefix = module.__name__

    # A non-empty prefix is separated from the class name by a dot.
    if prefix:
        prefix = "%s." % prefix

    for attr_name in dir(module):
        candidate = getattr(module, attr_name)
        if type(candidate) == types.ClassType and issubclass(candidate, baseClass):
            setUnjellyableForClass('%s%s' % (prefix, attr_name), candidate)
def _unjelly_class(self, rest):
    """Resolve ``rest[0]`` (a dotted name) to an old-style class.

    The module part of the name must be allowed by
    ``self.taster.isModuleAllowed`` and the resolved class by
    ``self.taster.isClassAllowed``.  Raises ``InsecureJelly`` when either
    check fails or when the name does not resolve to an old-style class.
    """
    qualified_name = rest[0]

    # Everything before the last dot is the module path.
    modName = '.'.join(qualified_name.split('.')[:-1])
    if not self.taster.isModuleAllowed(modName):
        raise InsecureJelly("module %s not allowed" % modName)

    klaus = namedObject(qualified_name)
    if type(klaus) is not types.ClassType:
        raise InsecureJelly("class %s unjellied to something that isn't a class: %s" % (repr(qualified_name), repr(klaus)))

    if self.taster.isClassAllowed(klaus):
        return klaus
    raise InsecureJelly("class not allowed: %s" % qual(klaus))