我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 types.SimpleNamespace()。
def test_water_use_efficiency():
    """Regression check for ``water_use_efficiency``.

    Passing does not guarantee the wue function is correct, but if this
    fails, something has changed, and the reason should be understood.
    """
    hf_stats = SimpleNamespace(
        rho_vapor=9.607e-3,
        rho_co2=658.8e-6,
        T=28.56 + 273.15,
        P=100.1e3,
        cov_w_q=0.1443e-3,
        cov_w_c=-1.059e-6,
        cov_w_T=0.1359,
        ustar=0.4179,
        rho_totair=1.150,
    )

    wue = water_use_efficiency(
        hf_stats,
        meas_ht=7.11,
        canopy_ht=4.42,
        ppath='C3',
        ci_mod='const_ppm',
        diff_ratio=(1 / 0.7),
    )

    # (result attribute, reference value, absolute tolerance), checked in order.
    reference = (
        ('wue', -6.45e-3, 0.005e-3),
        ('ambient_h2o', 12.4e-3, 0.05e-3),
        ('ambient_co2', 638.e-6, 0.5e-6),
        ('inter_h2o', 28.3e-3, 0.05e-3),
        ('inter_co2', 492.e-6, 0.5e-6),
    )
    for attr, value, tolerance in reference:
        npt.assert_allclose(getattr(wue, attr), value, atol=tolerance)
def test_package_set(self):
    """Read the test release area, validate it, write setup.ini and compare it with the expected file."""
    self.maxDiff = None

    args = types.SimpleNamespace(
        arch='x86',
        dryrun=False,
        force=True,
        inifile='testdata/inifile/setup.ini',
        pkglist='testdata/pkglist/cygwin-pkg-maint',
        rel_area='testdata/relarea',
        release='testing',
        setup_version='4.321',
    )

    packages = package.read_packages(args.rel_area, args.arch)
    package.delete(packages, 'x86/release/nonexistent', 'nosuchfile-1.0.0.tar.xz')
    self.assertEqual(package.validate_packages(args, packages), True)

    package.write_setup_ini(args, packages, args.arch)
    with open(args.inifile) as inifile:
        results = inifile.read()

    # fix the timestamp to match expected (first occurrence of each only)
    volatile_lines = (
        ('setup-timestamp: .*', 'setup-timestamp: 1458221800'),
        ('generated at .*', 'generated at 2016-03-17 13:36:40 GMT'),
    )
    for pattern, replacement in volatile_lines:
        results = re.sub(pattern, replacement, results, count=1)

    compare_with_expected_file(self, 'testdata/inifile', (results,), 'setup.ini')

    # XXX: delete a needed package, and check validate fails
def _build_rdf(self, data=None):
    '''
    Rebuild self.rdf from scratch and parse the incoming payload.

    Creates a fresh self.rdf namespace holding the raw payload (self.rdf.data),
    an empty self.rdf.uris namespace and self.rdf.prefixes, which gets one
    rdflib.Namespace per (prefix, uri) pair in self.repo.context. Graph
    parsing is then delegated to self._parse_graph().

    Args:
        data (): payload from GET request, expected RDF content in various
            serialization formats

    Returns:
        None
    '''
    # recreate rdf data
    prefixes = SimpleNamespace()
    self.rdf = SimpleNamespace(data=data, prefixes=prefixes, uris=SimpleNamespace())

    # populate prefixes
    for prefix, uri in self.repo.context.items():
        setattr(prefixes, prefix, rdflib.Namespace(uri))

    # graph
    self._parse_graph()
def internal_api_patcher():
    """
    Fixture that patches certain internal app functions for the entire selenium suite execution

    Yields a namespace with the created ``patchers`` and the mocks they
    produced (``patcher_mocks``); every patcher is stopped once the
    generator is resumed after the yield.
    """
    methods_to_patch = [
        'mail.api.MailgunClient._mailgun_request',
    ]
    patchers = [patch(target) for target in methods_to_patch]

    def _start(patcher):
        # Label each mock with the name of the attribute it replaces.
        started = patcher.start()
        started.name = patcher.attribute
        return started

    patcher_mocks = [_start(patcher) for patcher in patchers]

    yield SimpleNamespace(patchers=patchers, patcher_mocks=patcher_mocks)

    for patcher in patchers:
        patcher.stop()
def mocked_elasticsearch_module_patcher():
    """
    Fixture that patches all indexing API functions that communicate directly with ElasticSearch

    Yields a namespace with the created ``patchers`` and the mocks they
    produced (``patcher_mocks``); every patcher is stopped once the
    generator is resumed after the yield.
    """
    # This looks for functions starting with _ because those are the functions which are imported
    # from indexing_api. The _ lets it prevent name collisions.
    patchers = [
        patch('search.tasks.{0}'.format(name), autospec=True)
        for name, val in tasks.__dict__.items()
        if callable(val) and name.startswith("_")
    ]

    def _start(patcher):
        # Label each mock with the name of the attribute it replaces.
        started = patcher.start()
        started.name = patcher.attribute
        return started

    patcher_mocks = [_start(patcher) for patcher in patchers]

    yield SimpleNamespace(patchers=patchers, patcher_mocks=patcher_mocks)

    for patcher in patchers:
        patcher.stop()
