我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 inspect.stack()。
def __ImportFrom(module_xname, xname):
    """
    Import a specified name from a python module into the global namespace.

    The resolved object is bound under ``xname`` in the globals of every
    frame currently on the call stack (this function's own frame included),
    so this can be done programmatically and inside a method or class.

    @param module_xname : name of the module to load x.y.z
    @param xname : symbol to import from loaded module
    @return method/class : imported item from loaded module, or None when
        the module cannot be imported or does not define ``xname``
    """
    try:
        module = __import__(module_xname, globals(), locals(), [xname])
    except ImportError:
        return None

    try:
        item = getattr(module, xname)
    except AttributeError:
        return None

    if xname:
        # inspect.currentframe() accepts no depth argument on Python 3, so
        # walk the frame records from inspect.stack() instead; element 0 of
        # each record is the frame object.
        for frame_record in inspect.stack():
            frame_record[0].f_globals[xname] = item
    return item
def stat_daily_site(self, start=None, end=None):
    """
    Fetch the daily per-site statistics report for a time window.

    :param start: window start as a unix timestamp; when None it is set to
        52 weeks before the window end
    :param end: window end as a unix timestamp; when None the current time
        is used
    :return: the result of ``self.response`` for the fetched content
    """
    if end is None:
        unixend = int(time.time())
    else:
        unixend = end
    if start is None:
        unixstart = unixend - 52 * 7 * 24 * 3600
    else:
        unixstart = start

    query = {
        'attrs': ['bytes',
                  'wan-tx_bytes',
                  'wan-rx_bytes',
                  'wlan_bytes',
                  'num_sta',
                  'lan-num_sta',
                  'wlan-num_sta',
                  'time'],
        'start': unixstart,
        'end': unixend,
    }
    content = self.sitecmdjson('/stat/report/daily.site', query)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Daily Statistics for site')
def stat_hourly_ap(self, start=None, end=None):
    """
    Fetch the hourly access-point statistics report for a time window.

    :param start: window start as a unix timestamp; when None it is set to
        7 days before the window end
    :param end: window end as a unix timestamp; when None the current time
        is used
    :return: the result of ``self.response`` for the fetched content
    """
    if end is None:
        unixend = int(time.time())
    else:
        unixend = end
    if start is None:
        unixstart = unixend - 7 * 24 * 3600
    else:
        unixstart = start

    query = {
        'attrs': ['bytes', 'num_sta', 'time'],
        'start': unixstart,
        'end': unixend,
    }
    content = self.sitecmdjson('/stat/report/hourly.ap', query)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Hourly Statistics for AP')
def set_ap_radiosettings(self, ap_id, radio='ng', channel=1, ht='20', tx_power_mode=0, tx_power=0):
    """
    Send radio settings for one access point.

    :param ap_id: device id; URL-quoted and appended to ``/upd/device/``
    :param radio: value sent as ``radio``
    :param channel: value sent as ``channel``
    :param ht: value sent as ``ht``
    :param tx_power_mode: value sent as ``tx_power_mode``
    :param tx_power: value sent as ``tx_power``
    :return: the result of ``self.response`` for the returned content
    """
    endpoint = '/upd/device/' + urllib.parse.quote(ap_id)
    settings = {
        'radio': radio,
        'channel': channel,
        'ht': ht,
        'tx_power_mode': tx_power_mode,
        'tx_power': tx_power,
    }
    content = self.sitecmdjson(endpoint, settings)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'AP Radio Settings')
def set_ap_network(self, ap_id, type="dhcp", ip="192.168.1.6", netmask="255.255.255.0", gateway="192.168.1.1", dns1="8.8.8.8", dns2="8.8.4.4"):
    """
    Send a network configuration for one access point via PUT.

    :param ap_id: device id; converted with ``str`` and appended to
        ``/rest/device/``
    :param type: value sent as ``type``
    :param ip: value sent as ``ip``
    :param netmask: value sent as ``netmask``
    :param gateway: value sent as ``gateway``
    :param dns1: value sent as ``dns1``
    :param dns2: value sent as ``dns2``
    :return: the result of ``self.response`` for the returned content
    """
    network = {
        "type": type,
        "ip": ip,
        "netmask": netmask,
        "gateway": gateway,
        "dns1": dns1,
        "dns2": dns2,
    }
    endpoint = '/rest/device/' + str(ap_id)
    content = self.sitecmdjson(endpoint, {"config_network": [network]}, method='PUT')
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'AP Network Config')
def add_hotspot2(self, name, network_access_internet=True, network_type=2, venue_group=2, venue_type=0):
    """
    Add a HotSpot 2.0 configuration with simple settings.

    :param name: value sent as ``name``
    :param network_access_internet: value sent as ``network_access_internet``
    :param network_type: value sent as ``network_type``
    :param venue_group: value sent as ``venue_group``
    :param venue_type: value sent as ``venue_type``
    :return: the result of ``self.response`` for the returned content
    """
    settings = {
        "name": name,
        "network_access_internet": network_access_internet,
        "network_type": network_type,
        "venue_group": venue_group,
        "venue_type": venue_type,
    }
    content = self.sitecmdjson('/rest/hotspot2conf', settings)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Add Hotspot 2.0')
