我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用new.instancemethod()。
def __get__(self, instance, cls):
    """Descriptor protocol support.

    Lets an instance of this class work when it decorates a method on a
    user-defined class.

    Bound access (``instance`` given): the decorated function is wrapped as
    a bound method and stored on the instance under the function's name, so
    this descriptor is not consulted again for that instance.

    Unbound access (only ``cls`` given): the unbound wrapper is built once,
    kept on the descriptor, and reused on later unbound lookups.

    Raises:
        ValueError: if neither an instance nor a class is supplied.
    """
    # Compare against None explicitly: an instance whose class defines
    # __len__/__nonzero__ may be falsy yet is still a valid bound target.
    if instance is not None:
        deco = instancemethod(self._decorated(cls, instance), instance, cls)
        # This prevents us from being called again for this instance
        setattr(instance, self._func.__name__, deco)
        return deco
    if cls is not None:
        if not self.__clsfunc:
            self.__clsfunc = instancemethod(self._decorated(cls), None, cls)
        return self.__clsfunc
    raise ValueError("Must supply instance or class to descriptor.")
def _unjelly_method(self, rest):
    """(internal) Unjelly a method.

    ``rest`` holds, in order: the method name, the jellied ``im_self`` and
    the jellied ``im_class``.

    Returns the unbound attribute when ``im_self`` is None, an
    ``_InstanceMethod`` placeholder when ``im_self`` is still ``NotKnown``,
    and otherwise a bound method built from the class dictionary entry.

    Raises:
        InsecureJelly: if the unjellied class is not a classic class.
        TypeError: if the class no longer defines a method of that name.
    """
    im_name = rest[0]
    im_self = self.unjelly(rest[1])
    im_class = self.unjelly(rest[2])
    if type(im_class) is not types.ClassType:
        raise InsecureJelly("Method found with non-class class.")
    if im_name not in im_class.__dict__:
        # String exceptions are not valid exceptions; raise a real one that
        # carries the same message.
        raise TypeError('instance method changed')
    if im_self is None:
        return getattr(im_class, im_name)
    if isinstance(im_self, NotKnown):
        return _InstanceMethod(im_name, im_self, im_class)
    return instancemethod(im_class.__dict__[im_name], im_self, im_class)
def __run(self):
    """Advance the download counter and finish once the wait limit is passed.

    Each call increments the module-level ``count``, prints it and pushes it
    to the 'progress' widget.  When ``count`` exceeds the configured
    ``epgmhw2wait`` value, the timer is stopped, the last TV service is
    resumed, the EPG cache is saved, a notice is shown and the screen closes.
    """
    global count
    count += 1
    print('%s Epg Downloaded' % count)
    self['progress'].setValue(count)
    if not (count > config.plugins.ldteam.epgmhw2wait.value):
        return

    self.ctimer.stop()
    last_service = eServiceReference(config.tv.lastservice.value)
    self.session.nav.playService(last_service)
    rDialog.stopDialog(self.session)

    # NOTE(review): this first binding is overwritten on the next line and
    # never called -- confirm whether a load was actually intended here.
    epgcache = new.instancemethod(_enigma.eEPGCache_load, None, eEPGCache)
    epgcache = eEPGCache.getInstance().save()

    if config.osd.language.value == 'es_ES':
        notice = _('Epg Actualizado')
    else:
        notice = _('Updated Epg')
    self.mbox = self.session.open(MessageBox, notice, MessageBox.TYPE_INFO, timeout=5)

    from Screens.Standby import inStandby
    if inStandby:
        self.session.nav.stopService()
    self.close()
    return
def setUp(self):
    """Prepare a Spamcontrol plugin and a fake client that can send radio events.

    Loads the default plugin configuration, connects the fake player "Joe",
    registers a new EVT_CLIENT_RADIO event, and grafts an ``onRadio`` handler
    onto the plugin and a ``radios`` helper onto the fake client.
    """
    SpamcontrolTestCase.setUp(self)
    with open(b3.getAbsolutePath('@b3/conf/plugin_spamcontrol.ini')) as conf_file:
        self.init_plugin(conf_file.read())

    self.joe = FakeClient(self.console, name="Joe", exactName="Joe",
                          guid="zaerezarezar", groupBits=1)
    self.joe.connects("1")

    # let's say our game has a new event : EVT_CLIENT_RADIO
    radio_event_id = self.console.Events.createEvent('EVT_CLIENT_RADIO', 'Event client radio')

    # teach the Spamcontrol plugin how to react on such events
    def onRadio(this, event):
        chat_like = Event(type=event.type, client=event.client,
                          target=event.target, data=event.data['text'])
        this.onChat(chat_like)

    self.p.onRadio = new.instancemethod(onRadio, self.p, SpamcontrolPlugin)
    self.p.registerEvent('EVT_CLIENT_RADIO', self.p.onRadio)

    # patch joe to make him able to send radio messages
    def radios(me, text):
        payload = {'text': text}
        me.console.queueEvent(Event(type=radio_event_id, client=me, data=payload))

    self.joe.radios = new.instancemethod(radios, self.joe, FakeClient)