def get_packages_from_lockfile():
    """
    Return object that contains default and development packages from Pipfile.lock

    ``Pipfile.lock`` is read from the current working directory. Every entry
    of the result lists is the package name with the lock file's ``version``
    value for that package appended verbatim.

    Returns:
        SimpleNamespace(default=[...], development=[...])

    Raises:
        FileNotFoundError: if ``Pipfile.lock`` does not exist.
        KeyError: if a section, or a package's ``version`` entry, is missing.
    """
    # The lock file is JSON, so decode it as UTF-8 instead of relying on the
    # platform's locale encoding.
    lockfile_data = json.loads(Path('Pipfile.lock').read_text(encoding='utf-8'))

    result = SimpleNamespace(default=[], development=[])
    # The lock file names the section "develop"; the result exposes it as
    # "development".
    for section, attr in (('default', 'default'), ('develop', 'development')):
        getattr(result, attr).extend(
            package + version_info['version']
            for package, version_info in lockfile_data[section].items()
        )
    return result
def test_command_line_invokation(original_db, anonymized, monkeypatch):
    """Run the dump and anonymize entry points with argument parsing replaced by canned namespaces."""
    connection_keys = ('dbname', 'user', 'host', 'port', 'password')

    def fake_dump_args(self):
        # Arguments seen by dump_main(): connection settings of the original database.
        return SimpleNamespace(
            verbose=False,
            schema=SCHEMA_PATH,
            dump_file=DUMP_PATH,
            **{arg: ORIGINAL_DB_ARGS[arg] for arg in connection_keys}
        )

    def fake_anonymize_args(self):
        # Arguments seen by anonymize_main(): connection settings of the anonymized database.
        return SimpleNamespace(
            verbose=False,
            leave_dump=False,
            schema=SCHEMA_PATH,
            dump_file=DUMP_PATH,
            **{arg: ANONYMIZED_DB_ARGS[arg] for arg in connection_keys}
        )

    monkeypatch.setattr('argparse.ArgumentParser.parse_args', fake_dump_args)
    dump_main()
    assert os.path.getsize(DUMP_PATH) > 2000
    assert_db_empty(anonymized)

    monkeypatch.setattr('argparse.ArgumentParser.parse_args', fake_anonymize_args)
    anonymize_main()
    assert_db_anonymized(anonymized)
def _connect(self):
    """(Re)open the PostgreSQL connection and track the connected state.

    On success the connection is stored in ``self._conn`` and a one-off INFO
    line is printed to stderr when the previous state was "not connected".
    On any exception the error is printed, ``self._conn`` becomes a stub
    whose ``closed`` attribute is 1, and a WARNING is printed if the
    previous state was "connected".
    """
    try:
        settings = dict(
            database=DATABASE_DB,
            host=DATABASE_HOST,
            password=DATABASE_PASSWORD,
            port=DATABASE_PORT,
            sslmode=DATABASE_SSL_MODE,
            sslrootcert=DATABASE_CA_CERT,
            user=DATABASE_USER,
        )
        self._conn = psycopg2.connect(**settings)
        was_connected = self._connected
        if not was_connected:
            print('INFO: Connected to PostgreSQL', file=sys.stderr)
        self._connected = True
    except Exception as e:
        print(e, file=sys.stderr)
        # Stand-in object that looks like a closed connection.
        self._conn = SimpleNamespace(closed=1)
        was_connected = self._connected
        if was_connected:
            print('WARNING: Disconnected from PostgreSQL', file=sys.stderr)
        self._connected = False
def __generate_reference__(self, triple_map, **kwargs):
    """Generates a RDF entity based on triple map

    The value is looked up in ``self.source`` under ``str(triple_map.reference)``.

    Args:
        triple_map(SimpleNamespace): Triple Map

    Returns:
        None when the value is missing or empty; otherwise an rdflib.URIRef
        when the map's datatype is xsd:anyURI, an rdflib.Literal typed with
        the map's datatype when it has one, or a plain rdflib.Literal.
    """
    raw_value = self.source.get(str(triple_map.reference))
    if raw_value is None or len(raw_value) < 1:
        return None

    if not hasattr(triple_map, "datatype"):
        return rdflib.Literal(raw_value)

    if triple_map.datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(raw_value)
    return rdflib.Literal(raw_value, datatype=triple_map.datatype)
def __reference_handler__(self, **kwargs):
    """Internal method for handling rr:reference in triples map

    Evaluates the map's reference as a JSONPath expression against ``obj``
    and adds one ``(subject, predicate, Literal(value))`` triple to
    ``self.output`` per match.

    Keyword Args:
    -------------
    predicate_obj_map: SimpleNamespace
    obj: dict
    subject: rdflib.URIRef

    Returns:
        An empty list when the map has no reference; otherwise None.
    """
    subjects = []
    pred_obj_map = kwargs.get("predicate_obj_map")
    obj = kwargs.get("obj")
    subject = kwargs.get("subject")
    if pred_obj_map.reference is None:
        return subjects
    predicate = pred_obj_map.predicate
    # Read ``reference`` here: the previous spelling ``refernce`` raised
    # AttributeError for every map that actually had a reference.
    ref_exp = jsonpath_ng.parse(str(pred_obj_map.reference))
    # Parsed jsonpath-ng expressions are evaluated with .find(), which
    # returns match objects exposing the matched data as .value.
    for match in ref_exp.find(obj):
        self.output.add((subject, predicate, rdflib.Literal(match.value)))