def set_hotspot2(self, hs_id, name=None, network_access_internet=None, network_type=None, venue_group=None, venue_type=None):
    """
    Modify a HotSpot 2.0 configuration via PUT.

    Every field is sent as given, including those left at None.

    :param hs_id: configuration id; sent as ``_id`` and appended to the URL
    :param name: value sent as ``name``
    :param network_access_internet: value sent as ``network_access_internet``
    :param network_type: value sent as ``network_type``
    :param venue_group: value sent as ``venue_group``
    :param venue_type: value sent as ``venue_type``
    :return: the result of ``self.response`` for the returned content
    """
    endpoint = '/rest/hotspot2conf/' + str(hs_id)
    settings = {
        "_id": hs_id,
        "name": name,
        "network_access_internet": network_access_internet,
        "network_type": network_type,
        "venue_group": venue_group,
        "venue_type": venue_type,
    }
    content = self.sitecmdjson(endpoint, settings, method="PUT")
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Modify Hotspot 2.0 sites')
def log_assert(bool_, message="", logger=None, logger_name="", verbose=False):
    """Log and raise an AssertionError when ``bool_`` is falsy.

    Use this as a replacement for assert if you want the failing of the
    assert statement to be logged.

    :param bool_: condition to check; nothing happens when it is truthy
    :param message: text placed at the start of the logged/raised message
    :param logger: logger to use; when None, ``logging.getLogger(logger_name)``
    :param logger_name: logger name used only when ``logger`` is None
    :param verbose: when True, include a few source lines around the call
        instead of only the calling line
    :raises AssertionError: when ``bool_`` is falsy; the message combines
        ``message`` with a traceback-like description of the caller
    """
    if logger is None:
        logger = logging.getLogger(logger_name)

    # Explicit test instead of an ``assert`` statement: asserts are removed
    # under ``python -O``, which would silently disable this check.
    if bool_:
        return

    # construct an exception message from the code of the calling frame
    # (frame record 1 is the direct caller of log_assert)
    caller = inspect.stack()[1]
    source_file, line_no, func = caller[1:4]
    code_context = caller[4]  # None when the source text is unavailable
    source = "Traceback (most recent call last):\n" + \
        ' File "%s", line %s, in %s\n ' % (source_file, line_no, func)

    snippet = None
    if verbose:
        # include more lines than that where the statement was made
        try:
            with open(source_file) as handle:
                source_code = handle.readlines()
        except (IOError, OSError):
            source_code = None
        if source_code is not None:
            # Clamp the start so a call near the top of the file does not
            # wrap around to the end of the list.
            snippet = "".join(source_code[max(line_no - 3, 0):line_no + 1])
    if snippet is None:
        snippet = code_context[0].strip() if code_context else ""
    source += snippet

    text = "%s\n%s" % (message, source)
    logger.debug(text)
    raise AssertionError(text)
def psome(obj, count):
    """
    Print ``obj`` only a limited number of times per call site.

    Calls are counted per ``<caller file>:<caller line>`` in the module-level
    dict ``d``; the first ``count`` calls from a given line print, later
    ones are silent.

    :param obj: object to print
    :param count: number of times a given call site may print
    """
    global d
    # Build the stack once; frame record 1 is the caller.
    caller = inspect.stack()[1]
    key = caller[1] + ':' + str(caller[2])
    d[key] = d.get(key, count) - 1
    if d[key] >= 0:
        print(obj)
# From test.py
def parseFile( self, file_or_filename, parseAll=False ):
    """Execute the parse expression on the given file or filename.
       If a filename is specified (instead of a file object),
       the entire file is opened, read, and closed before parsing.

       :param file_or_filename: object with a ``read()`` method, or a path
       :param parseAll: passed through to ``self.parseString``
       :return: the result of ``self.parseString``
    """
    try:
        file_contents = file_or_filename.read()
    except AttributeError:
        # Not a file-like object: treat it as a path. ``with`` guarantees
        # the handle is closed even if read() raises.
        with open(file_or_filename, "r") as f:
            file_contents = f.read()
    try:
        return self.parseString(file_contents, parseAll)
    except ParseBaseException as exc:
        if ParserElement.verbose_stacktrace:
            raise
        else:
            # catch and re-raise exception from here, clears out pyparsing internal stack trace
            raise exc
def filter(self, record):
    """
    Attach caller information to ``record`` as ``_fn``, ``_ln`` and ``_func``.

    When a frame of function ``dd`` whose module ``__name__`` is
    ``'pykit.ututil'`` is on the stack, the values come from the frame that
    called it; otherwise they are copied from the record's own ``filename``,
    ``lineno`` and ``funcName``.

    :param record: object to annotate in place
    :return: always True
    """
    # skip this function
    callers = inspect.stack()[1:]

    for position, entry in enumerate(callers):
        frame, func = entry[0], entry[3]
        is_dd = (frame.f_globals.get('__name__') == 'pykit.ututil'
                 and func == 'dd')
        if not is_dd:
            continue

        # this frame is dd(), find the caller
        caller = callers[position + 1]
        record._fn = os.path.basename(caller[1])
        record._ln = caller[2]
        record._func = caller[3]
        return True

    record._fn = record.filename
    record._ln = record.lineno
    record._func = record.funcName
    return True