def connect(self):
    """Connect the store and, when autocommit is on, patch the pool.

    After the base-class connect, if ``self._autocommit`` is set and the
    store has a ``_pool`` exposing ``connection``, the pool's ``connection``
    method is replaced by a wrapper that switches autocommit on for every
    connection it hands out.
    """
    SQLObjectStore.connect(self)
    if not self._autocommit:
        return

    # Since our autocommit patch above does not get applied to pooled
    # connections, we have to monkey-patch the pool connection method
    try:
        pool = self._pool
        original_connection = pool.connection
    except AttributeError:
        # No pool (or no connection method on it): nothing to patch.
        return

    def newConnection(self):
        conn = self._normalConnection()
        try:
            conn.autocommit(True)
        except AttributeError:
            pass
        return conn

    pool._normalConnection = original_connection
    pool._autocommit = self._autocommit
    pool.connection = new.instancemethod(newConnection, pool, pool.__class__)
def enhance_method(klass, method_name, replacement):
    """Replace ``klass.method_name`` with an enhanced version.

    ``replacement`` is called with the original method as its first
    argument, followed by whatever arguments the caller supplied.
    """
    original = getattr(klass, method_name)
    enhanced = lambda *args, **kwds: replacement(original, *args, **kwds)
    setattr(klass, method_name, new.instancemethod(enhanced, None, klass))
def enhance__init__(klass, f):
    """Replace ``klass.__init__`` with a wrapper around ``f``.

    ``f`` is called with the previous ``__init__`` as its first argument,
    followed by the arguments given to the constructor.
    """
    previous_init = klass.__init__
    wrapped = lambda *args, **kwds: f(previous_init, *args, **kwds)
    klass.__init__ = new.instancemethod(wrapped, None, klass)
def addStr(anInstance):
    """Attach the module-level ``__str__`` function to ``anInstance`` as a bound method."""
    owner = anInstance.__class__
    bound_str = new.instancemethod(__str__, anInstance, owner)
    anInstance.__str__ = bound_str
# Test it
def save_instancemethod(self, obj):
    """Pickle an instancemethod as a ``new.instancemethod`` reduce call.

    The method is described by its function, bound instance and class.
    """
    # Instancemethods are re-created each time they are accessed so this
    # will not be memoized
    func, bound_to, owner = obj.im_func, obj.im_self, obj.im_class
    self.save_reduce(new.instancemethod, (func, bound_to, owner))
def __setitem__(self, n, obj):
    """Resolve dependants with a bound method once the instance is known.

    Only index 0 is accepted.  While ``obj`` is still a ``NotKnown``
    placeholder nothing happens; otherwise the function stored under
    ``self.name`` in ``self.my_class`` is bound to ``obj``.
    """
    assert n == 0, "only zero index allowed"
    if isinstance(obj, NotKnown):
        return
    owner = self.my_class
    bound = instancemethod(owner.__dict__[self.name], obj, owner)
    self.resolveDependants(bound)
def unpickleMethod(im_name, im_self, im_class):
    """Support function for copy_reg to unpickle method refs.

    Returns the unbound method when ``im_self`` is None, otherwise a method
    bound to ``im_self``.  If the lookup on ``im_class`` raises
    AttributeError, the lookup is retried on the instance's current class.
    """
    try:
        unbound = getattr(im_class, im_name)
        if im_self is None:
            return unbound
        return instancemethod(unbound.im_func, im_self, im_class)
    except AttributeError:
        log.msg("Method", im_name, "not on class", im_class)
        assert im_self is not None, "No recourse: no instance to guess from."
        # Attempt a common fix before bailing -- if classes have
        # changed around since we pickled this method, we may still be
        # able to get it by looking on the instance's current class.
        current_class = im_self.__class__
        unbound = getattr(current_class, im_name)
        log.msg("Attempting fixup with", unbound)
        if im_self is None:
            return unbound
        return instancemethod(unbound.im_func, im_self, current_class)