def __generate_reference__(self, triple_map, **kwargs):
    """Internal method takes a triple_map and returns the result of
    applying its XPath reference to the current DOM context

    Args:
    -----
    triple_map: SimpleNamespace

    Keyword Args:
    -------------
    element: etree.Element

    Returns:
        rdflib.URIRef built from the first matched element whose stripped
        text starts with "http"; None when no matched element qualifies.
    """
    element = kwargs.get("element")
    found_elements = element.xpath(
        triple_map.reference,
        namespaces=self.xml_ns)
    for elem in found_elements:
        # A matched element without text content has ``text`` set to None;
        # skip it rather than failing on ``None.strip()``.
        if elem.text is None:
            continue
        raw_text = elem.text.strip()
        #! Quick and dirty test for valid URI
        if not raw_text.startswith("http"):
            continue
        return rdflib.URIRef(raw_text)
    return None
def _fit_embedding_word(self, embedding_type, construct_docs, tokenize_, d=None):
    """Build and fit a tokenize + embeddings pipeline and return its embeddings estimator.

    ``embedding_type`` selects where the initial vectors come from:
    'google' loads a pickled model from disk, 'twitter' trains Word2Vec on
    this object's document sets, and anything else starts from an empty
    matrix with ``d`` columns. The pipeline is fitted on ``construct_docs``.
    """
    def _as_namespace(model):
        # Keep only the vector matrix and a word -> row index mapping.
        return SimpleNamespace(
            X=model.syn0,
            vocab={w: v.index for w, v in model.vocab.items()},
        )

    if embedding_type == 'google':
        embeddings_ = _as_namespace(
            joblib.load('data/google/GoogleNews-vectors-negative300.pickle'))
    elif embedding_type == 'twitter':
        tokenize_step = ('tokenize', MapCorporas(tokenize_))
        word2vec_step = ('word2vec', MergeSliceCorporas(CachedFitTransform(Word2Vec(
            sg=1, size=d, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
        ), self.memory)))
        corpora = [self.train_docs, self.unsup_docs[:10**6], self.val_docs, self.test_docs]
        trained = Pipeline([tokenize_step, word2vec_step]).fit(corpora)
        embeddings_ = _as_namespace(trained.named_steps['word2vec'].estimator)
    else:
        embeddings_ = SimpleNamespace(X=np.empty((0, d)), vocab={})

    def _random_vectors(shape):
        # 0.25 is chosen so the unknown vectors have approximately the same variance as google pre-trained ones
        return get_rng().uniform(-0.25, 0.25, shape).astype('float32')

    estimator = Pipeline([
        ('tokenize', MapCorporas(tokenize_)),
        ('embeddings', MapCorporas(Embeddings(
            embeddings_, rand=_random_vectors, include_zero=True
        ))),
    ])
    estimator.fit(construct_docs)
    return estimator.named_steps['embeddings'].estimator
def setUp(self):
    """Create a StatusAPI on top of fake RPC and torrent backends with canned stats and torrents."""
    self.rpc = FakeTransmissionRPC()
    self.torrent = FakeTorrentAPI()
    self.api = StatusAPI(
        SimpleNamespace(rpc=self.rpc, torrent=self.torrent, loop=self.loop),
        interval=1,
    )

    self.rpc.fake_stats = dict(
        downloadSpeed=789,
        uploadSpeed=0,
        activeTorrentCount=1,
        pausedTorrentCount=2,
        torrentCount=3,
    )

    def fake_torrent(rate_up, rate_down, *flags):
        # One torrent record as a plain dict.
        return {'status': Status(flags), 'rate-up': rate_up, 'rate-down': rate_down}

    self.torrent.fake_tlist = SimpleNamespace(
        torrents=(
            fake_torrent(0, 0, Status.ISOLATED),
            fake_torrent(0, 456, Status.DOWNLOAD),
            fake_torrent(123, 456, Status.DOWNLOAD, Status.UPLOAD),
        ),
    )
def run(self, command, *args, **kws):
    """Stand-in command runner that records and fakes selected commands.

    * a list ending in ``lb config`` is recorded and answered with returncode 1;
    * a list ending in ``lb build`` is recorded, a dummy ``chroot`` tree is
      created under ``self.root_dir`` and returncode 1 is returned;
    * a string starting with ``dpkg -L`` is recorded and really executed.

    Any other string command returns None.
    """
    tail = command[-2:]

    if tail == ['lb', 'config']:
        self.call_args_list.append(command)
        return SimpleNamespace(returncode=1)

    if tail == ['lb', 'build']:
        self.call_args_list.append(command)
        # Create dummy top-level filesystem layout.
        chroot_dir = os.path.join(self.root_dir, 'chroot')
        for dir_name in DIRS_UNDER_ROOTFS:
            os.makedirs(os.path.join(chroot_dir, dir_name))
        return SimpleNamespace(returncode=1)

    if command.startswith('dpkg -L'):
        self.call_args_list.append(command)
        # Capture both streams unless the caller chose otherwise.
        stdout = kws.pop('stdout', PIPE)
        stderr = kws.pop('stderr', PIPE)
        return subprocess_run(
            command,
            stdout=stdout,
            stderr=stderr,
            universal_newlines=True,
            **kws)
def test_attrdel(self):
    """Deleting namespace attributes: missing names raise, existing ones vanish from vars()."""
    ns1 = types.SimpleNamespace()
    ns2 = types.SimpleNamespace(x=1, y=2, w=3)

    # Deleting a name that was never set fails on both namespaces.
    for namespace in (ns1, ns2):
        with self.assertRaises(AttributeError):
            del namespace.spam

    del ns2.y
    self.assertEqual(vars(ns2), {'w': 3, 'x': 1})
    ns2.y = 'spam'
    self.assertEqual(vars(ns2), {'w': 3, 'x': 1, 'y': 'spam'})
    del ns2.y
    self.assertEqual(vars(ns2), {'w': 3, 'x': 1})

    ns1.spam = 5
    self.assertEqual(vars(ns1), {'spam': 5})
    del ns1.spam
    self.assertEqual(vars(ns1), {})