def _find_frame_by_self(clz):
    """
    Return the innermost frame whose local variable ``self`` is an instance
    of ``clz``.

    The search starts at this function's own frame and follows ``f_back``
    outward; None is returned when no frame matches.
    """
    frame = inspect.currentframe()
    while frame is not None and not isinstance(frame.f_locals.get('self'), clz):
        frame = frame.f_back
    return frame
def patch(self, orig, replace):
    """
    Replace the attribute named by the dotted path ``orig`` with ``replace``.

    The first path component is looked up in the caller's globals and
    locals (locals win); the remaining components are followed with
    ``getattr``. The owner, attribute name and previous value are recorded
    on ``self`` under the attribute name ``'__patches'``, and on the first
    call ``unpatch`` is registered through ``self.addCleanup``.

    :param orig: dotted path such as ``'module.Class.attr'``
    :param replace: object to install in place of the current attribute
    :raises KeyError: when the first path component is not visible in the
        caller's namespace
    """
    # Use getattr/setattr with a literal name: inside a class body,
    # ``self.__patches`` would be name-mangled while the hasattr() check
    # would not, so the list would be reset on every call.
    # NOTE(review): assumes ``unpatch`` reads the unmangled '__patches'
    # attribute -- confirm against its definition.
    patches = getattr(self, '__patches', None)
    if patches is None:
        patches = []
        setattr(self, '__patches', patches)
        self.addCleanup(unpatch, self)

    caller_frame = inspect.stack()[1][0]
    parts = orig.split('.')
    namespace = caller_frame.f_globals.copy()
    namespace.update(caller_frame.f_locals)

    owner = namespace[parts[0]]
    for part in parts[1:-1]:
        owner = getattr(owner, part)
    previous = getattr(owner, parts[-1])
    patches.append((owner, parts[-1], previous))
    setattr(owner, parts[-1], replace)
def debug(self, message):
    """
    Emit a debug message tagged with the calling module's name.

    Does nothing unless ``self.opts['_debug']`` is truthy. The message is
    printed as ``[<module>] <message>`` when ``self.dbh`` is None, and
    passed to ``self._dblog("DEBUG", message, <module>)`` otherwise.
    ``<module>`` is ``"Unknown"`` when the caller's module cannot be found.

    :param message: text to emit
    """
    if not self.opts['_debug']:
        return

    frm = inspect.stack()[1]
    mod = inspect.getmodule(frm[0])
    if mod is None:
        modName = "Unknown"
    else:
        modName = mod.__name__

    if self.dbh is None:
        # Parenthesised single-argument form is valid on Python 2 and 3.
        print('[' + modName + '] ' + message)
    else:
        self._dblog("DEBUG", message, modName)
    return
def goto(self, rTarget, **kwargs):
    """Transition to a new state (Cached Version)

    :param rTarget: target state; when not callable it is first resolved
        through ``self.registry``
    :param kwargs: an optional ``payload`` entry replaces
        ``self.tEvt['payload']`` before the transition is reported

    The value returned by ``self.to_lca`` for the target is cached per event
    in ``self.rCurr['toLcaCache']`` and passed to ``self.exit``;
    ``self.rNext`` is then set to the target's entry in ``self.hsm_dict``.
    """
    if not callable(rTarget):
        rTarget = self.registry[rTarget]
    if "payload" in kwargs:
        self.tEvt['payload'] = kwargs['payload']

    # hsm could be calling its super states
    # we aren't tracking this in this, so we can
    # just reference the call stack to find who
    # discovered the event
    caller = inspect.stack()[1][3]
    self.__spy__(caller, self.tEvt['event'], self.tEvt['payload'])

    # this is how you see the target
    if self.rCurr['toLcaCache'].get(self.tEvt['event'], None) is None:
        self.rCurr['toLcaCache'][self.tEvt['event']] = \
            self.to_lca(self.hsm_dict[rTarget])
    self.exit(self.rCurr['toLcaCache'][self.tEvt['event']])
    self.rNext = self.hsm_dict[rTarget]
def check_class():
    """
    Return a ``SillyClass`` describing the class body two frames up, or
    None when that frame is not a class body.

    Expected frame layout:
      0: this function
      1: function/decorator
      2: class that contains the function

    The frame at index 2 is treated as a class body when ``'__module__'``
    is among its code object's names. The returned object is built from
    that frame's locals, with ``__cls_name__`` set to the code object name.
    """
    # get frames
    frames = inspect.stack()
    if len(frames) <= 2:
        return None

    # should be the third frame
    frame = frames[2][0]
    if '__module__' not in frame.f_code.co_names:
        return None

    cls = SillyClass(**frame.f_locals)
    cls.__cls_name__ = frame.f_code.co_name
    return cls
