我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.TemporaryDirectory()。
def test_zipfile_timestamp():
    """Verify SOURCE_DATE_EPOCH pins the timestamps of the zip members.

    An environment variable can be used to influence the timestamp on
    TarInfo objects inside the zip (see issue #143).  TemporaryDirectory is
    not a context manager under Python 3, hence the helper.
    """
    with temporary_directory() as tempdir:
        # Populate the workspace with a few small UTF-8 text files.
        for filename in ('one', 'two', 'three'):
            member_path = os.path.join(tempdir, filename)
            with codecs.open(member_path, 'w', encoding='utf-8') as fp:
                fp.write(filename + '\n')
        zip_base_name = os.path.join(tempdir, 'dummy')
        # 315576060 is the earliest date representable in TarInfos: 1980-01-01.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            zip_filename = wheel.archive.make_wheelfile_inner(
                zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for info in zf.infolist():
                assert info.date_time[:3] == (1980, 1, 1)
def testPrintCopy(self):
    """Test that _PrintCopy writes the expected copy message to the file."""
    with tempfile.TemporaryDirectory() as tmpdir:
        fake_path_helper = (
            fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
                self.template_path, 'test', 'db'))
        path = os.path.join(tmpdir, 'testfile')
        generator = sqlite_generator.SQLiteGenerator(
            tmpdir, 'test', 'test', ['test'],
            output_handler_file.OutputHandlerFile(
                path, file_handler.FileHandler()),
            self.plugin_helper, fake_path_helper)
        generator._PrintCopy(path)  # pylint: disable=protected-access
        self.assertEqual("copy " + path, self._ReadFromFile(path))
def testPrintEdit(self):
    """Test that _PrintEdit writes the expected edit message to the file."""
    with tempfile.TemporaryDirectory() as tmpdir:
        fake_path_helper = (
            fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
                self.template_path, 'test', 'db'))
        path = os.path.join(tmpdir, 'testfile')
        generator = sqlite_generator.SQLiteGenerator(
            tmpdir, 'test', 'test', ['test'],
            output_handler_file.OutputHandlerFile(
                path, file_handler.FileHandler()),
            self.plugin_helper, fake_path_helper)
        generator._PrintEdit(path)  # pylint: disable=protected-access
        self.assertEqual('edit ' + path, self._ReadFromFile(path))
def testPrint(self):
    """Test that _Print forwards each argument to the output handler."""
    with tempfile.TemporaryDirectory() as tmpdir:
        fake_path_helper = (
            fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
                self.template_path, 'test', 'db'))
        path = os.path.join(tmpdir, 'testfile')
        generator = sqlite_generator.SQLiteGenerator(
            tmpdir, 'test', 'test', ['test'],
            output_handler_file.OutputHandlerFile(
                path, file_handler.FileHandler()),
            self.plugin_helper, fake_path_helper)
        arguments = ('test1', 'test2', 'test3', 'test4', 'test5', 'test6',
                     'test7')
        generator._Print(*arguments)  # pylint: disable=protected-access
        expected = 'create test1create test2create test3create test4copy ' \
                   'test5create test6create test7'
        self.assertEqual(expected, self._ReadFromFile(path))
def testPluginNameIfExisting(self):
    """Test PluginName when the chosen plugin name already exists."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(),
            prompt_info='the_plugin', prompt_error='the_plugin')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            plugin_exists=True,
            change_bool_after_every_call_plugin_exists=True,
            valid_name=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        plugin_name = 'the_plugin'
        controller._path = 'somepath'
        returned_name = controller.PluginName(None, None, plugin_name)
        # The controller should have prompted for a new name once.
        self.assertEqual('Plugin exists. Choose new Name',
                         self._ReadFromFile(path))
        self.assertEqual(plugin_name, returned_name)
def testCreateSQLQueryModelWithUserInputWithError(self):
    """Test _CreateSQLQueryModelWithUserInput when the query errors out."""
    error_message = "Some Error..."
    fake_execution = fake_sqlite_query_execution.SQLQueryExecution(
        sql_query_data.SQLQueryData(
            has_error=True, error_message=error_message))
    sql_query = 'SELECT createdDate FROM Users ORDER BY createdDate'
    name = 'Contact'
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_info=name)
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        # A failing query should yield no model at all.
        self.assertIsNone(
            controller._CreateSQLQueryModelWithUserInput(
                sql_query, False, fake_execution))
def testSourcePathIfNotExisting(self):
    """Test SourcePath when the initial folder does not exist."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='the source path')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=False,
            change_bool_after_every_call_folder_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        source_path = controller.SourcePath(None, None, 'testpath')
        # The controller should have asked for a correct folder once.
        self.assertEqual('Folder does not exists. Enter correct one',
                         self._ReadFromFile(path))
        self.assertEqual(source_path, 'the source path')