def setup_edit_test(tweet_factory, repository, mock, *, nvim_returncode):
    """Store two tweets and replace ``subprocess.run`` with a fake editor run.

    The fake records the command it was given on the returned spy
    (``spy.cmd``), overwrites the file named by ``cmd[1]`` with "changed"
    and returns a stub process whose ``returncode`` is *nvim_returncode*.

    Returns:
        The spy namespace; ``spy.cmd`` is None until the fake has been called.
    """
    repository.add_tweets([
        tweet_factory.make_tweet(42, "First tweet!", date="2017-07-07"),
        tweet_factory.make_tweet(57, "Second tweet", date="2017-08-02"),
    ])

    spy = types.SimpleNamespace(cmd=None)

    def fake_run(cmd):
        stub_process = mock.Mock()
        spy.cmd = cmd
        stub_process.returncode = nvim_returncode
        # Simulate the editor saving a modified file.
        with open(cmd[1], "w") as stream:
            stream.write("changed")
        return stub_process

    mock.patch("subprocess.run", fake_run)
    return spy
def test_walker_set_heat():
    """set_heat stores valid values and rejects negative, zero and > 1 values with ValueError."""
    foo_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.1, name="foo", current_value=0.15)
    bar_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.5, name="bar", current_value=0.51)
    walker = mcmcmc._Walker(
        variables=[foo_var, bar_var],
        func=lambda *_: 1,
        heat=0.1,
        min_max=(1, 5),
        quiet=True,
    )
    assert walker.heat == 0.1

    walker.set_heat(0.75)
    assert walker.heat == 0.75

    for rejected in (-1, 0, 1.01):
        with pytest.raises(ValueError) as err:
            walker.set_heat(rejected)
        assert "heat values must be positive, between 0.000001 and 1.0." in str(err)
def test_chain_init():
    """A new _Chain keeps its arguments, starts its counters at zero and writes the header row."""
    foo_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.1, name="foo", current_value=0.15)
    bar_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.5, name="bar", current_value=0.51)
    walker1 = SimpleNamespace(variables=[foo_var, bar_var])
    walker2 = SimpleNamespace(variables=[foo_var, bar_var])
    tmp_file = br.TempFile()

    chain = mcmcmc._Chain(
        walkers=[walker1, walker2],
        outfile=tmp_file.path,
        cold_heat=0.01,
        hot_heat=0.2,
    )

    assert chain.walkers == [walker1, walker2]
    assert chain.outfile == tmp_file.path
    assert chain.cold_heat == 0.01
    assert chain.hot_heat == 0.2
    assert chain.step_counter == 0
    assert chain.best_score_ever_seen == 0
    # Header only: one column per variable, framed by Gen and result.
    assert tmp_file.read() == "Gen\tfoo\tbar\tresult\n"
def test_chain_dump_obj():
    """_Chain._dump_obj collects the walker dumps, heats, counters and the outfile content."""
    walker1 = SimpleNamespace(_dump_obj=lambda *_: "walker1")
    walker2 = SimpleNamespace(_dump_obj=lambda *_: "walker2")
    tmp_file = br.TempFile()
    tmp_file.write("outfile results")

    # A plain namespace stands in for the chain; the real method is called unbound.
    chain = SimpleNamespace(
        walkers=[walker1, walker2],
        outfile=tmp_file.path,
        cold_heat=0.1,
        hot_heat=0.2,
        step_counter=20,
        best_score_ever_seen=100,
        _dump_obj=mcmcmc._Chain._dump_obj,
    )
    dump = chain._dump_obj(chain)

    expected = (
        ("walkers", ["walker1", "walker2"]),
        ("cold_heat", 0.1),
        ("hot_heat", 0.2),
        ("step_count", 20),
        ("best_score", 100),
        ("results", "outfile results"),
    )
    for key, value in expected:
        assert dump[key] == value
def test_chain_apply_dump(capsys):
    """_Chain._apply_dump forwards to each walker and restores heats, counters and the outfile."""
    walker1 = SimpleNamespace(_apply_dump=lambda *_: print("Applying dump to walker1"))
    walker2 = SimpleNamespace(_apply_dump=lambda *_: print("Applying dump to walker2"))
    tmp_file = br.TempFile()

    # A plain namespace stands in for the chain; the real method is called unbound.
    chain = SimpleNamespace(
        walkers=[walker1, walker2],
        outfile=tmp_file.path,
        cold_heat=None,
        hot_heat=None,
        step_counter=None,
        best_score_ever_seen=None,
        _apply_dump=mcmcmc._Chain._apply_dump,
    )
    var_dict = {
        "walkers": [None, None],
        "cold_heat": 0.1,
        "hot_heat": 0.2,
        "step_count": 20,
        "best_score": 100,
        "results": "Some results",
    }
    chain._apply_dump(chain, var_dict)

    assert chain.walkers == [walker1, walker2]
    out, err = capsys.readouterr()
    assert out == "Applying dump to walker1\nApplying dump to walker2\n"
    assert chain.cold_heat == 0.1
    assert chain.hot_heat == 0.2
    assert chain.step_counter == 20
    assert chain.best_score_ever_seen == 100
    assert tmp_file.read() == "Some results"