def install(self, methodIdentifier):
    """Install myself on my instance in place of this method.

    The previous attribute (or None) is remembered in ``self.oldMethod``
    together with its name.
    """
    target = self.instance
    oldMethod = getattr(target, methodIdentifier, None)

    # XXX: this conditional probably isn't effective.
    if oldMethod is self:
        return

    replacement = new.instancemethod(self, target, target.__class__)
    # avoid triggering __setattr__
    target.__dict__[methodIdentifier] = replacement
    self.oldMethod = (methodIdentifier, oldMethod)
def make_instancemethod(function, instance):
    """Bind the function underlying the method ``function`` to ``instance``.

    The result is a method bound to ``instance`` and to its class.
    """
    underlying = function.im_func
    owner = instance.__class__
    return new.instancemethod(underlying, instance, owner)
def save(self):
    """Persist the EPG settings, save the EPG cache and confirm to the user.

    The cache filename is derived from the configured cache path by
    appending 'epg.dat'.
    """
    misc = config.misc
    misc.epgcache_filename.value = '%sepg.dat' % misc.epgcachepath.value
    misc.epgcache_filename.save()
    misc.epgcachepath.save()

    # NOTE(review): this first binding is overwritten on the next line and
    # never called -- confirm whether it is still needed.
    epgcache = new.instancemethod(_enigma.eEPGCache_save, None, eEPGCache)
    epgcache = eEPGCache.getInstance().save()

    config.plugins.ldteam.epgmhw2wait.save()
    config.epg.save()
    config.epg.maxdays.save()
    configfile.save()

    notice = _('configuration is saved')
    self.mbox = self.session.open(MessageBox, notice, MessageBox.TYPE_INFO, timeout=4)
def process_task(self, server_info, command, options_dict=None):
    """Probe the server for Heartbleed and return a ``HeartbleedResult``.

    The connection's handshake is swapped for one that sends the heartbleed
    payload; whatever the server sends back is read straight from the socket.

    Raises:
        ValueError: if no response data was captured (connection failed).
    """
    ssl_connection = server_info.get_preconfigured_ssl_connection()
    # Needed by the heartbleed payload
    ssl_connection.ssl_version = server_info.highest_ssl_version_supported

    # Awful hack #1: replace nassl.sslClient.do_handshake() with a heartbleed
    # checking SSL handshake so that all the SSLyze options
    # (startTLS, proxy, etc.) still work
    ssl_connection.do_handshake = new.instancemethod(
        do_handshake_with_heartbleed, ssl_connection, None)

    heartbleed = None
    try:
        # Perform the SSL handshake
        ssl_connection.connect()
    except HeartbleedSent:
        # Awful hack #2: directly read the underlying network socket
        heartbleed = ssl_connection._sock.recv(16381)
    finally:
        ssl_connection.close()

    if heartbleed is None:
        raise ValueError("Error: connection failed.")

    # Vulnerable when the server replied with our heartbeat payload
    is_vulnerable_to_heartbleed = '\x01\x01\x01\x01\x01\x01\x01\x01\x01' in heartbleed
    return HeartbleedResult(server_info, command, options_dict, is_vulnerable_to_heartbleed)
def method(cls):
    """Return a decorator that adds the decorated function to ``cls`` as a method.

    The function is installed on ``cls`` under its own name.  The decorator
    itself returns None, so the decorated name in the defining scope is
    bound to None.
    """
    import new

    def _wrap(f):
        # setattr works for classic and new-style classes alike; writing to
        # cls.__dict__ fails on new-style classes, where it is read-only.
        setattr(cls, f.func_name, new.instancemethod(f, None, cls))
        return None
    return _wrap