def testTestPathIfExisting(self):
    """Test TestPath when the given test database file exists."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler())
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            file_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        database_path = os.path.join(
            path_helper.TestDatabasePath(), 'twitter_ios.db')
        valid_path = controller.TestPath(None, None, database_path)
        # No prompt should have been written for an existing file.
        self.assertEqual(database_path, controller._testfile)
        self.assertEqual('', self._ReadFromFile(path))
        self.assertEqual(valid_path, database_path)
def testValidatePluginNameIfNotOk(self):
    """Test _ValidatePluginName with an invalid plugin name."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='valid_name')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_name=False, change_bool_after_every_call_valid_name=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        valid = controller._ValidatePluginName("the_wrong_plugin_")
        self.assertEqual(
            ('Plugin is not in a valid format. Choose new Name ['
             'plugin_name_...]'),
            self._ReadFromFile(path))
        self.assertEqual(valid, 'valid_name')
def testValidateRowNameIfNotOk(self):
    """Test _ValidateRowName with an invalid row name."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='TheValidRowName')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_row_name=False,
            change_bool_after_every_call_valid_row_name=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        valid = controller._ValidateRowName("theWrongName")
        self.assertEqual(
            ('Row name is not in a valid format. Choose new Name ['
             'RowName...]'),
            self._ReadFromFile(path))
        self.assertEqual(valid, 'TheValidRowName')
def testValidateTimestampStringIfNotOk(self):
    """Test _ValidateTimestampString with a badly formatted string."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='this,that,bla')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        valid = controller._ValidateTimestampString("this, that,bla")
        self.assertEqual(
            ('Timestamps are not in valid format. Reenter them correctly '
             '[name,name...]'),
            self._ReadFromFile(path))
        self.assertEqual(valid, 'this,that,bla')