def test_mcmcmc_resume(capsys):
    """MCMCMC.resume returns False without a dump file; with one it restores each chain, then runs."""
    mc_obj = SimpleNamespace(dumpfile="does_not_exist", resume=mcmcmc.MCMCMC.resume)
    assert mc_obj.resume(mc_obj) is False

    # One dumped entry per chain.
    tmp_file = br.TempFile(byte_mode=True)
    dill.dump(["a", "b", "c"], tmp_file)
    mc_obj.dumpfile = tmp_file.path

    mc_obj.chains = [
        SimpleNamespace(_apply_dump=lambda *_: print("applying chain1")),
        SimpleNamespace(_apply_dump=lambda *_: print("applying chain2")),
        SimpleNamespace(_apply_dump=lambda *_: print("applying chain3")),
    ]
    mc_obj.run = lambda *_: print("Running")
    assert mc_obj.resume(mc_obj) is True

    out, err = capsys.readouterr()
    assert out == "applying chain1\napplying chain2\napplying chain3\nRunning\n", print(out)
def test_mcmcmc_check_convergence(hf):
    """_check_convergence needs 100 steps per chain and a sufficiently loose threshold to return True."""
    csv_path = os.path.join(hf.resource_path, "mcmcmc", "chain")
    chains = [
        SimpleNamespace(step_counter=99, outfile=csv_path + suffix)
        for suffix in ("1.csv", "2.csv", "3.csv")
    ]
    mc_obj = SimpleNamespace(
        _check_convergence=mcmcmc.MCMCMC._check_convergence,
        chains=chains,
        convergence=1.01,
    )

    # Return False when step_counter < 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return False when convergence is not met
    for chain in chains:
        chain.step_counter = 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return True on convergence
    mc_obj.convergence = 1.1
    assert mc_obj._check_convergence(mc_obj) is True
def test_mcmcmc_reset_params():
    """reset_params rejects a wrong-length parameter list and otherwise updates the walkers."""
    walker1, walker2, walker3, walker4 = (
        SimpleNamespace(params=[1, 2]),
        SimpleNamespace(params=[3, 4]),
        SimpleNamespace(params=[5, 6]),
        SimpleNamespace(params=[7, 8]),
    )
    mc_obj = SimpleNamespace(
        reset_params=mcmcmc.MCMCMC.reset_params,
        chains=[
            SimpleNamespace(walkers=[walker1, walker2]),
            SimpleNamespace(walkers=[walker3, walker4]),
        ],
    )

    with pytest.raises(AttributeError) as e:
        mc_obj.reset_params(mc_obj, ['a', 'b', 'c'])
    assert "Incorrect number of params supplied in reset_params(). 2 expected; 3 supplied; ['a', 'b', 'c']" in str(e)

    mc_obj.reset_params(mc_obj, ['a', 'b'])
    assert walker2.params == ['a', 'b']
def add_option(self, key, default, *, reset=True):
    """Add a new option without adding widgets to the setting dialog.

    ``section[key]`` will be *default* unless something else is
    specified. If *reset* is True, the setting dialog's reset button
    sets this option to *default*.

    .. note::
        The *reset* argument should be False for settings that cannot
        be changed with the dialog. That way, clicking the reset button
        resets only the settings that are shown in the dialog.
    """
    info = types.SimpleNamespace(
        default=default,        # not validated
        reset=reset,
        callbacks=[],
        errorvar=tkinter.BooleanVar(),      # true when the triangle is showing
    )
    self._infos[key] = info
def __init__(self, data_dir, work_dir):
    """Initialise empty dataset state and validate the two directories.

    Both paths are stored as absolute paths with a trailing '/'.

    Raises:
        NotADirectoryError: if either path is not an existing directory.
    """
    # Fixed seed, so the random stream is the same on every run.
    self.RandomState = np.random.RandomState(seed=20161013)

    for attr in ('data_mean', 'data_std', 'class_count', 'meta', 'train_meta'):
        setattr(self, attr, None)
    self.test_data = types.SimpleNamespace(X=None, y=None, meta=None)
    self.validation_data = types.SimpleNamespace(X=None, y=None, meta=None)

    self.data_dir = os.path.abspath(data_dir) + '/'
    self.work_dir = os.path.abspath(work_dir) + '/'

    if not os.path.isdir(self.data_dir):
        raise NotADirectoryError("{} is not a proper dataset directory.".format(self.data_dir))
    if not os.path.isdir(self.work_dir):
        raise NotADirectoryError("{} is not a proper working directory.".format(self.work_dir))
def test_html_writer(self):
    """Generate the package listings and compare every written file with its expected copy."""
    self.maxDiff = None
    htdocs = 'testdata/htdocs'

    args = types.SimpleNamespace(
        arch='x86',
        htdocs=htdocs,
        rel_area='testdata/relarea',
        dryrun=False,
        force=True,
        pkglist='testdata/pkglist/cygwin-pkg-maint',
    )

    packages = package.read_packages(args.rel_area, args.arch)
    package.validate_packages(args, packages)
    pkg2html.update_package_listings(args, packages, args.arch)

    # compare the output files with expected
    for (dirpath, subdirs, files) in os.walk(htdocs):
        relpath = os.path.relpath(dirpath, htdocs)
        for f in files:
            results = os.path.join(htdocs, relpath, f)
            expected = os.path.join('testdata/htdocs.expected', relpath, f)
            if filecmp.cmp(results, expected, shallow=False):
                logging.info("%s identical", os.path.join(relpath, f))
                continue

            logging.info("%s different", os.path.join(relpath, f))
            # Re-read both files so the failure shows a line-by-line diff.
            with open(results) as r, open(expected) as e:
                self.assertMultiLineEqual(e.read(), r.read())
