我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pkg_resources.resource_filename()。
def prepare_zip(): from pkg_resources import resource_filename as resource from config import config from json import dumps logger.info('creating/updating gimel.zip') with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf: info = ZipInfo('config.json') info.external_attr = 0o664 << 16 zipf.writestr(info, dumps(config)) zipf.write(resource('gimel', 'config.py'), 'config.py') zipf.write(resource('gimel', 'gimel.py'), 'gimel.py') zipf.write(resource('gimel', 'logger.py'), 'logger.py') for root, dirs, files in os.walk(resource('gimel', 'vendor')): for file in files: real_file = os.path.join(root, file) relative_file = os.path.relpath(real_file, resource('gimel', '')) zipf.write(real_file, relative_file)
def test_pydist(): """Make sure pydist.json exists and validates against our schema.""" # XXX this test may need manual cleanup of older wheels import jsonschema def open_json(filename): return json.loads(open(filename, 'rb').read().decode('utf-8')) pymeta_schema = open_json(resource_filename('wheel.test', 'pydist-schema.json')) valid = 0 for dist in ("simple.dist", "complex-dist"): basedir = pkg_resources.resource_filename('wheel.test', dist) for (dirname, subdirs, filenames) in os.walk(basedir): for filename in filenames: if filename.endswith('.whl'): whl = ZipFile(os.path.join(dirname, filename)) for entry in whl.infolist(): if entry.filename.endswith('/metadata.json'): pymeta = json.loads(whl.read(entry).decode('utf-8')) jsonschema.validate(pymeta, pymeta_schema) valid += 1 assert valid > 0, "No metadata.json found"
def create_params_file(self, fname): msg = QMessageBox() msg.setIcon(QMessageBox.Question) msg.setText("Parameter file %r not found, do you want SpyKING CIRCUS to " "create it for you?" % fname) msg.setWindowTitle("Generate parameter file?") msg.setInformativeText("This will create a parameter file from a " "template file and open it in your system's " "standard text editor. Fill properly before " "launching the code. See the documentation " "for details") msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No) answer = msg.exec_() if answer == QMessageBox.Yes: user_path = os.path.join(os.path.expanduser('~'), 'spyking-circus') if os.path.exists(user_path + 'config.params'): config_file = os.path.abspath(user_path + 'config.params') else: config_file = os.path.abspath( pkg_resources.resource_filename('circus', 'config.params')) shutil.copyfile(config_file, fname) self.params = fname self.last_log_file = fname.replace('.params', '.log') self.update_params()
def init_gui_layout(self): gui_fname = pkg_resources.resource_filename('circus', os.path.join('qt_GUI', 'qt_merge.ui')) if comm.rank == 0: self.ui = uic.loadUi(gui_fname, self) # print dir(self.ui) self.score_ax1 = self.ui.score_1.axes self.score_ax2 = self.ui.score_2.axes self.score_ax3 = self.ui.score_3.axes self.waveforms_ax = self.ui.waveforms.axes self.detail_ax = self.ui.detail.axes self.data_ax = self.ui.data_overview.axes self.current_order = self.ui.cmb_sorting.currentIndex() self.mpl_toolbar = NavigationToolbar(self.ui.waveforms, None) self.mpl_toolbar.pan() self.ui.show() else: self.ui = None
def init_gui_layout(self): gui_fname = pkg_resources.resource_filename('circus', os.path.join('qt_GUI', 'qt_preview.ui')) self.ui = uic.loadUi(gui_fname, self) self.electrode_ax = self.ui.electrodes.axes self.data_x = self.ui.raw_data.axes if self.show_fit: self.ui.btn_next.clicked.connect(self.increase_time) self.ui.btn_prev.clicked.connect(self.decrease_time) else: self.ui.btn_next.setVisible(False) self.ui.btn_prev.setVisible(False) # Toolbar will not be displayed self.mpl_toolbar = NavigationToolbar(self.ui.raw_data, None) self.mpl_toolbar.pan() self.ui.show()
def generate_pdf(card): """ Make a PDF from a card :param card: dict from fetcher.py :return: Binary PDF buffer """ from eclaire.base import SPECIAL_LABELS pdf = FPDF('L', 'mm', (62, 140)) pdf.set_margins(2.8, 2.8, 2.8) pdf.set_auto_page_break(False, margin=0) pdf.add_page() font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf') pdf.add_font('Clairifont', fname=font, uni=True) pdf.set_font('Clairifont', size=48) pdf.multi_cell(0, 18, txt=card.name.upper(), align='L') qrcode = generate_qr_code(card.url) qrcode_file = mktemp(suffix='.png', prefix='trello_qr_') qrcode.save(qrcode_file) pdf.image(qrcode_file, 118, 35, 20, 20) os.unlink(qrcode_file) # May we never speak of this again. pdf.set_fill_color(255, 255, 255) pdf.rect(0, 55, 140, 20, 'F') pdf.set_font('Clairifont', '', 16) pdf.set_y(-4) labels = ', '.join([label.name for label in card.labels if label.name not in SPECIAL_LABELS]) pdf.multi_cell(0, 0, labels, 0, 'R') return pdf.output(dest='S')
def test_installation(): ''' Test the installation ''' import pkg_resources PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './') DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/') import os.path print('Does pynufft.py exist? ',os.path.isfile(PYNUFFT_PATH+'pynufft.py')) print('Does om1D.npz exist?',os.path.isfile(DATA_PATH+'om1D.npz')) print('Does om2D.npz exist?',os.path.isfile(DATA_PATH+'om2D.npz')) print('Does om3D.npz exist?',os.path.isfile(DATA_PATH+'om3D.npz')) print('Does phantom_3D_128_128_128.npz exist?', os.path.isfile(DATA_PATH+'phantom_3D_128_128_128.npz')) print('Does phantom_256_256.npz exist?', os.path.isfile(DATA_PATH+'phantom_256_256.npz')) print('Does example_1D.py exist?', os.path.isfile(PYNUFFT_PATH+'./tests/example_1D.py')) print('Does example_2D.py exist?', os.path.isfile(PYNUFFT_PATH+'./tests/example_2D.py')) for pkgname in ('reikna', 'pyopencl', 'pycuda'): error_code = test_pkg(pkgname) if 1 == error_code: break
def test_installation(): ''' Test the installation ''' import pkg_resources PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './') DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/') import os.path print('Does pynufft.py exist? ',os.path.isfile(PYNUFFT_PATH+'pynufft.py')) print('Does om1D.npz exist?',os.path.isfile(DATA_PATH+'om1D.npz')) print('Does om2D.npz exist?',os.path.isfile(DATA_PATH+'om2D.npz')) print('Does om3D.npz exist?',os.path.isfile(DATA_PATH+'om3D.npz')) print('Does phantom_3D_128_128_128.npz exist?', os.path.isfile(DATA_PATH+'phantom_3D_128_128_128.npz')) print('Does phantom_256_256.npz exist?', os.path.isfile(DATA_PATH+'phantom_256_256.npz')) print('Does 1D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/1D_example.py')) print('Does 2D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/1D_example.py'))
def _transfer_ping_script(self, ssh): """ Transfert vping script to VM. Uses SCP to copy the ping script via the SSH client :param ssh: the SSH client :return: """ self.logger.info("Trying to transfer ping.sh") scp = SCPClient(ssh.get_transport()) ping_script = pkg_resources.resource_filename( 'functest.opnfv_tests.openstack.vping', 'ping.sh') try: scp.put(ping_script, "~/") except Exception: # pylint: disable=broad-except self.logger.error("Cannot SCP the file '%s'", ping_script) return False cmd = 'chmod 755 ~/ping.sh' # pylint: disable=unused-variable (stdin, stdout, stderr) = ssh.exec_command(cmd) for line in stdout.readlines(): print line return True
def __init__(self): """Initialize helper object.""" self.functest_test = pkg_resources.resource_filename( 'functest', 'opnfv_tests') self.conf_path = pkg_resources.resource_filename( 'functest', 'opnfv_tests/openstack/refstack_client/refstack_tempest.conf') self.defcore_list = pkg_resources.resource_filename( 'functest', 'opnfv_tests/openstack/refstack_client/defcore.txt') self.confpath = os.path.join(self.functest_test, self.conf_path) self.defcorelist = os.path.join(self.functest_test, self.defcore_list) self.parser = argparse.ArgumentParser() self.parser.add_argument( '-c', '--config', help='the file path of refstack_tempest.conf', default=self.confpath) self.parser.add_argument( '-t', '--testlist', help='Specify the file path or URL of a test list text file. ' 'This test list will contain specific test cases that ' 'should be tested.', default=self.defcorelist)
def test_resources(): # loading by resource cls = bob.bio.base.load_resource("pca", "algorithm") assert isinstance (cls, bob.bio.base.algorithm.PCA) # loading by configuration file cls = bob.bio.base.load_resource(pkg_resources.resource_filename("bob.bio.base.config.algorithm", "pca.py"), "algorithm") assert isinstance (cls, bob.bio.base.algorithm.PCA) # loading by instatiation cls = bob.bio.base.load_resource("bob.bio.base.algorithm.PCA(10, distance_function=scipy.spatial.distance.euclidean)", "algorithm", imports=['bob.bio.base', 'scipy.spatial']) assert isinstance (cls, bob.bio.base.algorithm.PCA) # get list of extensions extensions = bob.bio.base.extensions() assert isinstance(extensions, list) assert 'bob.bio.base' in extensions
def test_pydist(): """Make sure pydist.json exists and validates against our schema.""" # XXX this test may need manual cleanup of older wheels import jsonschema def open_json(filename): with open(filename, 'rb') as json_file: return json.loads(json_file.read().decode('utf-8')) pymeta_schema = open_json(resource_filename('wheel.test', 'pydist-schema.json')) valid = 0 for dist in ("simple.dist", "complex-dist"): basedir = pkg_resources.resource_filename('wheel.test', dist) for (dirname, subdirs, filenames) in os.walk(basedir): for filename in filenames: if filename.endswith('.whl'): whl = ZipFile(os.path.join(dirname, filename)) for entry in whl.infolist(): if entry.filename.endswith('/metadata.json'): pymeta = json.loads(whl.read(entry).decode('utf-8')) jsonschema.validate(pymeta, pymeta_schema) valid += 1 assert valid > 0, "No metadata.json found"
def setUp(self): file_path = resource_filename(Requirement.parse('search_google'), 'search_google/config.json') with open(file_path, 'r') as in_file: defaults = json.load(in_file) buildargs = { 'serviceName': 'customsearch', 'version': 'v1', 'developerKey': defaults['build_developerKey'] } cseargs = { 'q': 'google', 'num': 1, 'fileType': 'png', 'cx': defaults['cx'] } self.results = search_google.api.results(buildargs, cseargs) tempfile = TemporaryFile() self.tempfile = str(tempfile.name) tempfile.close() self.tempdir = str(TemporaryDirectory().name)
def register_adapters(): global adapters_registered if adapters_registered is True: return try: import pkg_resources packageDir = pkg_resources.resource_filename('pyamf', 'adapters') except: packageDir = os.path.dirname(__file__) for f in glob.glob(os.path.join(packageDir, '*.py')): mod = os.path.basename(f).split(os.path.extsep, 1)[0] if mod == '__init__' or not mod.startswith('_'): continue try: register_adapter(mod[1:].replace('_', '.'), PackageImporter(mod)) except ImportError: pass adapters_registered = True
def _instantiate(self, rsc): # First, load the pump with open(resource_filename(__name__, os.path.join(rsc, 'pump.pkl')), 'rb') as fd: self.pump = pickle.load(fd) # Now load the model with open(resource_filename(__name__, os.path.join(rsc, 'model_spec.pkl')), 'rb') as fd: spec = pickle.load(fd) self.model = keras.models.model_from_config(spec) # And the model weights self.model.load_weights(resource_filename(__name__, os.path.join(rsc, 'model.h5'))) # And the version number with open(resource_filename(__name__, os.path.join(rsc, 'version.txt')), 'r') as fd: self.version = fd.read().strip()
def populate_table(table, data_file, print_msg): """Method to populate a table with records stored in a dict loaded from a json file Args: table(api.aws.DynamoTable): the table to write to data_json(dict): the data to write. Should be an array in an object named 'data' Returns: None """ with open(os.path.join(resource_filename("manage", "data"), data_file), 'rt') as df: data = json.load(df) if len(data["data"]) == 1: # Assume this is a campaign file for now. print(" - Example campaign loaded: https://<your_stack>/campaign.html?id={}".format(data["data"][0]["campaign_id"])) for item in data["data"]: table.put_item(item) print(print_msg)
def get_code(self): """Zip up the code and return bytes Returns: bytes """ with open(tempfile.NamedTemporaryFile().name, 'w') as zf: zfh = zipfile.ZipFile(zf.name, mode='w') old_path = os.getcwd() os.chdir(os.path.join(resource_filename("manage", "configs"))) zfh.write("url_rewrite.js") zfh.close() zf.close() os.chdir(old_path) with open(zf.name, "rb") as zfr: return zfr.read()
def main(): insurance_lib_data_dir = resource_filename('resources', 'insurance_lib_v2') print('Using data from {}'.format(insurance_lib_data_dir)) # Either re-use an existing collection id by over riding the below, or leave as is to create one collection_id = "TestCollection-InsLibV2" discovery = DiscoveryProxy() collection_id = discovery.setup_collection(collection_id=collection_id, config_id="889a08c9-cad9-4287-a87d-2f0380363bff") discovery.print_collection_stats(collection_id) # This thing seems to misbehave when run from python notebooks due to its use of multiprocessing, so just # running in a script discovery.upload_documents(collection_id=collection_id, corpus=document_corpus_as_iterable( path.join(insurance_lib_data_dir, 'document_corpus.solr.xml'))) discovery.print_collection_stats(collection_id)
def test_cached_phrases_no_files(self, corpus_base_path, doc_content_stream): from eea.corpus.processing.phrases.process import cached_phrases from pkg_resources import resource_filename base_path = resource_filename('eea.corpus', 'tests/fixtures/') corpus_base_path.return_value = base_path # we want the B.phras.* files in fixtures env = {'phash_id': 'X', 'file_name': 'ignore'} settings = {} stream = cached_phrases(doc_content_stream, env, settings) with pytest.raises(StopIteration): next(stream)
def test_cached_phrases_cached_files(self, corpus_base_path, doc_content_stream): # TODO: this test should be improved. Text quality should be tested from eea.corpus.processing.phrases.process import cached_phrases from pkg_resources import resource_filename base_path = resource_filename('eea.corpus', 'tests/fixtures/') corpus_base_path.return_value = base_path # we want the B.phras.* files in fixtures env = {'phash_id': 'B', 'file_name': 'ignore'} settings = {} stream = cached_phrases(doc_content_stream, env, settings) doc = next(stream) assert 'water_stress_conditions' in doc.text assert 'positive_development' in doc.text
def test_preview_phrases_with_cache_files(self, corpus_base_path): from eea.corpus.processing.phrases.process import preview_phrases from pkg_resources import resource_filename base_path = resource_filename('eea.corpus', 'tests/fixtures/') corpus_base_path.return_value = base_path content = ['hello', 'world'] env = { 'file_name': 'x.csv', 'text_column': 'text', 'phash_id': 'B', } stream = preview_phrases(content, env, {}) assert list(stream) == []
def test_preview_phrases_nocache_files_with_job(self, corpus_base_path, get_assigned_job): from eea.corpus.processing.phrases.process import preview_phrases from pkg_resources import resource_filename get_assigned_job.return_value = Mock(id='job1') base_path = resource_filename('eea.corpus', 'tests/fixtures/') corpus_base_path.return_value = base_path content = ['hello', 'world'] env = { 'file_name': 'x.csv', 'text_column': 'text', 'phash_id': 'X', } stream = preview_phrases(content, env, {}) assert list(stream) == ['hello', 'world']
def test_produce_phrases_with_no_job(self, cached_phrases, corpus_base_path, get_pipeline_for_component, build_phrases ): from eea.corpus.processing.phrases.process import produce_phrases from pkg_resources import resource_filename content = ['hello', 'world'] env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'} base_path = resource_filename('eea.corpus', 'tests/fixtures/') corpus_base_path.return_value = base_path cached_phrases.return_value = ['something', 'else'] stream = produce_phrases(content, env, {}) assert list(stream) == ['something', 'else'] assert corpus_base_path.call_count == 1 assert get_pipeline_for_component.call_count == 1 assert build_phrases.call_count == 1 assert cached_phrases.call_count == 1
def test_produce_phrases_with_ok_job(self, cached_phrases, corpus_base_path, get_pipeline_for_component, build_phrases, get_job_finish_status ): from eea.corpus.processing.phrases.process import produce_phrases from pkg_resources import resource_filename content = ['hello', 'world'] env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'} base_path = resource_filename('eea.corpus', 'tests/fixtures/') corpus_base_path.return_value = base_path cached_phrases.return_value = ['something', 'else'] get_job_finish_status.return_value = True stream = produce_phrases(content, env, {}) assert list(stream) == ['something', 'else'] assert corpus_base_path.call_count == 1 assert get_pipeline_for_component.call_count == 0 assert build_phrases.call_count == 0 assert cached_phrases.call_count == 1
def test_phrase_model_status(self, get_assigned_job): from eea.corpus.processing.phrases.views import phrase_model_status from pkg_resources import resource_filename base_path = resource_filename('eea.corpus', 'tests/fixtures/') o_st = phrase_model_status.__globals__['CORPUS_STORAGE'] phrase_model_status.__globals__['CORPUS_STORAGE'] = base_path req = Mock(matchdict={'phash_id': 'A'}) assert phrase_model_status(req) == {'status': 'OK'} get_assigned_job.return_value = None req = Mock(matchdict={'phash_id': 'X'}) assert phrase_model_status(req) == {'status': 'unavailable'} job = Mock() get_assigned_job.return_value = job job.get_status.return_value = '_job_status_here_' assert phrase_model_status(req) == {'status': 'preview__job_status_here_'} phrase_model_status.__globals__['CORPUS_STORAGE'] = o_st
def test_build_pipeline_for_preview(self, upload_location): from eea.corpus.processing import build_pipeline from pkg_resources import resource_filename file_name = 'test.csv' upload_location.return_value = resource_filename( 'eea.corpus', 'tests/fixtures/test.csv') text_column = 'text' pipeline = [ ('eea_corpus_processing_limit_process', 'ABC', {'max_count': 2}) ] stream = build_pipeline(file_name, text_column, pipeline, preview_mode=True) docs = list(stream) assert len(docs) == 2
def dashboard(global_config, **settings): """ WSGI entry point for the Flask app RQ Dashboard """ redis_uri = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') p = parse.urlparse(redis_uri) host, port = p.netloc.split(':') db = len(p.path) > 1 and p.path[1:] or '0' redis_settings = { 'REDIS_URL': redis_uri, 'REDIS_DB': db, 'REDIS_HOST': host, 'REDIS_PORT': port, } app = Flask(__name__, static_url_path="/static", static_folder=resource_filename("rq_dashboard", "static") ) app.config.from_object(rq_dashboard.default_settings) app.config.update(redis_settings) app.register_blueprint(rq_dashboard.blueprint) return app.wsgi_app
def example_audio_file(): """Get path of audio file. Returns: str: Path of the example audio file. See also: :func:`example_label_file` Examples: >>> from nnmnkwii.util import example_audio_file >>> from scipy.io import wavfile >>> fs, x = wavfile.read(example_audio_file()) """ name = "arctic_a0009" wav_path = pkg_resources.resource_filename( __name__, '_example_data/{}.wav'.format(name)) return wav_path
def example_question_file(): """Get path of example question file. The question file was taken from Merlin_. .. _Merlin: https://github.com/CSTR-Edinburgh/merlin Returns: str: Path of the example audio file. Examples: >>> from nnmnkwii.util import example_question_file >>> from nnmnkwii.io import hts >>> binary_dict, continuous_dict = hts.load_question_set(example_question_file()) """ return pkg_resources.resource_filename( __name__, '_example_data/questions-radio_dnn_416.hed')
def __init__(self, weights_path=None, vocab_path=None): if weights_path is None: weights_path = resource_filename(__name__, 'reactionrnn_weights.hdf5') if vocab_path is None: vocab_path = resource_filename(__name__, 'reactionrnn_vocab.json') with open(vocab_path, 'r') as json_file: self.vocab = json.load(json_file) self.tokenizer = Tokenizer(filters='', char_level=True) self.tokenizer.word_index = self.vocab self.num_classes = len(self.vocab) + 1 self.model = reactionrnn_model(weights_path, self.num_classes) self.model_enc = Model(inputs=self.model.input, outputs=self.model.get_layer('rnn').output)
def main(argv=None): args = docopt(__doc__, argv=argv, version='1.0.3') assert 'config.toml' in (p.name for p in Path().iterdir()), "config.toml not found in directory. Are you sure you're in the project's root?" if args['--init']: notebooks_dir = Path('./notebooks/') notebooks_dir.mkdir(exist_ok=True) with open(resource_filename('hugo_jupyter', '__fabfile.py')) as fp: fabfile = Path('fabfile.py') fabfile.write_text(fp.read()) print(dedent(""" Successfully initialized. From this directory, the following commands are available. Just remember to prepend them with `fab` """)) run(('fab', '-l'))
def test_copy_config(self): tempdir = tempfile.mkdtemp() server_root = pkg_resources.resource_filename(__name__, "testdata") letshelp_le_apache.copy_config(server_root, tempdir) temp_testdata = os.path.join(tempdir, "testdata") self.assertFalse(os.path.exists(os.path.join( temp_testdata, os.path.basename(_PASSWD_FILE)))) self.assertFalse(os.path.exists(os.path.join( temp_testdata, os.path.basename(_KEY_FILE)))) self.assertFalse(os.path.exists(os.path.join( temp_testdata, os.path.basename(_SECRET_FILE)))) self.assertTrue(os.path.exists(os.path.join( temp_testdata, _PARTIAL_CONF_PATH))) self.assertTrue(os.path.exists(os.path.join( temp_testdata, _PARTIAL_LINK_PATH)))
def pkg_file(path): return pkg_resources.resource_filename( pkg_resources.Requirement.parse("docker-utils-aba"), os.path.join("docker_utils_aba", path))
def __init__(self, *args): self.log = logging.getLogger(resource_filename(__name__, __file__)) self.cluster = Cluster(args[0]) self.connection = self.cluster.connect() self.connection.row_factory = tuple_factory self.connection.execute("CREATE KEYSPACE IF NOT EXISTS public \ WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };") self.connection.execute("CREATE KEYSPACE IF NOT EXISTS internal \ WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };") self.connection.execute("CREATE TABLE IF NOT EXISTS public.users ( \ name text PRIMARY KEY, \ n text, \ e text, \ secret text);") self.connection.execute("CREATE TABLE IF NOT EXISTS public.contracts ( \ id uuid PRIMARY KEY, \ owner text, \ package text, \ template blob, \ example blob);")
def __init__(self, *args): self.log = logging.getLogger(resource_filename(__name__, __file__)) self.engine = create_engine(args[0]) self.connection = self.engine.connect()
def __init__(self, round_number): self.round_number = round_number self.train_file_name = 'r' + str(round_number) + '_numerai_training_data.csv' self.test_file_name = 'r' + str(round_number) + '_numerai_tournament_data.csv' self.sorted_file_name = 'r' + str(round_number) + '_numerai_sorted_training_data.csv' if not os.path.exists(resource_filename('numerai.data', self.train_file_name)): raise IOError('File {} not found.'.format(self.train_file_name)) if not os.path.exists(resource_filename('numerai.data', self.test_file_name)): raise IOError('File {} not found.'.format(self.test_file_name))
def training_set(self): return pd.read_csv(resource_filename('numerai.data', self.train_file_name))
def test_set(self): return pd.read_csv(resource_filename('numerai.data', self.test_file_name))
def sorted_training_set(self): return pd.read_csv(resource_filename('numerai.data', self.sorted_file_name))
def teardown_module(): """Delete eggs/wheels created by tests.""" base = pkg_resources.resource_filename('wheel.test', '') for dist in test_distributions: for subdir in ('build', 'dist'): try: rmtree(os.path.join(base, dist, subdir)) except OSError: pass
def build_wheel(): """Build wheels from test distributions.""" for dist in test_distributions: pwd = os.path.abspath(os.curdir) distdir = pkg_resources.resource_filename('wheel.test', dist) os.chdir(distdir) try: sys.argv = ['', 'bdist_wheel'] exec(compile(open('setup.py').read(), 'setup.py', 'exec')) finally: os.chdir(pwd)
def build_egg(): """Build eggs from test distributions.""" for dist in test_distributions: pwd = os.path.abspath(os.curdir) distdir = pkg_resources.resource_filename('wheel.test', dist) os.chdir(distdir) try: sys.argv = ['', 'bdist_egg'] exec(compile(open('setup.py').read(), 'setup.py', 'exec')) finally: os.chdir(pwd)
def test_egg_re(): """Make sure egg_info_re matches.""" egg_names = open(pkg_resources.resource_filename('wheel', 'eggnames.txt')) for line in egg_names: line = line.strip() if not line: continue assert egg2wheel.egg_info_re.match(line), line