def testValidateColumnStringIfNotOk(self):
    """Test _ValidateColumnString with a badly formatted string."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='this,that,bla')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        valid = controller._ValidateColumnString("this, that,bla")
        self.assertEqual(
            ('Column names are not in valid format. Reenter them correctly '
             '[name,name...]'),
            self._ReadFromFile(path))
        self.assertEqual(valid, 'this,that,bla')
# NOTE(review): this line was flattened during extraction; the indentation of
# the final `self.assertFalse(template_path)` (inside the `with` blocks, where
# it would be unreachable after the expected SystemExit, or at method level)
# cannot be reconstructed with certainty — verify against the upstream source.
# The test exercises Generate() with confirm=False and expects SystemExit.
def testGenerateIfNotConfirmed(self): """test the generate if confirmed """ template_path = path_helper.TemplatePath() with self.assertRaises(SystemExit): with tempfile.TemporaryDirectory() as tmpdir: file = os.path.join(tmpdir, 'testfile') pathlib.Path(file).touch() output_handler = output_handler_file.OutputHandlerFile( file, file_handler.FileHandler(), confirm=False) plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper( valid_name=False, change_bool_after_every_call_valid_name=True) controller = sqlite_controller.SQLiteController(output_handler, plugin_helper) controller.Generate('not used', 'not used') self.assertFalse(template_path)
def testCopyFile(self):
    """Test copying a file whose destination does not exist beforehand."""
    expected_content = "this is test content."
    with tempfile.TemporaryDirectory() as tmpdir:
        source = os.path.join(tmpdir, self.file)
        destination = os.path.join(tmpdir, "copy", self.file)
        with open(source, "a") as f:
            f.write(expected_content)
        handler = file_handler.FileHandler()
        self.assertFalse(os.path.exists(destination))
        handler.CopyFile(source, destination)
        # The copy must exist and be byte-identical to the source.
        self.assertTrue(os.path.exists(destination))
        self.assertTrue(filecmp.cmp(destination, source))
def testAddContentIfFileExists(self):
    """Test appending content to an already existing file."""
    content = "this is test content. "
    expected = content + content
    with tempfile.TemporaryDirectory() as tmpdir:
        source = os.path.join(tmpdir, self.file)
        with open(source, "a") as f:
            f.write(content)
        handler = file_handler.FileHandler()
        self.assertTrue(os.path.exists(source))
        handler.AddContent(source, content)
        self.assertTrue(os.path.exists(source))
        # The file should now contain the content twice.
        with open(source, "r") as f:
            self.assertEqual(expected, f.read())
def testCreateOrModifyFileWithContentIfFileExists(self):
    """Test CreateOrModifyFileWithContent when the file already exists."""
    content = "this is test content. "
    expected = content + content
    with tempfile.TemporaryDirectory() as tmpdir:
        source = os.path.join(tmpdir, self.file)
        with open(source, "a") as f:
            f.write(content)
        handler = file_handler.FileHandler()
        self.assertTrue(os.path.exists(source))
        handler.CreateOrModifyFileWithContent(source, content)
        self.assertTrue(os.path.exists(source))
        # Existing files are appended to, not overwritten.
        with open(source, "r") as f:
            self.assertEqual(expected, f.read())
def testAddContentIfFileAndFolderDoesNotExist(self):
    """Test CreateOrModifyFileWithContent when file and folder are missing."""
    content = "this is test content. "
    expected = content
    with tempfile.TemporaryDirectory() as tmpdir:
        new_path = os.path.join(tmpdir, "newfolder")
        source = os.path.join(new_path, self.file)
        handler = file_handler.FileHandler()
        self.assertFalse(os.path.exists(source))
        handler.CreateOrModifyFileWithContent(source, content)
        # Both the folder and the file should have been created.
        self.assertTrue(os.path.exists(source))
        with open(source, "r") as f:
            self.assertEqual(expected, f.read())
def testUserConfigOverrides(self):
    """Test that user configs override default.yaml w/ includes."""
    with TemporaryDirectory() as tmp:
        os.chdir(tmp)
        os.mkdir("recipes")
        # default.yaml pulls in included.yaml; user.yaml wins over both.
        with open("default.yaml", "w") as cfg:
            cfg.write("include:\n")
            cfg.write("    - included\n")
            cfg.write("environment:\n")
            cfg.write("    FOO: BAR\n")
        with open("included.yaml", "w") as cfg:
            cfg.write("environment:\n")
            cfg.write("    FOO: BAZ\n")
        with open("user.yaml", "w") as cfg:
            cfg.write("environment:\n")
            cfg.write("    FOO: USER\n")
        recipeSet = RecipeSet()
        recipeSet.setConfigFiles(["user"])
        recipeSet.parse()
        assert recipeSet.defaultEnv() == {"FOO": "USER"}
def testChangeRemote(self):
    """Test that changed remotes in recipe are updated in the working copy."""
    scm_before = self.createGitScm({
        'remote-bar': 'http://bar.test/baz.git',
    })
    scm_after = self.createGitScm({
        'remote-bar': 'http://bar.test/foo.git',
    })
    with tempfile.TemporaryDirectory() as workspace:
        # First checkout establishes the original remote set.
        self.assertEqual(
            self.callAndGetRemotes(workspace, scm_before),
            {"origin": self.repodir, 'bar': 'http://bar.test/baz.git'})
        # Re-running with a changed recipe must rewrite the 'bar' remote.
        self.assertEqual(
            self.callAndGetRemotes(workspace, scm_after),
            {"origin": self.repodir, 'bar': 'http://bar.test/foo.git'})
def testDirAndFile(self):
    """Test hashing a directory with one file.

    The hash sum should stay stable in the long run as this might be used
    for binary artifact matching in the future.
    """
    with TemporaryDirectory() as tmp:
        os.mkdir(os.path.join(tmp, "dir"))
        with open(os.path.join(tmp, "dir", "file"), 'wb') as f:
            f.write(b'abc')
        first = hashDirectory(tmp)
        # 20 bytes == SHA-1 digest length; the value is pinned on purpose.
        assert len(first) == 20
        assert first == binascii.unhexlify(
            "640f516de78fba0b6d2ddde4451000f142d06b0d")
        # Hashing again without changes must be deterministic.
        assert hashDirectory(tmp) == first
def testRewriteFile(self):
    """Changing the file content should change the hash sum."""
    with NamedTemporaryFile() as index:
        with TemporaryDirectory() as tmp:
            target = os.path.join(tmp, "foo")
            with open(target, 'wb') as f:
                f.write(b'abc')
            sum_before = hashDirectory(tmp, index.name)
            # The index file must carry the BOB1 magic header.
            with open(index.name, "rb") as f:
                assert f.read(4) == b'BOB1'
            with open(target, 'wb') as f:
                f.write(b'qwer')
            sum_after = hashDirectory(tmp, index.name)
            with open(index.name, "rb") as f:
                assert f.read(4) == b'BOB1'
            assert sum_before != sum_after
def testUploadPackageNoFail(self):
    """The nofail option must prevent fatal error on upload failures."""
    archive = self.__getArchiveInstance(
        {"flags": ["upload", "download", "nofail"]})
    archive.wantUpload(True)
    with TemporaryDirectory() as tmp:
        # Create a minimal workspace: an audit file plus a content tree.
        audit = os.path.join(tmp, "audit.json.gz")
        content = os.path.join(tmp, "workspace")
        with open(audit, "wb") as f:
            f.write(b"AUDIT")
        os.mkdir(content)
        with open(os.path.join(content, "data"), "wb") as f:
            f.write(b"DATA")
        # Must not throw despite the failing artifact.
        archive.uploadPackage(ERROR_UPLOAD_ARTIFACT, audit, content, 0)
        archive.uploadPackage(ERROR_UPLOAD_ARTIFACT, audit, content, 1)
        # Live-build-id upload errors must not throw with nofail either.
        archive.uploadLocalLiveBuildId(ERROR_UPLOAD_ARTIFACT, b'\x00', 0)
        archive.uploadLocalLiveBuildId(ERROR_UPLOAD_ARTIFACT, b'\x00', 1)
def testUploadJenkinsNoFail(self):
    """The nofail option must prevent fatal error on upload failures."""
    archive = self.__getArchiveInstance(
        {"flags": ["upload", "download", "nofail"]})
    archive.wantUpload(True)
    with TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "error.buildid"), "wb") as f:
            f.write(ERROR_UPLOAD_ARTIFACT)
        self.__createArtifactByName(os.path.join(tmp, "result.tgz"))
        # These uploads must not fail even though they do not succeed.
        callJenkinsScript(
            archive.upload(None, "error.buildid", "result.tgz"), tmp)
        callJenkinsScript(
            archive.uploadJenkinsLiveBuildId(
                None, "error.buildid", "test.buildid"), tmp)
def testInvalidServer(self):
    """Test download on non-existent server."""
    archive = SimpleHttpArchive({'url': "https://127.1.2.3:7257"})
    archive.wantDownload(True)
    archive.wantUpload(True)
    # Local downloads against a dead server must fail gracefully.
    archive.downloadPackage(b'\x00' * 20, "unused", "unused", 0)
    archive.downloadPackage(b'\x00' * 20, "unused", "unused", 1)
    self.assertEqual(
        archive.downloadLocalLiveBuildId(b'\x00' * 20, 0), None)
    # Jenkins script path: the generated script must also tolerate it.
    with TemporaryDirectory() as workspace:
        with open(os.path.join(workspace, "test.buildid"), "wb") as f:
            f.write(b'\x00' * 20)
        script = archive.download(None, "test.buildid", "result.tgz")
        callJenkinsScript(script, workspace)
def set_mapper_number(manifest_file):
    """Return the number of lines (FASTQ records) in the manifest file.

    Args:
        manifest_file: path to the manifest, either a local path or an
            ``s3://bucket/key`` URL.  For S3, the manifest is downloaded
            into a temporary directory first.

    Returns:
        int: the line count of the manifest.
    """
    def _count_lines(path):
        # FIX: the original iterated `open(path)` without ever closing the
        # handle; the context manager guarantees it is released.
        with open(path) as fp:
            return sum(1 for _ in fp)

    if manifest_file.startswith("s3://"):
        s3 = boto3.resource("s3")
        # Strip the "s3://" scheme, then split into bucket and object key.
        bucket_name, key_prefix = (
            manifest_file.strip().strip("/")[5:].split("/", 1))
        with tempfile.TemporaryDirectory() as tmpdirname:
            local_manifest = tmpdirname + "/manifest"
            s3.meta.client.download_file(
                bucket_name, key_prefix, local_manifest)
            return _count_lines(local_manifest)
    return _count_lines(manifest_file)
def set_mapper_number(manifest_file):
    """Return the number of lines (accessions) in the manifest file.

    Args:
        manifest_file: path to the manifest, either a local path or an
            ``s3://bucket/key`` URL.  For S3, the manifest is downloaded
            into a temporary directory first.

    Returns:
        int: the line count of the manifest.
    """
    def _count_lines(path):
        # FIX: the original iterated `open(path)` without ever closing the
        # handle; the context manager guarantees it is released.
        with open(path) as fp:
            return sum(1 for _ in fp)

    if manifest_file.startswith("s3://"):
        s3_client = boto3.resource("s3")
        # Strip the "s3://" scheme, then split into bucket and object key.
        bucket_name, key_prefix = (
            manifest_file.strip().strip("/")[5:].split("/", 1))
        with tempfile.TemporaryDirectory() as tmpdirname:
            local_manifest = tmpdirname + "/manifest"
            s3_client.Object(bucket_name, key_prefix).download_file(
                local_manifest)
            return _count_lines(local_manifest)
    return _count_lines(manifest_file)
def copy(contents, config=None, destination_dir=False, **kwargs):
    """Yield (source, destination, path) around a copy_file call, then clean up.

    Writes `contents` to a temporary source file, copies it either into a
    temporary directory (`destination_dir=True`) or to a sibling '.copy'
    file, yields the involved paths, and removes the temporaries afterwards.
    """
    if config is None:
        config = Config(xyz='123')
    with NamedTemporaryFile('w', delete=False) as tp:
        tp.write(contents)
        source = tp.name
    if destination_dir:
        with TemporaryDirectory() as destination:
            path = copy_file(config, source, destination, **kwargs)
            yield source, destination, path
            os.remove(source)
    else:
        destination = source + '.copy'
        path = copy_file(config, source, destination, **kwargs)
        yield source, destination, path
        os.remove(source)
        os.remove(path)
def setUp(self):
    """Build API results from the packaged config and create temp paths."""
    file_path = resource_filename(
        Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)
    buildargs = {
        'serviceName': 'customsearch',
        'version': 'v1',
        'developerKey': defaults['build_developerKey'],
    }
    cseargs = {
        'q': 'google',
        'num': 1,
        'fileType': 'png',
        'cx': defaults['cx'],
    }
    self.results = search_google.api.results(buildargs, cseargs)
    # Renamed from `tempfile` to avoid shadowing the tempfile module name.
    tmp_file = TemporaryFile()
    self.tempfile = str(tmp_file.name)
    tmp_file.close()
    # BUG FIX: the original `str(TemporaryDirectory().name)` dropped the
    # TemporaryDirectory object immediately, so it was garbage-collected and
    # the directory deleted before any test could use it.  Keeping the
    # object on `self` ties the directory's lifetime to the test case.
    self._tempdir_obj = TemporaryDirectory()
    self.tempdir = str(self._tempdir_obj.name)
def initializePage(self):
    """Prepare the import source: a directory, a local archive, or a download."""
    import_type = self.wizard.import_type
    if import_type == 'directory':
        # Plain directory import: no unarchiving UI needed.
        self.import_dir()
        self.unarchive_label.hide()
        self.unarchive_progressbar.hide()
    elif import_type == 'archive':
        # Local archive: unpack it into a fresh temporary directory.
        self.tempdir = tempfile.TemporaryDirectory()
        self.archivepath = self.field('archivepath')
        self.unarchive()
    else:
        # Remote database: download into a temp dir, then unarchive.
        self.download_label.setVisible(True)
        self.download_progressbar.setVisible(True)
        self.unarchive_progressbar.setMaximum(1)
        self.tempdir = tempfile.TemporaryDirectory()
        self.archivepath = os.path.join(self.tempdir.name, 'db.7z')
        import_signals.download_complete.connect(self.unarchive)
        self.download_thread = DownloadThread(
            session, self.wizard.db_url, self.tempdir.name)
        import_signals.download_complete.connect(self.download_thread.exit)
        self.download_thread.start()
def test_spearman(self):
    """Test the default (Spearman) alpha correlation visualization."""
    alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                          index=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame({'value': ['1.0', '2.0', '3.0']},
                     index=['sample1', 'sample2', 'sample3']))
    with tempfile.TemporaryDirectory() as output_dir:
        alpha_correlation(output_dir, alpha_div, md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        jsonp_fp = os.path.join(output_dir, 'category-value.jsonp')
        self.assertTrue(os.path.exists(jsonp_fp))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(jsonp_fp) as f:
            jsonp_content = f.read()
        self.assertTrue('Spearman' in jsonp_content)
        self.assertTrue('"sampleSize": 3' in jsonp_content)
        self.assertTrue('"data":' in jsonp_content)
        self.assertFalse('filtered' in jsonp_content)
def test_pearson(self):
    """Test the Pearson alpha correlation visualization."""
    alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                          index=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame({'value': ['1.0', '2.0', '3.0']},
                     index=['sample1', 'sample2', 'sample3']))
    with tempfile.TemporaryDirectory() as output_dir:
        alpha_correlation(output_dir, alpha_div, md, method='pearson')
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        jsonp_fp = os.path.join(output_dir, 'category-value.jsonp')
        self.assertTrue(os.path.exists(jsonp_fp))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(jsonp_fp) as f:
            jsonp_content = f.read()
        self.assertTrue('Pearson' in jsonp_content)
        self.assertTrue('"sampleSize": 3' in jsonp_content)
        self.assertTrue('"data":' in jsonp_content)
        self.assertFalse('filtered' in jsonp_content)
def test_alpha_group_significance(self):
    """Test the basic alpha group significance visualization."""
    alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                          index=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame({'a or b': ['a', 'b', 'b']},
                     index=['sample1', 'sample2', 'sample3']))
    with tempfile.TemporaryDirectory() as output_dir:
        alpha_group_significance(output_dir, alpha_div, md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        self.assertTrue(os.path.exists(
            os.path.join(output_dir, 'category-a%20or%20b.jsonp')))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('Kruskal-Wallis (all groups)' in index_content)
        self.assertTrue('Kruskal-Wallis (pairwise)' in index_content)
def test_alpha_group_significance_some_numeric(self):
    """Numeric-looking columns must be skipped and flagged in the report."""
    alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                          index=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame({'a or b': ['a', 'b', 'b'],
                      'bad': ['1.0', '2.0', '3.0']},
                     index=['sample1', 'sample2', 'sample3']))
    with tempfile.TemporaryDirectory() as output_dir:
        alpha_group_significance(output_dir, alpha_div, md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        self.assertTrue(os.path.exists(
            os.path.join(output_dir, 'category-a%20or%20b.jsonp')))
        self.assertFalse(os.path.exists(
            os.path.join(output_dir, 'bad-value.jsonp')))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('not categorical:' in index_content)
        self.assertTrue('<strong>bad' in index_content)
def test_alpha_group_significance_one_group_all_unique_values(self):
    """Columns where every sample is unique must be skipped and flagged."""
    alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                          index=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame({'a or b': ['a', 'b', 'b'],
                      'bad': ['x', 'y', 'z']},
                     index=['sample1', 'sample2', 'sample3']))
    with tempfile.TemporaryDirectory() as output_dir:
        alpha_group_significance(output_dir, alpha_div, md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        self.assertTrue(os.path.exists(
            os.path.join(output_dir, 'category-a%20or%20b.jsonp')))
        self.assertFalse(os.path.exists(
            os.path.join(output_dir, 'category-bad.jsonp')))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('number of samples' in index_content)
        self.assertTrue('<strong>bad' in index_content)
def test_alpha_group_significance_one_group_single_value(self):
    """Columns with a single repeated value must be skipped and flagged."""
    alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                          index=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame({'a or b': ['a', 'b', 'b'],
                      'bad': ['x', 'x', 'x']},
                     index=['sample1', 'sample2', 'sample3']))
    with tempfile.TemporaryDirectory() as output_dir:
        alpha_group_significance(output_dir, alpha_div, md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        self.assertTrue(os.path.exists(
            os.path.join(output_dir, 'category-a%20or%20b.jsonp')))
        self.assertFalse(os.path.exists(
            os.path.join(output_dir, 'category-bad.jsonp')))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('only a single' in index_content)
        self.assertTrue('<strong>bad' in index_content)
def test_alpha_rarefaction_with_phylogeny_and_metadata(self):
    """Phylogeny adds faith_pd alongside the default metrics."""
    t = biom.Table(np.array([[100, 111, 113], [111, 111, 112]]),
                   ['O1', 'O2'], ['S1', 'S2', 'S3'])
    p = skbio.TreeNode.read(io.StringIO(
        '((O1:0.25, O2:0.50):0.25, O3:0.75)root;'))
    md = qiime2.Metadata(
        pd.DataFrame({'pet': ['russ', 'milo', 'peanut']},
                     index=['S1', 'S2', 'S3']))
    with tempfile.TemporaryDirectory() as output_dir:
        alpha_rarefaction(output_dir, t, max_depth=200, phylogeny=p,
                          metadata=md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('observed_otus' in index_content)
        self.assertTrue('shannon' in index_content)
        self.assertTrue('faith_pd' in index_content)
def test_simple(self):
    """Test that _alpha_rarefaction_jsonp emits a well-formed jsonp file."""
    d = [[1.04, 1.5, 2., 2.5, 1.18, 2.82, 2.96, 3., 1, 3., 1., 'S1'],
         [1.04, 1.5, 2., 2.5, 1.18, 2.82, 2.96, 3., 1, 3., 1., 'S2'],
         [1.04, 1.5, 2., 2.5, 1.18, 2.82, 2.96, 3., 1, 3., 1., 'S3']]
    data = pd.DataFrame(data=d, columns=['2%', '25%', '50%', '75%', '9%',
                                         '91%', '98%', 'count', 'depth',
                                         'max', 'min', 'sample-id'])
    with tempfile.TemporaryDirectory() as output_dir:
        _alpha_rarefaction_jsonp(output_dir, 'peanut.jsonp', 'shannon',
                                 data, '')
        jsonp_fp = os.path.join(output_dir, 'peanut.jsonp')
        self.assertTrue(os.path.exists(jsonp_fp))
        # FIX: read under a context manager so the handle is closed.
        with open(jsonp_fp) as f:
            jsonp_content = f.read()
        self.assertTrue('load_data' in jsonp_content)
        self.assertTrue('columns' in jsonp_content)
        self.assertTrue('index' in jsonp_content)
        self.assertTrue('data' in jsonp_content)
        self.assertTrue('sample-id' in jsonp_content)
        self.assertTrue('shannon' in jsonp_content)
def test_bioenv(self):
    """Test the basic bioenv visualization with mixed metadata columns."""
    dm = skbio.DistanceMatrix([[0.00, 0.25, 0.25],
                               [0.25, 0.00, 0.00],
                               [0.25, 0.00, 0.00]],
                              ids=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame([['1.0', 'a'], ['2.0', 'b'], ['3.0', 'c']],
                     index=['sample1', 'sample2', 'sample3'],
                     columns=['metadata1', 'metadata2']))
    with tempfile.TemporaryDirectory() as output_dir:
        bioenv(output_dir, dm, md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('metadata1' in index_content)
        self.assertTrue('not numerical' in index_content)
        self.assertTrue('<strong>metadata2' in index_content)
        self.assertFalse('Warning' in index_content)
def test_bioenv_extra_metadata(self):
    """Metadata rows without matching distance-matrix ids must be ignored."""
    dm = skbio.DistanceMatrix([[0.00, 0.25, 0.25],
                               [0.25, 0.00, 0.00],
                               [0.25, 0.00, 0.00]],
                              ids=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame([['1.0', 'a'], ['2.0', 'b'], ['3.0', 'c'],
                      ['4.0', 'd']],
                     index=['sample1', 'sample2', 'sample3', 'sample4'],
                     columns=['metadata1', 'metadata2']))
    with tempfile.TemporaryDirectory() as output_dir:
        bioenv(output_dir, dm, md)
        index_fp = os.path.join(output_dir, 'index.html')
        self.assertTrue(os.path.exists(index_fp))
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('metadata1' in index_content)
        self.assertTrue('not numerical' in index_content)
        self.assertTrue('<strong>metadata2' in index_content)
        self.assertFalse('Warning' in index_content)
def test_bioenv_zero_variance_column(self):
    """Zero-variance metadata columns must be excluded and flagged."""
    dm = skbio.DistanceMatrix([[0.00, 0.25, 0.25],
                               [0.25, 0.00, 0.00],
                               [0.25, 0.00, 0.00]],
                              ids=['sample1', 'sample2', 'sample3'])
    md = qiime2.Metadata(
        pd.DataFrame([['1.0', '2.0'], ['2.0', '2.0'], ['3.0', '2.0']],
                     index=['sample1', 'sample2', 'sample3'],
                     columns=['metadata1', 'metadata2']))
    with tempfile.TemporaryDirectory() as output_dir:
        bioenv(output_dir, dm, md)
        index_fp = os.path.join(output_dir, 'index.html')
        # FIX: read once under a context manager instead of re-opening the
        # file (and leaking the handle) for every single assertion.
        with open(index_fp) as f:
            index_content = f.read()
        self.assertTrue('metadata1' in index_content)
        self.assertTrue('no variance' in index_content)
        self.assertTrue('<strong>metadata2' in index_content)
        self.assertFalse('Warning' in index_content)
def run():
    """Download all symbol URLs and pack them into a single test zip."""
    urls = [
        line.strip()
        for line in URLS.strip().splitlines()
        if line.strip() and not line.strip().startswith('#')
    ]
    with tempfile.TemporaryDirectory(prefix='symbols') as tmpdirname:
        downloaded = download_all(urls, tmpdirname)
        save_filepath = 'symbols-for-systemtests.zip'
        total_time_took = 0.0
        total_size = 0
        with zipfile.ZipFile(save_filepath, mode='w') as zf:
            for uri, (fullpath, time_took, size) in downloaded.items():
                total_time_took += time_took
                total_size += size
                if not fullpath:
                    continue
                # Store the member under its symbol path, minus the API prefix.
                arcname = uri.replace('v1/', '')
                assert os.path.isfile(fullpath)
                zf.write(
                    fullpath,
                    arcname=arcname,
                    compress_type=zipfile.ZIP_DEFLATED,
                )
def make_tempdir(prefix=None, suffix=None):
    """Decorator that appends a temporary-directory path as the last
    positional argument of the wrapped function; the directory is deleted
    once the call returns.

    Usage::

        @make_tempdir()
        def some_function(arg1, arg2, tempdir, kwargs1='one'):
            assert os.path.isdir(tempdir)
            ...
    """
    def decorator(func):
        @wraps(func)
        def inner(*args, **kwargs):
            with TemporaryDirectory(prefix=prefix, suffix=suffix) as tmp:
                return func(*(args + (tmp,)), **kwargs)
        return inner
    return decorator
def test_build_error(experiment_class):
    """A failure in build_matrices must propagate out of the experiment."""
    with testing.postgresql.Postgresql() as postgresql:
        db_engine = create_engine(postgresql.url())
        ensure_db(db_engine)
        with TemporaryDirectory() as temp_dir:
            experiment = experiment_class(
                config=sample_config(),
                db_engine=db_engine,
                model_storage_class=FSModelStorageEngine,
                project_path=os.path.join(temp_dir, 'inspections'),
            )
            with mock.patch.object(experiment, 'build_matrices') as build_mock:
                build_mock.side_effect = RuntimeError('boom!')
                with pytest.raises(RuntimeError):
                    experiment()
def test_build_error_cleanup_timeout(_clean_up_mock, experiment_class):
    """A cleanup timeout after a build error must surface as TimeoutError."""
    with testing.postgresql.Postgresql() as postgresql:
        db_engine = create_engine(postgresql.url())
        ensure_db(db_engine)
        with TemporaryDirectory() as temp_dir:
            experiment = experiment_class(
                config=sample_config(),
                db_engine=db_engine,
                model_storage_class=FSModelStorageEngine,
                project_path=os.path.join(temp_dir, 'inspections'),
                cleanup_timeout=0.02,  # Set short timeout
            )
            with mock.patch.object(experiment, 'build_matrices') as build_mock:
                build_mock.side_effect = RuntimeError('boom!')
                with pytest.raises(TimeoutError) as exc_info:
                    experiment()
                # Last exception is TimeoutError, but earlier error is
                # preserved in __context__, and will be noted as well in any
                # standard traceback:
                assert exc_info.value.__context__ is build_mock.side_effect
def get_function_root(self, name):
    """Return the extracted root directory of a packaged serverless function.

    Lazily creates one shared TemporaryDirectory for all functions (kept on
    ``self`` so its lifetime matches this instance), extracts
    ``<package>.zip`` produced by `serverless package` on first access, and
    caches the result per package name.

    Args:
        name: the function name as declared in the serverless config.

    Returns:
        str: path to the extracted function root.

    Raises:
        SystemExit: exit code 2 when the zip is missing or corrupt.
    """
    if not hasattr(self, 'functions_output'):
        self.functions_output = TemporaryDirectory("puresec-serverless-functions-")
    package_name = self._get_function_package_name(name)
    function_root = os.path.join(self.functions_output.name, package_name)
    # Already extracted on a previous call: reuse it.
    if os.path.exists(function_root):
        return function_root
    try:
        # FIX: renamed the local from `zipfile` to `archive` so it no longer
        # shadows the `zipfile` module imported at file level.
        archive = ZipFile(os.path.join(self.serverless_package,
                                       "{}.zip".format(package_name)), 'r')
    except FileNotFoundError:
        eprint("error: serverless package did not create a function zip for '{}'", name)
        raise SystemExit(2)
    except BadZipFile:
        eprint("error: serverless package did not create a valid function zip for '{}'", name)
        raise SystemExit(2)
    with archive:
        archive.extractall(function_root)
    return function_root