def test_scan_uploads(self):
    """Scan one maintainer's upload area and compare the scan result with the expected data."""
    self.maxDiff = None

    args = types.SimpleNamespace(
        arch='x86',
        rel_area='testdata/relarea',
        dryrun=False,
    )

    pkglist = ['after-ready', 'not-ready', 'testpackage', 'testpackage2']

    mlist = maintainers.Maintainer.add_directories({}, 'testdata/homes')
    m = mlist['Blooey McFooey']
    m.pkgs.extend(pkglist + ['not-on-package-list'])

    def ready_marker(*package_path):
        # Path of the '!ready' marker file below the maintainer's x86 release area.
        return os.path.join(m.homedir(), 'x86', 'release', *package_path, '!ready')

    # (marker path, extra arguments for touch)
    ready_fns = [
        (ready_marker('testpackage'), ''),
        (ready_marker('testpackage2', 'testpackage2-subpackage'), ''),
        (ready_marker('after-ready'), '-t 198709011700'),
        (ready_marker('corrupt'), ''),
    ]
    for (f, t) in ready_fns:
        os.system('touch %s "%s"' % (t, f))

    scan_result = uploads.scan(m, pkglist + ['not-on-maintainer-list'], args.arch, args)

    self.assertEqual(scan_result.error, False)
    compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.to_relarea), 'move')
    self.assertCountEqual(
        scan_result.to_vault,
        {'x86/release/testpackage': ['x86/release/testpackage/testpackage-0.1-1.tar.bz2']})
    self.assertCountEqual(scan_result.remove_always, [f for (f, t) in ready_fns])
    self.assertEqual(
        scan_result.remove_success,
        ['testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1-src.tar.bz2',
         'testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1.tar.bz2'])

    with pprint_patch():
        compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.packages), 'pkglist')
def __init__(self, repo, uri=None, response=None, rdf_prefixes_mixins=None):
    """Bind the resource to a repository and initialise it from an optional response.

    Args:
        repo: repository handle; its ``parse_uri()`` is applied to *uri*.
        uri: resource URI, stored after passing through ``repo.parse_uri()``.
        response: optional response object. When truthy, its ``content``,
            ``headers`` and ``status_code`` are copied onto the instance.
        rdf_prefixes_mixins: accepted but not used by this method.

    After construction ``self.exists`` is always defined: True only when a
    response was given and its status code is 200, False otherwise.
    """
    # repository handle is pinned to resource instance here
    self.repo = repo

    # parse uri with parse_uri() from repo instance
    self.uri = self.repo.parse_uri(uri)

    # parse response
    # if response provided, parse and set to attributes
    if response:
        self.response = response
        self.data = self.response.content
        self.headers = self.response.headers
        self.status_code = self.response.status_code
        # Assign for every status code: previously the attribute was only
        # created for a 200 and was left undefined for any other status.
        self.exists = self.status_code == 200
    # if not response, set all blank
    else:
        self.response = None
        self.data = None
        self.headers = {}
        self.status_code = None
        self.exists = False

    # RDF
    self._build_rdf(data=self.data)

    # versions
    self.versions = SimpleNamespace()
def parse_object_like_triples(self):
    '''
    Expose the triples of self.rdf.graph for attribute-style access.

    Every object is appended to the list found at
    self.rdf.triples.<prefix>.<predicate>, where prefix and predicate come
    from self.rdf.graph.compute_qname() applied to the triple's predicate.

    Args:
        None

    Returns:
        None: sets self.rdf.triples
    '''
    # parse triples as object-like attributes in self.rdf.triples
    triples = SimpleNamespace()
    self.rdf.triples = triples

    # prepare triples
    for s, p, o in self.rdf.graph:
        # get ns info
        ns_prefix, ns_uri, predicate = self.rdf.graph.compute_qname(p)

        # first triple seen for this prefix: create its namespace
        if not hasattr(triples, ns_prefix):
            setattr(triples, ns_prefix, SimpleNamespace())
        prefix_ns = getattr(triples, ns_prefix)

        # first triple seen for this predicate: create its list
        if not hasattr(prefix_ns, predicate):
            setattr(prefix_ns, predicate, [])

        # append object for this prefix
        getattr(prefix_ns, predicate).append(o)
def _diff_graph(self):
    '''
    Uses rdflib.compare diff, https://github.com/RDFLib/rdflib/blob/master/rdflib/compare.py

    When a resource is retrieved, the graph retrieved and parsed at that time is saved to
    self.rdf._orig_graph, and all local modifications are made to self.rdf.graph. This method
    compares the two graphs and returns the diff in the format of three graphs:

        overlap - triples shared by both
        removed - triples that exist ONLY in the original graph, self.rdf._orig_graph
        added - triples that exist ONLY in the modified graph, self.rdf.graph

    These are used for building a sparql update query for self.update.

    Args:
        None

    Returns:
        None: sets self.rdf.diffs and adds the three graphs mentioned, 'overlap', 'removed', and 'added'
    '''
    original = to_isomorphic(self.rdf._orig_graph)
    modified = to_isomorphic(self.rdf.graph)
    overlap, removed, added = graph_diff(original, modified)

    self.rdf.diffs = SimpleNamespace(overlap=overlap, removed=removed, added=added)
def model(self):
    """Return a new, empty ``types.SimpleNamespace`` on every call."""
    empty_model = types.SimpleNamespace()
    return empty_model
