The following 50 code examples, extracted from Python open source projects, illustrate how to use inspect.getfile().
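Before the project examples, here is a minimal standalone sketch (not taken from any of the projects below; the helper name `locate` is purely illustrative). It shows the basic behaviour: inspect.getfile() returns the path of the source file in which a module, class, method, function, traceback, frame, or code object was defined, and raises TypeError for objects implemented in C, such as built-ins.

import inspect
import json


def locate(obj):
    """Print the source file an object was defined in, if it has one."""
    try:
        print(obj, '->', inspect.getfile(obj))
    except TypeError:
        # Built-in (C-implemented) objects have no Python source file.
        print(obj, '-> no source file (built-in)')


locate(json)              # a pure-Python module
locate(json.JSONDecoder)  # a class defined in that module
locate(locate)            # this function itself
locate(len)               # built-in function: the TypeError is handled above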
def __init__(self):
    date_time_name = datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
    logging.basicConfig(filename=date_time_name + '.log', level=logging.INFO)
    path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
    self.config = configparser.ConfigParser()
    self.config.read(os.path.join(path, "configuration.txt"))
    self.sleep_time = int(self.config.get("settings", "time_between_retweets"))
    self.search_term = self.config.get("settings", "search_query")
    self.tweet_language = self.config.get("settings", "tweet_language")
    self.max_age_in_minutes = int(self.config.get("settings", "max_age_in_minutes"))
    self.last_id_file = self.build_save_point()
    self.savepoint = self.retrieve_save_point(self.last_id_file)
    auth = tweepy.OAuthHandler(self.config.get("twitter", "consumer_key"),
                               self.config.get("twitter", "consumer_secret"))
    auth.set_access_token(self.config.get("twitter", "access_token"),
                          self.config.get("twitter", "access_token_secret"))
    self.api = tweepy.API(auth)
def registerByPackage(self, pkg):
    """This function is similar to registerByModule() but works on packages. This is an
    expensive operation as it requires a recursive search by importing all sub modules
    and searching them.

    :param pkg: The package path to register, eg. zoo.libs.apps
    :type pkg: str
    """
    mod = modules.importModule(pkg)
    realPath = os.path.dirname(inspect.getfile(mod))
    pkgSplitPath = pkg.replace(".", os.path.sep)
    self.basePaths.append(realPath)
    for subModule in modules.iterModules(realPath):
        filename = os.path.splitext(os.path.basename(subModule))[0]
        if filename.startswith("__") or subModule.endswith(".pyc"):
            continue
        newDottedPath = pkg + subModule.split(pkgSplitPath)[-1].replace(os.path.sep, ".").split(".py")[0]
        subModuleObj = modules.importModule(newDottedPath)
        for member in modules.iterMembers(subModuleObj, predicate=inspect.isclass):
            self.registerPlugin(member[1])
def _init(self):
    """Initializes some basic info about the plugin and the use environment.

    Internal use only.
    """
    try:
        path = inspect.getfile(self.command.__class__)
    except:
        path = ""
    self.info.update({"id": self.command.id,
                      "creator": self.command.creator,
                      "module": self.command.__class__.__module__,
                      "filepath": path,
                      "application": env.application()})
    self.info.update(env.machineInfo())
def flushUnder(dirpath):
    """Flushes all modules that live under the given directory.

    :param dirpath: the name of the top most directory to search under.
    :type dirpath: str
    """
    modulePaths = list()
    for name, module in sys.modules.items():
        if module is None:
            del sys.modules[name]
            continue
        try:
            moduleDirpath = os.path.realpath(os.path.dirname(inspect.getfile(module)))
            if moduleDirpath.startswith(dirpath):
                modulePaths.append((name, inspect.getfile(sys.modules[name])))
                del sys.modules[name]
                logger.debug('unloaded module: %s ' % name)
        except TypeError:
            continue
    # Force a garbage collection
    gc.collect()
    return modulePaths
def get_tests(config={}):
    tests = []
    tests += list_test_cases(RSATest)
    try:
        from Crypto.PublicKey import _fastmath
        tests += list_test_cases(RSAFastMathTest)
    except ImportError:
        from distutils.sysconfig import get_config_var
        import inspect
        _fm_path = os.path.normpath(os.path.dirname(os.path.abspath(
            inspect.getfile(inspect.currentframe())))
            + "/../../PublicKey/_fastmath" + get_config_var("SO"))
        if os.path.exists(_fm_path):
            raise ImportError("While the _fastmath module exists, importing " +
                "it failed. This may point to the gmp or mpir shared library " +
                "not being in the path. _fastmath was found at " + _fm_path)
    if config.get('slow_tests', 1):
        tests += list_test_cases(RSASlowMathTest)
    return tests
def get_tests(config={}):
    tests = []
    tests += list_test_cases(DSATest)
    try:
        from Crypto.PublicKey import _fastmath
        tests += list_test_cases(DSAFastMathTest)
    except ImportError:
        from distutils.sysconfig import get_config_var
        import inspect
        _fm_path = os.path.normpath(os.path.dirname(os.path.abspath(
            inspect.getfile(inspect.currentframe())))
            + "/../../PublicKey/_fastmath" + get_config_var("SO"))
        if os.path.exists(_fm_path):
            raise ImportError("While the _fastmath module exists, importing " +
                "it failed. This may point to the gmp or mpir shared library " +
                "not being in the path. _fastmath was found at " + _fm_path)
    tests += list_test_cases(DSASlowMathTest)
    return tests
def code_to_ast(code: types.CodeType, file: str = None) -> ast.Module:
    """
    Return node object for code object.
    """
    if code and not isinstance(code, types.CodeType):
        raise TypeError('Unexpected type: {}'.format(str(type(code))))

    result = None
    try:
        src = inspect.getsource(code)
        file = file or inspect.getfile(code)
        result = source_to_ast(src, file)
    except IOError:
        pass
    return result
def module_to_ast(module: types.ModuleType, file: str = None) -> ast.Module:
    """
    Return node object for python module.
    """
    if module and not isinstance(module, types.ModuleType):
        raise TypeError('Unexpected type: {}'.format(str(type(module))))

    result = None
    try:
        src = inspect.getsource(module)
        file = file or inspect.getfile(module)
        result = source_to_ast(src, file)
    except IOError:
        pass
    return result
def class_to_ast(class_: type, file: str = None) -> ast.ClassDef:
    """
    Return node object for class.
    """
    if class_ and not isinstance(class_, type):
        raise TypeError('Unexpected type: {}'.format(str(type(class_))))

    result = None
    try:
        src = inspect.getsource(class_)
        file = file or inspect.getfile(class_)
        result = source_to_ast(src, file)
    except IOError:
        pass
    return result
def ast_to_code(node: ast.AST, old_code: types.CodeType = None, file: str = None) -> types.CodeType:
    """
    Compile node object to code.
    """
    if node and not isinstance(node, ast.AST):
        raise TypeError('Unexpected type for node: {}'.format(str(type(node))))
    if old_code and not isinstance(old_code, types.CodeType):
        raise TypeError('Unexpected type for old_code: {}'.format(str(type(old_code))))

    result = old_code
    if node:
        file = file or (inspect.getfile(old_code) if old_code else None)
        result = _call_with_frames_removed(
            compile,
            source=node,
            filename=file or '<file>',
            mode='exec',
            dont_inherit=True,
        )
    elif not old_code:
        raise ValueError('Not specified value')
    return result
def configure_parser(parser):
    parser.description = "Run a migration script. These can only be run forward, not in reverse."

    # Inspect the local directory for which migration modules the user can run.
    local_files = os.listdir(os.path.dirname(inspect.getfile(inspect.currentframe())))
    migration_files = [f for f in local_files if re.match('^\d{4}.*\.py$', f)]
    migration_names = [m.rstrip('.py') for m in migration_files]

    parser.add_argument(
        'migration_name',
        choices=migration_names,
        help="The name of the migration script you want to run."
    )
def __init__(self, vgg16_npy_path=None, trainable=True):
    if vgg16_npy_path is None:
        path = inspect.getfile(VGG16)
        path = os.path.abspath(os.path.join(path, os.pardir))
        path = os.path.join(path, "vgg16.npy")
        vgg16_npy_path = path
        print(path)

    self.trainable = trainable
    self.imgs = tf.placeholder(tf.float32, [None, 224, 224, 3], name='images')
    self.labels = tf.placeholder(tf.float32, [None, FLAGS.num_classes], name='labels')
    self.var_dict = {}
    self.temp_value = None
    self.data_dict = None
    with open(vgg16_npy_path, 'rb') as f:
        self.data_dict = np.load(vgg16_npy_path, encoding='latin1').item()
        # self.data_dict = pickle.load(f)
        # pickle.dump(self.data_dict, f, protocol=2)
    self.lrn_rate = 0.01
    print("npy file loaded")
    self.build()
    self.loss_layer()
def getSignVeryfy(self, **post):
    key_sorted = sorted(post.keys())
    content = ''
    sign_type = post['sign_type']
    sign = post['sign']
    for key in key_sorted:
        if key not in ["sign", "sign_type"]:
            if post[key]:
                content = content + key + "=" + post[key] + "&"
    content = content[:-1]
    content = content.encode("utf-8")

    isSign = False
    if sign_type.upper() == "RSA":
        directory_path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
        path = os.path.join(directory_path, 'rsa_public_key.pem')
        isSign = func.rsaVerify(content, open(path, 'r').read(), sign)
    return isSign
def setUpClass(cls): """On inherited classes, run our `setUp` method""" cls._snapshot_tests = [] cls._snapshot_file = inspect.getfile(cls) if cls is not TestCase and cls.setUp is not TestCase.setUp: orig_setUp = cls.setUp orig_tearDown = cls.tearDown def setUpOverride(self, *args, **kwargs): TestCase.setUp(self) return orig_setUp(self, *args, **kwargs) def tearDownOverride(self, *args, **kwargs): TestCase.tearDown(self) return orig_tearDown(self, *args, **kwargs) cls.setUp = setUpOverride cls.tearDown = tearDownOverride super(TestCase, cls).setUpClass()
def loadatlas(r=5):
    """Load the atlas from the brainpipe module."""
    B3Dpath = dirname(abspath(join(getfile(currentframe()), '..', '..', '..', 'atlas')))
    # Load talairach atlas :
    with open(B3Dpath + '/atlas/labels/talairach_atlas.pickle', "rb") as f:
        TAL = pickle.load(f)
    label = TAL['label']
    strGM = ['No Gray Matter found within +/-' + str(r) + 'mm']
    label = concat([label, DataFrame({'hemisphere': [strGM], 'lobe': [strGM],
                                      'gyrus': [strGM], 'matter': [strGM],
                                      'brodmann': [0]})])
    label = label.set_index([list(n.arange(label.shape[0]))])
    return TAL['hdr'], TAL['mask'], TAL['gray'], label
def test(self):
    # in this example, put your test_plugin/test.html template at
    # airflow/plugins/templates/test_plugin/test.html
    attributes = []
    data_table = []
    operator_data = {}
    for classes in ALL:
        operator_data['name'] = str(classes.__name__)
        operator_data['type'] = str(inspect.getfile(classes).split("_")[-1].split(".")[0])
        operator_data['module'] = str(inspect.getfile(classes).split("_")[-2])
        try:
            operator_data['args'] = str(classes.arguments) if classes.arguments else "NA"
        except Exception:
            operator_data['args'] = "NOT FOUND"
        operator_data['desc'] = str(inspect.getdoc(classes))
        operator_data['loc'] = str(inspect.getfile(classes))
        data_table.append(copy.deepcopy(operator_data))
    # data_table = json.dumps(data_table)
    return self.render("tac_plugin/tac.html", attributes=attributes, data_table=data_table)
def apply_stub_handler(args: argparse.Namespace, stdout: IO, stderr: IO) -> None:
    stub = get_stub(args, stdout, stderr)
    if stub is None:
        print(f'No traces found', file=stderr)
        return
    module = args.module_path[0]
    mod = importlib.import_module(module)
    src_path = inspect.getfile(mod)
    src_dir = os.path.dirname(src_path)
    pyi_name = module.split('.')[-1] + '.pyi'
    with tempfile.TemporaryDirectory(prefix='monkeytype') as pyi_dir:
        pyi_path = os.path.join(pyi_dir, pyi_name)
        with open(pyi_path, 'w+') as f:
            f.write(stub.render())
        cmd = ' '.join([
            'retype',
            '--pyi-dir ' + pyi_dir,
            '--target-dir ' + src_dir,
            src_path,
        ])
        subprocess.run(cmd, shell=True, check=True,
                       stderr=subprocess.PIPE, stdout=subprocess.PIPE)
def REPIC(obToPrint):
    '''REPIC stands for Read, Evaluate, Print In Comment. Call this function with an
    object obToPrint and it will rewrite the current file with the output in the comment
    in a line after this was called.'''
    cf = inspect.currentframe()
    callingFile = inspect.getfile(cf.f_back)
    callingLine = cf.f_back.f_lineno
    # print 'Line I am calling REPIC from:', callingLine
    for line in fileinput.input(callingFile, inplace=1):
        if callingLine == fileinput.filelineno():
            # Make results, but get rid of newlines in output since that will break the comment:
            resultString = '#OUTPUT: ' + str(obToPrint).replace('\n', '\\n') + '\n'
            writeIndex = line.rfind('\n')
            # Watch out for last line without newlines, there the end is just the line length.
            if '\n' not in line:
                writeIndex = len(line)
            # Replace old output and/or any comments:
            if '#' in line:
                writeIndex = line.rfind('#')
            output = line[0:writeIndex] + resultString
        else:
            output = line  # If no REPIC, then don't change the line.
        sys.stdout.write(output)
def test_module_imports_are_direct():
    my_filename = abspath(inspect.getfile(inspect.currentframe()))
    my_dirname = dirname(my_filename)
    diagnose_imports_filename = join(my_dirname, 'diagnose_imports.py')
    diagnose_imports_filename = normpath(diagnose_imports_filename)

    process = subprocess.Popen(
        [
            sys.executable,
            normpath(diagnose_imports_filename),
            '--problems',
            '--by-importer'
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=-1)
    output, _ = process.communicate()
    assert output == '', "There are import problems:\n" + output.decode()
def setUpClass(cls):
    cls.moduleName = os.path.splitext(inspect.getfile(cls))[0]
    cls.log = logging.getLogger(cls.__name__)

    # Set logging to file and stdout if the LOG_LEVEL environment variable is set.
    if os.getenv('LOG_LEVEL'):
        # Set up formatter.
        log_fmt = ('{}.%(funcName)s:%(levelname)s:%(asctime)s:'
                   ' %(message)s'.format(cls.__name__))
        formatter = logging.Formatter(log_fmt)

        # Set up the file handler.
        log_file_name = '%s.log' % cls.moduleName
        file_handler = logging.FileHandler(log_file_name)
        file_handler.setFormatter(formatter)
        cls.log.addHandler(file_handler)

        # Set the logging level from the environment variable, defaulting
        # to INFO if it is not a valid level.
        level = logging._nameToLevel.get(os.getenv('LOG_LEVEL'), logging.INFO)
        cls.log.setLevel(level)
def test_static_type_check_with_mypy():
    commitmsg_file = inspect.getfile(commitmsg)
    changelog_file = inspect.getfile(changelog)
    params = ['--ignore-missing-imports', commitmsg_file, changelog_file]
    result = api.run(params)
    if result[0]:
        # FIXME: begin: There are bugs in mypy
        # * support iteration on enums see https://github.com/python/mypy/issues/2305
        # * support NamedTuple
        # So, we have to remove irrelevant errors
        check_type_errors = "\n".join(
            (error for error in result[0].strip().split("\n")
             if error.split("error: ")[1] not in (
                 '"CommitType" expects no type arguments, but 1 given',
                 'Invalid type "commit_type_str"',
                 'Iterable expected',
                 '"CommitType" has no attribute "__iter__"',
                 'Right hand side values are not supported in NamedTuple',
                 'Invalid statement in NamedTuple definition; expected "field_name: field_type"'
             )))
        # FIXME: end
        if len(check_type_errors) > 0:
            raise Exception(check_type_errors)
    if result[1]:
        raise Exception(result[1])
def pysourcefiles(self):
    """All source files of the actual models Python classes and their
    respective base classes."""
    sourcefiles = set()
    for (name, child) in vars(self).items():
        try:
            parents = inspect.getmro(child)
        except AttributeError:
            continue
        for parent in parents:
            try:
                sourcefile = inspect.getfile(parent)
            except TypeError:
                break
            sourcefiles.add(sourcefile)
    return Lines(*sourcefiles)
def set(subpackage_name, class_name, config=None):
    """
    This function ...

    :param subpackage_name:
    :param class_name:
    :param config:
    :return:
    """

    # Determine the path to the default configuration file
    subpackage_directory = os.path.join(inspect.getfile(inspect.currentframe()).split("/core")[0], subpackage_name)
    default_config = os.path.join(subpackage_directory, "config", class_name + ".cfg")

    # If we have not created a default configuration file for this class yet ...
    if not fs.is_file(default_config):
        default_config = os.path.join(introspection.pts_package_dir, "core", "config", "default.cfg")

    # Open the default configuration if no configuration file is specified, otherwise adjust the default
    # settings according to the user defined configuration file
    if config is None:
        return open(default_config)
    else:
        return open(config, default_config)

# -----------------------------------------------------------------
def _get_root_brain_path():
    """
    Return the full path of the default brain file

    :Example:

        brain.brain_file = cls._get_root_brain_path()

    .. raises:: IOError
    .. warnings:: Static method and Private
    """
    # get current script directory path. We are in /an/unknown/path/kalliope/core/ConfigurationManager
    cur_script_directory = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
    # get parent dir. Now we are in /an/unknown/path/kalliope
    parent_dir = os.path.normpath(cur_script_directory + os.sep + os.pardir + os.sep + os.pardir)
    brain_path = parent_dir + os.sep + "brain.yml"
    logger.debug("Real brain.yml path: %s" % brain_path)
    if os.path.isfile(brain_path):
        return brain_path
    raise IOError("Default brain.yml file not found")
def setUp(self):
    # get current script directory path. We are in /an/unknown/path/kalliope/core/tests
    cur_script_directory = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
    # get parent dir. Now we are in /an/unknown/path/kalliope
    root_dir = os.path.normpath(cur_script_directory + os.sep + os.pardir)

    # get the neuron dir
    self.neurons_dir = os.path.normpath(root_dir + os.sep + "kalliope/neurons")
    # get stt dir
    self.stt_dir = os.path.normpath(root_dir + os.sep + "kalliope/stt")
    # get tts dir
    self.tts_dir = os.path.normpath(root_dir + os.sep + "kalliope/tts")
    # get trigger dir
    self.trigger_dir = os.path.normpath(root_dir + os.sep + "kalliope/trigger")
def test_context_samples_found(self):
    all_plugins = context.Context.get_all()
    context_samples_path = os.path.join(self.samples_path, "contexts")
    for p in all_plugins:
        if not inspect.getfile(p).startswith(
                os.path.dirname(xrally_docker.__file__)):
            # except contexts which belong to tests module
            continue
        elif p.get_name() == "users":
            # users is a dummy context for doing nothing
            continue
        file_name = p.get_name().replace("_", "-")
        file_path = os.path.join(context_samples_path, file_name)
        if not os.path.exists("%s.json" % file_path):
            self.fail(("There is no json sample file of %s, "
                       "plugin location: %s.json"
                       % (p.get_name(), file_path)))
def trace(): """Determine information about where an error was thrown. Returns: tuple: line number, filename, error message """ import traceback, inspect tb = sys.exc_info()[2] tbinfo = traceback.format_tb(tb)[0] filename = inspect.getfile(inspect.currentframe()) # script name + line number line = tbinfo.split(", ")[1] # Get Python syntax error # synerror = traceback.format_exc().splitlines()[-1] return line, filename, synerror
def trace(): """Determines information about where an error was thrown. Returns: tuple: line number, filename, error message Examples: >>> try: ... 1/0 ... except: ... print("Error on '{}'\\nin file '{}'\\nwith error '{}'".format(*trace())) ... Error on 'line 1234' in file 'C:\\foo\\baz.py' with error 'ZeroDivisionError: integer division or modulo by zero' """ tb = sys.exc_info()[2] tbinfo = traceback.format_tb(tb)[0] filename = inspect.getfile(inspect.currentframe()) # script name + line number line = tbinfo.split(", ")[1] # Get Python syntax error # synerror = traceback.format_exc().splitlines()[-1] return line, filename, synerror
def vcr(self):
    """
    Returns a new vcrpy instance.
    """
    cassettes_dir = join(dirname(getfile(self.__class__)), 'cassettes')
    kwargs = {
        'record_mode': getattr(self, 'vcrpy_record_mode', 'once'),
        'cassette_library_dir': cassettes_dir,
        'match_on': ['method', 'scheme', 'host', 'port', 'path', 'query'],
        'filter_query_parameters': FILTER_QUERY_PARAMS,
        'filter_post_data_parameters': FILTER_QUERY_PARAMS,
        'before_record_response': IGittTestCase.remove_link_headers,
        'filter_headers': ['Link'],
    }
    kwargs.update(self.vcr_options)
    return VCR(**kwargs)
def describe_fluent_content_plugin(plugin):
    render_template = plugin.render_template
    if render_template:
        try:
            path_to_render_template = get_template(render_template).origin.name
        except TemplateDoesNotExist:
            path_to_render_template = render_template
    else:
        path_to_render_template = None

    return {
        'class': dotted_path_object_type(plugin),
        'source_file': inspect.getfile(type(plugin)),
        'verbose_name': force_text(plugin.verbose_name),
        'render_template': render_template,
        'path_to_render_template': path_to_render_template,
    }
def _molpxdir(join=None):
    r"""
    Return the directory where molpx is installed

    :param join: str, default is None
        _datadir(join='myfile.dat') will return os.path.join(_datadir(), 'myfile.dat')

    :return: directory or filename where the data for the notebook lies
    """
    import molpx
    if join is None:
        return _os.path.dirname(getfile(molpx))
    else:
        assert isinstance(join, str), ("parameter join can only be a string", type(join))
        return _os.path.join(_os.path.dirname(getfile(molpx)), join)

# For python 2.7 compatibility if we don't want to depend also on backports
# http://stackoverflow.com/questions/19296146/tempfile-temporarydirectory-context-manager-in-python-2-7