def patch_spamcontrolPlugin(self):
    """Teach the running Spamcontrol plugin about RADIO spam.

    Adds an ``onRadio`` handler to the plugin instance that re-dispatches the
    event to ``onChat`` with the repr of the event data as text, then
    registers it for 'EVT_CLIENT_RADIO'.
    """
    self.info("Patching spamcontrol plugin...")
    plugin = self.spamcontrolPlugin

    def onRadio(this, event):
        chat_like = Event(type=event.type, client=event.client,
                          target=event.target, data=repr(event.data))
        this.onChat(chat_like)

    plugin.onRadio = new.instancemethod(onRadio, plugin, SpamcontrolPlugin)
    plugin.registerEvent('EVT_CLIENT_RADIO', plugin.onRadio)
def __init__(self, glade_filename, window_name, app_name=None):
    """Load a glade window and connect its signals to this class's methods.

    Every plain function defined directly on the instance's class is bound
    to ``self`` and offered to ``signal_autoconnect`` under its own name.
    ``app_name`` is passed to ``gtk.glade.XML`` only when it is truthy.
    """
    # load glade file
    xml_args = (glade_filename, window_name)
    if app_name:
        xml_args = xml_args + (app_name,)
    self.xml = gtk.glade.XML(*xml_args)

    # find and store methods as bound callbacks
    owner = self.__class__
    callbacks = dict(
        (name, new.instancemethod(attr, self, owner))
        for name, attr in owner.__dict__.items()
        if isinstance(attr, types.FunctionType)
    )

    # autoconnect signals
    self.xml.signal_autoconnect(callbacks)
def watchObject(self, object, identifier, callback):
    """Watch the given object.

    Whenever I think the object might have changed, I'll send an
    ObjectLink of it to the callback.

    The identifier argument is used to generate identifiers for
    objects which are members of this one.

    Raises:
        TypeError: if ``object`` is not a classic-class instance.
    """
    if type(object) is not types.InstanceType:
        raise TypeError("Sorry, can only place a watch on Instances.")

    # uninstallers = []

    original_class = object.__class__

    # Collect the class's method names plus the instance's own attributes.
    names = {}
    reflect.addMethodNamesToDict(original_class, names, '')
    names.update(dict.fromkeys(object.__dict__, 1))
    members = names.keys()

    # Build a per-object subclass that mixes in the __setattr__ monkey.
    clazzNS = {}
    clazz = new.classobj(
        'Watching%s%X' % (original_class.__name__, id(object)),
        (_MonkeysSetattrMixin, original_class,),
        clazzNS)

    clazzNS['_watchEmitChanged'] = new.instancemethod(
        lambda slf, i=identifier, b=self, cb=callback: cb(b.browseObject(slf, i)),
        None, clazz)

    # orig_class = object.__class__
    object.__class__ = clazz

    for name in members:
        m = getattr(object, name)
        # Only hook bound methods.
        if type(m) is not types.MethodType or m.im_self is None:
            continue
        # What's the use of putting watch monkeys on methods
        # in addition to __setattr__?  Well, um, uh, if the
        # methods modify their attributes (i.e. add a key to
        # a dictionary) instead of [re]setting them, then
        # we wouldn't know about it unless we did this.
        # (Is that convincing?)
        monkey = _WatchMonkey(object)
        monkey.install(name)
        # uninstallers.append(monkey.uninstall)

    # XXX: This probably prevents these objects from ever having a
    # zero refcount.  Leak, Leak!
    ## self.watchUninstallers[object] = uninstallers
def patch_b3_admin_plugin(self):
    """Monkey patches the admin plugin.

    Replaces the function behind the admin plugin's 'kick' command with a
    version that falls back to kicking by client id or by full name when
    the target cannot be resolved to a known client.  The command's help
    text is taken from the replacement's docstring.
    """
    def new_cmd_kick(this, data, client=None, cmd=None):
        """ <name> [<reason>] - kick a player <fullexactname> [<reason>] - kick an incompletely authed player """
        m = this.parseUserCmd(data)
        if not m:
            client.message('^7Invalid parameters')
            return False

        cid, keyword = m
        reason = this.getReason(keyword)

        if not reason and client.maxLevel < this._noreason_level:
            client.message('^1ERROR: ^7You must supply a reason')
            return False

        sclient = this.findClientPrompt(cid, client)
        if sclient:
            if sclient.cid == client.cid:
                # Look the message up on the admin plugin (`this`), as the
                # other message lookups in this command do, not on the
                # enclosing object that installs the patch.
                this.console.say(this.getMessage('kick_self', client.exactName))
                return True
            elif sclient.maxLevel >= client.maxLevel:
                if sclient.maskGroup:
                    client.message('^7%s ^7is a masked higher level player, can\'t kick' % sclient.exactName)
                else:
                    message = this.getMessage('kick_denied', sclient.exactName, client.exactName, sclient.exactName)
                    this.console.say(message)
                return True
            else:
                sclient.kick(reason, keyword, client)
                return True
        elif re.match('^[0-9]+$', cid):
            # failsafe, do a manual client id kick
            this.console.kick(cid, reason, client)
        else:
            this.console.kickbyfullname(cid, reason, client)

    admin_plugin = self.getPlugin('admin')
    command = admin_plugin._commands['kick']
    command.func = new.instancemethod(new_cmd_kick, admin_plugin)
    command.help = new_cmd_kick.__doc__.strip()