def test_discovery_from_dotted_namespace_packages(self):
    """Discovering a dotted namespace package searches each of its locations."""
    if not getattr(types, 'SimpleNamespace', None):
        raise unittest.SkipTest('Namespaces not supported')

    loader = unittest.TestLoader()
    orig_import = __import__

    # Fake namespace package spread over two directories.
    package = types.ModuleType('package')
    package.__path__ = ['/a', '/b']
    package.__spec__ = types.SimpleNamespace(
        loader=None,
        submodule_search_locations=['/a', '/b'],
    )

    def _import(packagename, *args, **kwargs):
        sys.modules[packagename] = package
        return package

    # Restore the real import hook when the test is done.
    self.addCleanup(setattr, builtins, '__import__', orig_import)
    builtins.__import__ = _import

    _find_tests_args = []

    def _find_tests(start_dir, pattern, namespace=None):
        _find_tests_args.append((start_dir, pattern))
        return ['%s/tests' % start_dir]

    loader._find_tests = _find_tests
    loader.suiteClass = list

    suite = loader.discover('package')
    self.assertEqual(suite, ['/a/tests', '/b/tests'])
def get(self, url):
    """Return a canned response for *url*.

    ``self.url_matches`` is scanned in order; each entry provides a compiled
    pattern at index 0 and a fixture file name at index 1. The first pattern
    that fully matches *url* selects the JSON fixture to load from
    ``mock/fixtures/`` under ``app.root_path``.

    Returns:
        SimpleNamespace whose ``data`` is the parsed fixture, or an empty
        dict when no pattern matches.
    """
    for matcher in self.url_matches:
        if matcher[0].fullmatch(url):
            fixture_path = os.path.join(app.root_path, 'mock/fixtures/' + matcher[1])
            # Close the fixture file as soon as it is parsed instead of
            # leaving the handle open until garbage collection.
            with open(fixture_path) as fixture_file:
                return SimpleNamespace(data=json.load(fixture_file))
    return SimpleNamespace(data={})
def __init__(self, numLed, hwBackend, port=6606):
    """Connect the hardware backend and set up messaging, the frame timer and default state.

    Args:
        numLed: stored as ``self.numLed``.
        hwBackend: hardware backend; ``connect()`` is called on it here.
        port: stored as ``self.port`` (default 6606).
    """
    self.ctx = zmq.Context()
    self.numLed = numLed
    self.port = port

    self.loop = IOLoop.instance()
    # Callback interval is 1000/30.
    self.caller = PeriodicCallback(self._on_nextFrame, 1000/30, self.loop)

    self.hwComm = hwBackend
    self.hwComm.connect()

    self.zmqCollector = GlinAppZmqCollector(self, self.ctx)
    self.zmqPublisher = GlinAppZmqPublisher(self, self.ctx)

    # server side configuration
    self.config = SimpleNamespace(maxFps=60)

    # current state (somehow client side configuration)
    self.state = SimpleNamespace(
        animationClasses=[],
        activeSceneId=None,
        activeAnimation=None,
        scenes={},
        brightness=1.0,
        sceneIdCtr=0,
        mainswitch=True,
        targetFps=0,
        lastFrameSent=None,
    )
def __init__(self, repo):
    """Wrap *repo* and create the git-annex batch processes for its working directory.

    Raises:
        NotAGitAnnexRepoError: if *repo* has no ``git-annex`` branch.
    """
    # Must have run git-annex init
    if repo.lookup_branch('git-annex') is None:
        raise NotAGitAnnexRepoError(
            'Repository {} is not a git-annex repo.'.format(repo))

    self.repo = repo

    processes = types.SimpleNamespace()
    self.processes = processes
    processes.metadata = GitAnnexMetadataBatchJsonProcess(self.repo.workdir)
    processes.contentlocation = GitAnnexContentlocationBatchProcess(self.repo.workdir)
def base_test_data():
    """
    Fixture for test data that should be available to any test case in the suite

    Returns:
        SimpleNamespace(staff_user=..., student_user=..., program=...)
    """
    # Create a live program with valid prices and financial aid
    program = ProgramFactory.create(
        live=True,
        financial_aid_availability=True,
        price=1000,
    )
    CourseRunFactory.create(course__program=program)
    TierProgramFactory.create_properly_configured_batch(2, program=program)

    # Create users
    staff_user = create_user_for_login(is_staff=True)
    student_user = create_user_for_login(is_staff=False)

    # Enroll both users; only the staff user additionally gets the staff role.
    for user in (staff_user, student_user):
        ProgramEnrollment.objects.create(program=program, user=user)
    Role.objects.create(
        role=Staff.ROLE_ID,
        user=staff_user,
        program=program,
    )

    return SimpleNamespace(
        staff_user=staff_user,
        student_user=student_user,
        program=program,
    )
def program_data():
    """
    Fixture for program and tier_program test data

    Returns:
        SimpleNamespace(program=..., tier_programs=...) as produced by create_program()
    """
    created = create_program()
    program, tier_programs = created
    return SimpleNamespace(program=program, tier_programs=tier_programs)