def debug(fn):
    """
    Function/method decorator to trace calls via debug logging.

    Each call logs the function name before it runs and the name with the
    return value afterwards, both through ``log.debug``. The name is
    prefixed with ``[<sys.argv[1]>]`` (or ``[pre_start]`` when there is no
    such argument) and indented by one space per frame on the call stack.
    Normally this would have a lot of perf impact but this application
    doesn't have significant throughput.

    :param fn: callable to wrap
    :return: wrapper that calls ``fn`` and returns its result unchanged
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            # because we have concurrent processes running we want
            # to tag each stack with an identifier for that process
            arg = "[{}]".format(sys.argv[1])
        except IndexError:
            arg = "[pre_start]"
        name = '{}{}{}'.format(arg, (len(inspect.stack()) * " "), fn.__name__)
        log.debug('%s', name)
        # ``apply`` was removed in Python 3; argument unpacking works on
        # both Python 2 and 3.
        out = fn(*args, **kwargs)
        log.debug('%s: %s', name, out)
        return out
    return wrapper
def test_5000_preflight_default_vhost_absolute(self):
    """Check that the default vhost works and can be identified

    Sends a GET whose location is rewritten to an absolute ``http://`` URL
    on ``SERVER_HOST``, records in ``Register`` whether it was accepted,
    and asserts that it was. Calls ``skipTest`` when the
    ``abs_default_vhost`` flag already exists.
    """
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name, self.tested_char_name)
    self.setGravity(self.GRAVITY_MINOR)

    already_done = Register.hasFlag('abs_default_vhost', default=False)
    if already_done:
        self.skipTest("Preflight test already done.")

    self.req.set_method('GET')
    target_host = self.config.get('SERVER_HOST')
    self.req.host = self.config.get('SERVER_HOST')
    absolute_location = 'http://{0}{1}'.format(target_host,
                                               self.req.location)
    self.req.location = absolute_location
    self._end_almost_regular_query(regular_expected=True)

    accepted = self.status == self.STATUS_ACCEPTED
    Register.flag('abs_default_vhost', accepted)
    failure_message = 'Bad response status {0}'.format(self.status)
    self.assertIn(self.status, [self.STATUS_ACCEPTED], failure_message)
def test_5002_preflight_non_default_vhost(self):
    """Check that the non default vhost works and can be identified

    Calls ``skipTest`` when ``MULTIPLE_HOSTS_TESTS`` is off or the
    ``non_default_vhost`` flag already exists; otherwise sends a GET to
    ``SERVER_NON_DEFAULT_HOST``, records in ``Register`` whether it was
    accepted, and asserts that it was.
    """
    # FIXME: on preflight: detect bad proxy configuration with
    # empty echo query. Means the 2nd vhost proxy is not working
    # TODO: do this test also on first proxy and test_regular
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name, self.tested_char_name)

    multi_host_enabled = self.config.getboolean('MULTIPLE_HOSTS_TESTS')
    if not multi_host_enabled:
        self.skipTest("Tests with multiple virtualhosts are not activated"
                      " in configuration.")
    already_done = Register.hasFlag('non_default_vhost', default=False)
    if already_done:
        self.skipTest("Preflight test already done.")

    self.setGravity(self.GRAVITY_MINOR)
    self.req.set_method('GET')
    self.req.host = self.config.get('SERVER_NON_DEFAULT_HOST')
    location = self.get_non_default_location(
        with_prefix=self.use_backend_location)
    self.req.location = location
    self._end_almost_regular_query(regular_expected=True)

    accepted = self.status == self.STATUS_ACCEPTED
    Register.flag('non_default_vhost', accepted)
    failure_message = 'Bad response status {0}'.format(self.status)
    self.assertIn(self.status, [self.STATUS_ACCEPTED], failure_message)
def test_0005_bad_pipeline(self):
    """Chain 3 queries, second is bad, should stop the response stream.

    The second request's location is set to ``Tools.NULL``. At most two
    responses are expected; when more than one arrives the status must be
    rejected or a 400 error.
    """
    self.real_test = "{0}".format(inspect.stack()[0][3])
    self._prepare_pipe_test()
    self.req2.set_location(Tools.NULL, random=True)
    self.req3 = Request(id(self))
    default_location = self.config.get('SERVER_DEFAULT_LOCATION')
    self.req3.set_location(default_location, random=True)

    # NOTE(review): the extent of the ``with`` block was reconstructed from
    # a whitespace-collapsed source -- confirm against the original file.
    with Client() as csock:
        for request in (self.req1, self.req2, self.req3):
            csock.send(request)
        responses = csock.read_all()

    outmsg(str(responses))
    self.analysis(responses,
                  expected_number=2,
                  regular_expected=False)
    self.assertTrue(responses.count <= 2)
    if responses.count > 1:
        allowed = [self.STATUS_REJECTED, self.STATUS_ERR400]
        failure_message = 'Bad response status {0}'.format(self.status)
        self.assertIn(self.status, allowed, failure_message)
def test_2000_preflight_regular_chunked_get(self):
    """Let's start by a regular GET with chunked body

    Records in ``Register`` (flag ``get_chunk_<reverse_proxy_mode>``)
    whether the request was accepted; accepted and 405 are both allowed.
    """
    self.real_test = "{0}".format(inspect.stack()[0][3])
    self.setGravity(self.GRAVITY_MINOR)
    self.req.set_method('GET')

    headers = (
        ('Transfer-Encoding', 'chunked'),
        ('Content-Type', 'application/x-www-form-urlencoded'),
    )
    for header_name, header_value in headers:
        self.req.add_header(header_name, header_value)
    for chunk in ('Hello', 'World'):
        self.req.add_chunk(chunk)

    self._end_almost_regular_query()
    flag_name = 'get_chunk_{0}'.format(self.reverse_proxy_mode)
    Register.flag(flag_name, (self.status == self.STATUS_ACCEPTED))
    allowed = [self.STATUS_ACCEPTED, self.STATUS_ERR405]
    failure_message = 'Bad response status {0}'.format(self.status)
    self.assertIn(self.status, allowed, failure_message)
def test_2001_preflight_regular_chunked_post(self):
    """If get is not good, try POST for chunked queries.

    Records in ``Register`` (flag ``post_chunk_<reverse_proxy_mode>``)
    whether the request was accepted; accepted and 405 are both allowed.
    """
    self.real_test = "{0}".format(inspect.stack()[0][3])
    self.setGravity(self.GRAVITY_MINOR)
    self.req.set_method('POST')

    headers = (
        ('Transfer-Encoding', 'chunked'),
        ('Content-Type', 'application/x-www-form-urlencoded'),
    )
    for header_name, header_value in headers:
        self.req.add_header(header_name, header_value)
    for chunk in ('Hello', 'World'):
        self.req.add_chunk(chunk)

    self._end_almost_regular_query()
    flag_name = 'post_chunk_{0}'.format(self.reverse_proxy_mode)
    Register.flag(flag_name, (self.status == self.STATUS_ACCEPTED))
    allowed = [self.STATUS_ACCEPTED, self.STATUS_ERR405]
    failure_message = 'Bad response status {0}'.format(self.status)
    self.assertIn(self.status, allowed, failure_message)
def test_2020_bad_chunked_transfer_encoding(self): """chunk not used as last marker in Transfer-Encoding header. """ self.real_test = "{0}".format(inspect.stack()[0][3]) self.setGravity(self.GRAVITY_MINOR) method = self._get_valid_chunk_method() self.req.method = method # chunk MUST be the last marker self.req.add_header('Transfer-Encoding', 'chunked, zorg') self.req.add_header('Content-Type', 'application/x-www-form-urlencoded') self.req.add_chunk('Hello') self.req.add_chunk('World') self._end_expected_error()
def test_3013_line_prefix(self):
    """Some characters before the query...

    Prefixes the request's first line with ``self.separator``; validity
    and whether rejection is always allowed come from ``self.valid_prefix``
    and ``self.can_be_rejected``.
    """
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name,
                                      Tools.show_chars(self.separator))
    self.setGravity(BaseTest.GRAVITY_MINOR)
    self.req.set_first_line_prefix(self.separator)

    # for RP mode:
    rp_map = {
        '{0}GET'.format(self.separator): self.STATUS_TRANSMITTED_EXACT,
        ' GET': self.STATUS_TRANSMITTED_CRAP,
    }
    self.transmission_map = rp_map
    self._add_default_status_map(valid=self.valid_prefix,
                                 always_allow_rejected=self.can_be_rejected)

    # local changes
    for status in (self.STATUS_TRANSMITTED_EXACT, self.STATUS_TRANSMITTED):
        self.status_map[status] = self.GRAVITY_MINOR
    self._end_1st_line_query()
def test_3014_line_suffix(self):
    """Let's add some garbage after the protocol.

    Appends ``self.separator`` to the request's first line and treats the
    request as invalid, with HTTP/0.9 not allowed.
    """
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name,
                                      Tools.show_chars(self.separator))
    self.setGravity(BaseTest.GRAVITY_WARNING)
    self.req.set_first_line_suffix(self.separator)

    # for RP mode:
    rp_map = {
        ' HTTP/1.0{0}\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_EXACT,
        ' HTTP/1.1{0}\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_EXACT,
        ' HTTP/1.1 \r\n': self.STATUS_TRANSMITTED_CRAP,
    }
    self.transmission_map = rp_map
    self._add_default_status_map(valid=False)
    self._end_1st_line_query(http09_allowed=False)
def test_3016_line_suffix_with_char_H(self):
    """Nginx, for example, likes the H character

    Appends ``self.separator`` followed by ``H`` to the request's first
    line and treats the request as invalid, with HTTP/0.9 not allowed.
    """
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name,
                                      Tools.show_chars(self.separator))
    self.setGravity(BaseTest.GRAVITY_WARNING)
    self.req.set_first_line_suffix(self.separator + u'H')

    # for RP mode:
    rp_map = {
        ' HTTP/1.0{0}H\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_EXACT,
        ' HTTP/1.1{0}H\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_EXACT,
        'HTTP/1.1{0}H H'.format(self.separator):
            self.STATUS_TRANSMITTED_CRAP,
        'HTTP/1.1 H\r\n': self.STATUS_TRANSMITTED_CRAP,
    }
    self.transmission_map = rp_map
    self._add_default_status_map(valid=False)
    self._end_1st_line_query(http09_allowed=False)
def test_3017_line_suffix_with_double_HTTP11(self):
    """Ending first line with two times the protocol

    Appends ``self.separator`` followed by ``HTTP/1.1`` to the request's
    first line and treats the request as invalid, with HTTP/0.9 not
    allowed.
    """
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name,
                                      Tools.show_chars(self.separator))
    self.req.set_first_line_suffix(self.separator + u'HTTP/1.1')

    # for RP mode:
    rp_map = {
        'HTTP/1.1{0}HTTP/1.1\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_EXACT,
        'HTTP/1.0{0}HTTP/1.0\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_CRAP,
        'HTTP/1.0{0}HTTP/1.1\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_CRAP,
        'HTTP/1.1{0}HTTP/1.0\r\n'.format(self.separator):
            self.STATUS_TRANSMITTED_CRAP,
    }
    self.transmission_map = rp_map
    self._add_default_status_map(valid=False)
    self._end_1st_line_query(http09_allowed=False)
def test_3020_multiple_cr_line_prefix(self):
    # Prefixes the first line with five CR characters; the request is
    # treated as invalid, rejection is always allowed, HTTP/0.9 is not.
    # TODO: this is maybe allowed, but we do not have any LF here...
    self.separator = u'\r\r\r\r\r'
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name,
                                      Tools.show_chars(self.separator))
    self.setGravity(BaseTest.GRAVITY_MINOR)
    self.req.set_first_line_prefix(self.separator)

    # for RP mode:
    rp_map = {
        '{0}GET'.format(self.separator): self.STATUS_TRANSMITTED_EXACT,
        ' GET': self.STATUS_TRANSMITTED_CRAP,
    }
    self.transmission_map = rp_map
    self._add_default_status_map(valid=False, always_allow_rejected=True)
    self._end_1st_line_query(http09_allowed=False)
def test_3021_crlf_line_prefix(self):
    # Prefixes the first line with a single CRLF; the request is treated
    # as valid and HTTP/0.9 is not allowed.
    self.separator = u'\r\n'
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name,
                                      Tools.show_chars(self.separator))
    self.setGravity(BaseTest.GRAVITY_MINOR)
    self.req.set_first_line_prefix(self.separator)

    # for RP mode:
    rp_map = {
        '{0}GET'.format(self.separator): self.STATUS_TRANSMITTED_EXACT,
        ' GET': self.STATUS_TRANSMITTED_CRAP,
    }
    self.transmission_map = rp_map
    # this is allowed in RFC
    self._add_default_status_map(valid=True, always_allow_rejected=False)
    self._end_1st_line_query(http09_allowed=False)
def test_3023_crcrcrlf_line_prefix(self):
    # Prefixes the first line with five CR characters followed by LF; the
    # request is treated as valid, rejection is always allowed, HTTP/0.9
    # is not.
    self.separator = u'\r\r\r\r\r\n'
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}_{1}".format(own_name,
                                      Tools.show_chars(self.separator))
    self.setGravity(BaseTest.GRAVITY_MINOR)
    self.req.set_first_line_prefix(self.separator)

    # for RP mode:
    rp_map = {
        '{0}GET'.format(self.separator): self.STATUS_TRANSMITTED_EXACT,
        ' GET': self.STATUS_TRANSMITTED_CRAP,
    }
    self.transmission_map = rp_map
    self._add_default_status_map(valid=True, always_allow_rejected=True)
    self._end_1st_line_query(http09_allowed=False)
def test_3040_http_65536_9(self): "Integer overflow attempt on protocol version." self.real_test = "{0}".format(inspect.stack()[0][3]) self.req.set_major_version(65536) self.req.set_minor_version(9) self.req.add_argument('last', 'marker') # for RP mode: self.transmission_map = { ' HTTP/65536.9\r\n': self.STATUS_TRANSMITTED_EXACT, 'HTTP/65536.9 H': self.STATUS_TRANSMITTED_CRAP, 'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP, } self._add_default_status_map(valid=False) # local changes self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL self._end_1st_line_query(http09_allowed=False)
def test_3041_http_0_9_explicit(self): "HTTP/0.9 does not exists. Should be an empty protocol." self.real_test = "{0}".format(inspect.stack()[0][3]) self.req.set_major_version(0) self.req.set_minor_version(9) self.req.add_argument('last', 'marker') # for RP mode: self.transmission_map = { ' HTTP/0.9\r\n': self.STATUS_TRANSMITTED_EXACT, 'HTTP/0.9 H': self.STATUS_TRANSMITTED_CRAP, 'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP, } self._add_default_status_map(valid=False) # local changes self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL self._end_1st_line_query(http09_allowed=False)
def test_3043_http_minus_1_0(self): "HTTP/-1.0 does not exists." self.real_test = "{0}".format(inspect.stack()[0][3]) self.req.set_major_version(-1) self.req.set_minor_version(0) self.req.add_argument('last', 'marker') # for RP mode: self.transmission_map = { ' HTTP/-1.0\r\n': self.STATUS_TRANSMITTED_EXACT, 'HTTP/-1.0 H': self.STATUS_TRANSMITTED_CRAP, 'HTTP/-1.1': self.STATUS_TRANSMITTED_CRAP, 'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP, } self._add_default_status_map(valid=False) # local changes self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL self._end_1st_line_query(http09_allowed=False)
def test_3044_http_1_nodigit(self): "HTTP/1.x1 does not exists." self.real_test = "{0}".format(inspect.stack()[0][3]) self.req.set_major_version(1) self.req.set_minor_version('x1') self.req.add_argument('last', 'marker') # for RP mode: self.transmission_map = { ' HTTP/1.x1\r\n': self.STATUS_TRANSMITTED_EXACT, 'HTTP/1.0 H': self.STATUS_TRANSMITTED_CRAP, 'HTTP/1.1 H': self.STATUS_TRANSMITTED_CRAP, 'HTTP/1.x1 H': self.STATUS_TRANSMITTED_CRAP, 'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP, } self._add_default_status_map(valid=False) # local changes self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL self._end_1st_line_query(http09_allowed=False)
def test_3045_http_version_too_much_digits(self): "New RFC is quite strict, DIGIT.DIGIT only, in fact." self.real_test = "{0}".format(inspect.stack()[0][3]) self.req.set_major_version(11) self.req.set_minor_version(11) self.req.add_argument('last', 'marker') # for RP mode: self.transmission_map = { ' HTTP/11.11\r\n': self.STATUS_TRANSMITTED_EXACT, 'HTTP/11.0': self.STATUS_TRANSMITTED_CRAP, '.11 H': self.STATUS_TRANSMITTED_CRAP, 'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP, } self._add_default_status_map(valid=False) # local changes self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL self._end_1st_line_query(http09_allowed=False)
def _get(self, database_type, ip_address):
    """
    Look up ``ip_address`` after checking the database type.

    :param database_type: text that must occur in
        ``self.metadata().database_type``
    :param ip_address: address passed to ``self._db_reader.get``
    :return: the record returned by the reader
    :raises TypeError: when ``database_type`` does not match; the message
        names the function two frames up the call stack
    :raises geoip2.errors.AddressNotFoundError: when the reader returns None
    """
    if database_type not in self.metadata().database_type:
        # Record 0 is this method, 1 its direct caller, 2 the caller's caller.
        caller = inspect.stack()[2][3]
        message = ("The %s method cannot be used with the "
                   "%s database" %
                   (caller, self.metadata().database_type))
        raise TypeError(message)

    record = self._db_reader.get(ip_address)
    if record is not None:
        return record
    raise geoip2.errors.AddressNotFoundError(
        "The address %s is not in the database." % ip_address)
def __ImportModule( module_name, asName=None ):
    """
    Import a python module as needed into the global namespace. This can be
    accomplished programatically and inside a method or class.

    When ``asName`` is given, the module is bound under that name in the
    globals of every frame currently on the call stack (this function's
    own frame included).

    @param module_name : name of the module to load x.y.z
    @param asName : optional global name to bind the loaded module to
    @return module : python module object that was loaded
    """
    # __import__('x.y.z') returns the top-level package 'x'; walk down the
    # remaining components to reach the requested submodule.
    module = __import__(module_name)
    for layer in module_name.split('.')[1:]:
        module = getattr(module, layer)

    if asName:
        # inspect.currentframe() accepts no depth argument on Python 3, so
        # walk the frame records from inspect.stack() instead; element 0 of
        # each record is the frame object.
        for frame_record in inspect.stack():
            frame_record[0].f_globals[asName] = module
    return module
async def __aenter__(self):
    """
    Connect to the current controller and create a uniquely named model.

    The model is named ``test-<test_run_nonce>-<caller>-<nonce>``, where
    ``<caller>`` is the calling function's name with underscores replaced
    by dashes and ``<nonce>`` is four hex characters from a fresh UUID.

    Declared ``async`` because the body awaits the controller calls;
    ``await`` inside a plain ``def`` is a SyntaxError.

    :return: the model returned by ``Controller.add_model``
    """
    model_nonce = uuid.uuid4().hex[-4:]
    frame = inspect.stack()[1]
    test_name = frame.function.replace('_', '-')
    jujudata = TestJujuData()
    self._controller = Controller(jujudata=jujudata)
    controller_name = jujudata.current_controller()
    user_name = jujudata.accounts()[controller_name]['user']
    await self._controller.connect(controller_name)

    model_name = 'test-{}-{}-{}'.format(
        test_run_nonce,
        test_name,
        model_nonce,
    )
    self._model = await self._controller.add_model(model_name)

    # Change the JujuData instance so that it will return the new
    # model as the current model name, so that we'll connect
    # to it by default.
    jujudata.set_model(
        controller_name,
        user_name + "/" + model_name,
        self._model.info.uuid,
    )

    # save the model UUID in case test closes model
    self._model_uuid = self._model.info.uuid

    return self._model
def get_current_func(self):
    '''
    Return the name of the function that called this method.

    :return: function name taken from frame record 1 of the call stack
    '''
    caller_record = inspect.stack()[1]
    return caller_record[3]
def __repr__(self):
    """
    Return the helper's representation.

    When the calling frame's function name is ``'?'``, the instance is
    called and an empty string is returned instead of the fixed
    representation text.
    """
    caller_name = inspect.stack()[1][3]
    if caller_name != '?':
        return '<pydoc.Helper instance>'
    self()
    return ''
def get_stack_path(i=0, depth=None):
    '''Build a relative name for this code point from the call stack.

    Uses the frame records ``inspect.stack()[2 + i:depth]``, i.e. starting
    at the caller's caller, and renders them outermost first as
    ``[<file name>]<function>`` joined with ``:``.

    :param i: number of additional inner frames to skip
    :param depth: exclusive end index into the stack, None for all frames
    :return: the joined path, empty when the slice selects no frame
    '''
    selected = inspect.stack()[(2 + i):depth]
    labels = [
        '[{}]{}'.format(os.path.split(record.filename)[1], record.function)
        for record in reversed(selected)
    ]
    return ':'.join(labels)
def __init__(self, name, *values, **options):
    """Create a new rule definition. Simply instantiating a new rule definition
    will add it to the current ``GramFuzzer`` instance.

    If the instantiating frame has a local named ``TOP_CAT``, its value is
    stored as the category-group default for the calling file's module name.

    :param str name: The name of the rule being defined
    :param list values: The list of values that define the value of the rule
        (will be concatenated when built)
    :param str cat: The category to create the rule in (default=``"default"``).
    :param bool no_prune: If this rule should not be pruned *EVEN IF* it is found to be
        unreachable (default=``False``)
    """
    self.name = name
    self.options = options
    self.values = list(values)

    # Options fall back to the attribute values already on the instance.
    opts = self.options
    self.sep = opts.setdefault("sep", self.sep)
    self.cat = opts.setdefault("cat", self.cat)
    self.no_prune = opts.setdefault("no_prune", self.no_prune)

    self.fuzzer = GramFuzzer.instance()

    caller_frame, mod_path, _, _, _, _ = inspect.stack()[1]
    file_name = os.path.basename(mod_path)
    module_name = file_name.replace(".pyc", "").replace(".py", "")
    caller_locals = caller_frame.f_locals
    if "TOP_CAT" in caller_locals:
        self.fuzzer.cat_group_defaults[module_name] = caller_locals["TOP_CAT"]

    self.fuzzer.add_definition(
        self.cat,
        self.name,
        self,
        no_prune=self.no_prune,
        gram_file=module_name,
    )
def get_program_name():
    """Return the base file name of the outermost frame on the call stack."""
    outermost = inspect.stack()[-1]
    script_path = outermost[1]
    return os.path.basename(script_path)
def get_test_path(filename):
    """
    Return the path of ``filename`` inside the ``test_data`` directory that
    sits next to the calling module's file.

    :param filename: file name to append below ``test_data``
    """
    caller_record = inspect.stack()[1]
    caller_module = inspect.getmodule(caller_record[0])
    module_file = os.path.realpath(caller_module.__file__)
    # Joining with os.pardir and normalising yields the containing directory.
    module_dir = os.path.abspath(os.path.join(module_file, os.pardir))
    return os.path.join(module_dir, 'test_data', filename)
def __init__(self, name=None, router=None, load_env=True, log_config=LOGGING):
    """
    Set up logging and the application's basic attributes.

    :param name: application name; when None it is the module name derived
        from the file of the calling frame
    :param router: router to use; a new ``PathRouter`` when falsy
    :param load_env: passed through to ``Config``
    :param log_config: dict applied with ``logging.config.dictConfig`` when
        truthy
    """
    if log_config:
        logging.config.dictConfig(log_config)

    # Only set up a default log handler if the
    # end-user application didn't set anything up.
    needs_default_handler = (not logging.root.handlers
                             or log.level != logging.NOTSET
                             or not log_config)
    if needs_default_handler:
        formatter = logging.Formatter(
            "%(asctime)s: %(levelname)s: %(message)s")
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        log.addHandler(handler)
        log.setLevel(logging.INFO)

    # Get name from previous stack frame
    if name is None:
        caller_record = stack()[1]
        name = getmodulename(caller_record[1])

    self.name = name
    self.config = Config(load_env=load_env)
    self.router = router or PathRouter()
    self.debug = None
    self.static_handler = None
def authorize_guest(self, mac, minutes=60, up=None, down=None, mbytes=None, apmac=None):
    """
    Authorize one guest

    Optional values are only sent when truthy.

    :param mac: the mac of the guest (sent lower-cased)
    :param minutes: for how many minutes it is authorized
    :param up: value sent as ``up``
    :param down: value sent as ``down``
    :param mbytes: value sent as ``bytes``
    :param apmac: AP mac address, sent lower-cased as ``ap_mac``
    :return: the result of ``self.response`` for the returned content
    """
    data = {
        'mac': mac.lower(),
        'cmd': 'authorize-guest',
        'minutes': minutes,
    }
    optional_fields = (('up', up), ('down', down), ('bytes', mbytes))
    for field, value in optional_fields:
        if value:
            data[field] = value
    if apmac:
        data['ap_mac'] = apmac.lower()

    content = self.sitecmdjson('/cmd/stamgr', data)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Guest Authorization')
def unauthorize_guest(self, mac):
    """
    Unauthorize one guest

    :param mac: the mac of the guest (sent lower-cased)
    :return: the result of ``self.response`` for the returned content
    """
    command = {
        'cmd': 'unauthorize-guest',
        'mac': mac.lower(),
    }
    content = self.sitecmdjson('/cmd/stamgr', command)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Guest Unuthorization')
def kick_sta(self, mac):
    """
    Reconnect client device

    :param mac: the mac of the client (sent lower-cased)
    :return: the result of ``self.response`` for the returned content
    """
    command = {
        'cmd': 'kick-sta',
        'mac': mac.lower(),
    }
    content = self.sitecmdjson('/cmd/stamgr', command)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Kicking Station')
def block_sta(self, mac):
    """
    Blocking client device

    :param mac: the mac of the client (sent lower-cased)
    :return: the result of ``self.response`` for the returned content
    """
    command = {
        'cmd': 'block-sta',
        'mac': mac.lower(),
    }
    content = self.sitecmdjson('/cmd/stamgr', command)
    # Frame record 0 is this method itself, so this is its own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Blocking Station')