def patch_b3_admin_plugin(self):
    """Monkey patches the admin plugin.

    Adds a ``parse_map_parameters`` helper to the admin plugin instance and
    replaces the function behind its 'map' command with one that accepts an
    optional game mode after the map name.
    """
    def parse_map_parameters(this, data, client):
        """Split command parameters into (map, gamemode).

        Expects one or two space-separated parameters: <map> <gamemode>.
        The gamemode is '' when fewer than two parameters are given.
        """
        tokens = data.split()
        gamemode_data = tokens[1] if len(tokens) >= 2 else ''
        map_data = tokens[0]
        return map_data, gamemode_data

    # Monkey patch the cmd_map method of the loaded AdminPlugin instance
    # to require 2nd parameter which is the game mode
    def new_cmd_map(this, data, client, cmd=None):
        """ <map> <gamemode> - switch current map. Specify a gamemode by separating them from the map name with a space """
        if not data:
            # No argument: list the maps and game types instead of switching.
            maplist = ''.join(', ' + m for m in this.console.getAllAvailableMaps())
            client.message("Full list of maps on the server" + maplist)
            client.message("PVP Gametypes are: ambush, firefight, flashpoint, infiltrate, occupy, push, skirmish, "
                           "strike")
            client.message("Coop Gametypes are: checkpoint, outpost, hunt, survival")
            client.message('For more help, type !help map')
            return

        map_id, gamemode_id = this.parse_map_parameters(data, client)
        suggestions = this.console.changeMap(map_id, gamemode_id)
        if type(suggestions) == list:
            client.message('do you mean : %s ?' % ', '.join(suggestions))

    adminPlugin = self.getPlugin('admin')
    adminPlugin.parse_map_parameters = new.instancemethod(parse_map_parameters, adminPlugin)
    map_command = adminPlugin._commands['map']
    map_command.func = new.instancemethod(new_cmd_map, adminPlugin)
    map_command.help = new_cmd_map.__doc__.strip()
def InstallInWebKit(appServer):
    """Enable COM for the app server's threads when 'EnableCOM' is set.

    Called by the app server during initialization.  Wraps the server's
    ``initThread`` / ``delThread`` so that COM is initialized at the start
    of each thread and uninitialized after the original ``delThread`` runs.
    """
    if not appServer.setting('EnableCOM', False):
        return  # enabling COM was not requested

    # This must be done BEFORE pythoncom is imported -- see the book
    # mentioned above.
    import sys
    sys.coinit_flags = 0

    # Get the win32 extensions
    import pythoncom

    # Create new versions of initThread and delThread which will call the
    # old versions
    def newInitThread(self):
        # This must be called at the beginning of any thread that uses COM
        self.initCOM(self._initCOM)
        # Call the original initThread
        self.originalInitThread()

    def newDelThread(self):
        # Call the original delThread
        self.originalDelThread()
        # Uninitialize COM
        self.closeCOM()

    # Set references to the COM initialize and uninitialize functions
    appServer._initCOM = pythoncom.COINIT_MULTITHREADED
    appServer.initCOM = pythoncom.CoInitializeEx
    appServer.closeCOM = pythoncom.CoUninitialize

    # Monkey-patch this instance of the appServer.
    # Grab references to the original initThread and delThread bound
    # methods, which we will replace
    appServer.originalInitThread = appServer.initThread
    appServer.originalDelThread = appServer.delThread

    # Replace the initThread and delThread methods with our new versions
    import new
    server_class = appServer.__class__
    appServer.initThread = new.instancemethod(newInitThread, appServer, server_class)
    appServer.delThread = new.instancemethod(newDelThread, appServer, server_class)

    print('COM has been enabled.')