def test_props(self, program_data):
    """
    Fixture that provides test properties for FinancialAidDetailView test cases

    Returns:
        SimpleNamespace with the enrolled user, a financial aid record in
        PENDING_DOCS status, the detail URL for that record and ready-made
        request parameters carrying today's UTC date.
    """
    user = create_enrolled_profile(program_data.program).user
    pending_fa = FinancialAidFactory.create(
        user=user,
        tier_program=program_data.tier_programs["25k"],
        status=FinancialAidStatus.PENDING_DOCS,
    )
    docs_sent_url = reverse("financial_aid", kwargs={"financial_aid_id": pending_fa.id})
    docs_sent_date = now_in_utc().date()

    payload = json.dumps({
        "date_documents_sent": docs_sent_date.strftime("%Y-%m-%d")
    })
    docs_sent_request_params = {
        "content_type": "application/json",
        "data": payload,
    }

    return SimpleNamespace(
        user=user,
        pending_fa=pending_fa,
        docs_sent_url=docs_sent_url,
        docs_sent_request_params=docs_sent_request_params,
        docs_sent_date=docs_sent_date,
    )
def __getattr__(self, name):
    """Resolve attribute *name* through the parent class's item lookup.

    Raises:
        AttributeError: if the item lookup raises KeyError.
    """
    lookup = super(SimpleNamespace, self).__getitem__
    try:
        return lookup(name)
    except KeyError:
        message = '{0} has no attribute {1}'.format(self.__class__.__name__, name)
        raise AttributeError(message)
def __setattr__(self, name, value):
    """Store *value* under key *name* through the parent class's item assignment."""
    store = super(SimpleNamespace, self).__setitem__
    store(name, value)
def record_to_dict(record: Record):
    """
    Converts a record/namespace to a dictionary.

    A SimpleNamespace is converted from a copy of its ``__dict__``; any other
    value is converted from the pairs returned by its ``_items()`` method.

    Notes:
        If you want to convert a dict to a record, simply call ``record(**D)``.
    """
    if not isinstance(record, SimpleNamespace):
        return dict(record._items())
    return dict(vars(record))
def push_args():
    """Return a fresh namespace holding the default push arguments.

    Every flag is False and every value is None, except ``target_branch``,
    which is "master".
    """
    return types.SimpleNamespace(
        accept=False,
        assignee=None,
        force=False,
        merge=False,
        mr_title=None,
        push_spec=None,
        ready=False,
        reviewers=None,
        target_branch="master",
        title=None,
        wip=False,
    )
def load(name):
    """Return the entry of ``_templates`` stored under *name* as an attribute namespace.

    Raises:
        KeyError: if *name* is not a key of ``_templates``.
    """
    fields = _templates[name]
    return SimpleNamespace(**fields)


# autoload all templates
def read_config(cfgfile='settings.cfg', section='DEFAULT'):
    """Parse the Hydrus configuration file.

    Reads *cfgfile* and returns a namespace with ``RAPIDCLUS`` and
    ``QUADRATURE`` (booleans) and ``INFILE`` and ``MEASURE_SETTINGS``
    (strings), all taken from *section*.
    """
    parser = ConfigParser()
    parser.read(cfgfile)
    return SimpleNamespace(
        RAPIDCLUS=parser.getboolean(section, 'RAPIDCLUS'),
        QUADRATURE=parser.getboolean(section, 'QUADRATURE'),
        INFILE=parser.get(section, 'INFILE'),
        MEASURE_SETTINGS=parser.get(section, 'MEASURE_SETTINGS'),
    )
def dispatch_custom_completer(self, text):
    """Offer the current completion to the registered custom completers.

    Completers matching the command (first word of the line), the command
    prefixed with ``self.magic_escape`` and the text up to the cursor are
    tried in that order. The first one returning a non-empty result decides
    the outcome: its entries starting with *text* if there are any,
    otherwise its entries starting with *text* ignoring case. A completer
    raising ``TryNext`` is skipped.

    Returns:
        A list of completions, or None when there are no custom completers,
        the line is blank, or no completer produced a result.
    """
    if not self.custom_completers:
        return None

    line = self.line_buffer
    if not line.strip():
        return None

    cmd = line.split(None, 1)[0]

    # Create a little structure to pass all the relevant information about
    # the current completion to any custom completer.
    event = SimpleNamespace(
        line=line,
        symbol=text,
        command=cmd,
        text_until_cursor=self.text_until_cursor,
    )

    # for foo etc, try also to find completer for %foo
    if cmd.startswith(self.magic_escape):
        try_magic = []
    else:
        try_magic = self.custom_completers.s_matches(self.magic_escape + cmd)

    candidates = itertools.chain(
        self.custom_completers.s_matches(cmd),
        try_magic,
        self.custom_completers.flat_matches(self.text_until_cursor),
    )
    for completer in candidates:
        try:
            res = completer(event)
            if not res:
                continue
            # first, try case sensitive match
            withcase = [r for r in res if r.startswith(text)]
            if withcase:
                return withcase
            # if none, then case insensitive ones are ok too
            text_low = text.lower()
            return [r for r in res if r.lower().startswith(text_low)]
        except TryNext:
            pass

    return None
def get(self, edit_count):
    """Return a minimal GET request stand-in whose user is built with *edit_count*."""
    return types.SimpleNamespace(
        user=UserFactory(edit_count=edit_count),
        method="GET",
    )
def post(self, edit_count):
    """Return a minimal POST request stand-in with empty POST data and a user built with *edit_count*."""
    return types.SimpleNamespace(
        user=UserFactory(edit_count=edit_count),
        POST={},
        method="POST",
    )
def get(self):
    """Return a minimal request stand-in whose ``method`` is "GET"."""
    return types.SimpleNamespace(method="GET")
def post(self, post):
    """Return a minimal request stand-in whose ``POST`` is *post* and whose ``method`` is "POST"."""
    return types.SimpleNamespace(POST=post, method="POST")