我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 textwrap.dedent()。
def _warn_legacy_version(self):
    """Emit a ``PEP440Warning`` when this object's version is a legacy one.

    Nothing is emitted when ``self._parsed_version`` is not a
    ``packaging.version.LegacyVersion`` or when ``self.version`` is empty.
    The message is formatted from the instance attributes (``vars(self)``).
    """
    if not isinstance(self._parsed_version, packaging.version.LegacyVersion):
        return

    # An empty version is technically a legacy (non PEP 440) version, but it
    # is unlikely to come from a person; more likely setuptools tried to
    # parse a filename and included the result in the list. Only warn when
    # there is an actual version string.
    if not self.version:
        return

    template = textwrap.dedent("""
        '{project_name} ({version})' is being parsed as a legacy, non PEP 440,
        version. You may find odd behavior and sort order. In particular it
        will be sorted as less than 0.0. It is recommended to migrate to PEP 440
        compatible versions.
        """)
    # Collapse the wrapped template onto a single line before formatting.
    one_line = template.strip().replace('\n', ' ')
    warnings.warn(one_line.format(**vars(self)), PEP440Warning)
def _patch_usage():
    """Temporarily replace ``distutils.core.gen_usage`` with a custom one.

    This is a generator with a single ``yield``: the replacement is installed
    before the yield and the saved original is put back in a ``finally``
    clause once the generator is resumed or closed.
    """
    import distutils.core

    usage_text = textwrap.dedent("""
        usage: %(script)s [options] requirement_or_url ...
           or: %(script)s --help
        """).lstrip()

    def gen_usage(script_name):
        # Only the base name of the script is shown in the usage text.
        return usage_text % {'script': os.path.basename(script_name)}

    original = distutils.core.gen_usage
    distutils.core.gen_usage = gen_usage
    try:
        yield
    finally:
        distutils.core.gen_usage = original
def parse_xml(cls, node, runtime, keys, id_generator):
    """
    Parses the source XML in a way that preserves indenting, needed for markdown.

    :param cls: block class handed to ``runtime.construct_xblock_from_class``.
    :param node: XML element; its attributes and ``text`` are read.
    :param runtime: object providing ``construct_xblock_from_class(cls, keys)``.
    :param keys: passed through unchanged to the runtime.
    :param id_generator: accepted for interface compatibility; not used here.
    :return: the constructed block. Attributes named in ``block.fields`` are
        converted with the field's ``from_string`` and set on the block;
        ``block.content`` is set to the dedented element text when that text
        is non-empty after whitespace clean-up.
    """
    block = runtime.construct_xblock_from_class(cls, keys)

    # Load the data
    for name, value in node.items():
        if name in block.fields:
            value = block.fields[name].from_string(value)
            setattr(block, name, value)

    # Load content
    text = node.text
    if text:
        # Fix up whitespace: drop the newline that directly follows the
        # opening tag, then the trailing whitespace before the closing tag.
        if text[0] == "\n":
            text = text[1:]
        # str.rstrip() returns a new string; the result must be kept,
        # otherwise the trailing whitespace survives into the content.
        text = text.rstrip()
        text = textwrap.dedent(text)
        if text:
            block.content = text
    return block
def inspect(self):
    """
    Return a multi-line string describing the data stored in this array.

    The text contains the dtype name, the ``repr`` of ``self.data`` and
    ``self.adjustments``.
    """
    template = dedent(
        """\
        Adjusted Array ({dtype}):

        Data:
        {data!r}

        Adjustments:
        {adjustments}
        """
    )
    values = {
        'dtype': self.dtype.name,
        'data': self.data,
        'adjustments': self.adjustments,
    }
    return template.format(**values)
def test_render(self):
    """Render a tilemap built from CSV and compare it with a reference surface."""
    csv_text = textwrap.dedent(self.TILEMAP_CSV).strip()
    tilemap = sappho.tiles.TileMap.from_csv_string_and_tilesheet(
        csv_text, self.tilesheet)

    # Reference surface made of 1x2 strips of red, green and blue to compare
    # against the rendered tilemap. It needs the SRCALPHA flag and a depth
    # of 32 to match the surface returned by the render function.
    expected_surface = pygame.surface.Surface((3, 2), pygame.SRCALPHA, 32)
    strip_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
    for column, color in enumerate(strip_colors):
        expected_surface.fill(color, pygame.Rect(column, 0, 1, 2))

    # Render the tilemap and compare the two surfaces.
    rendered_surface = tilemap.to_surface()
    assert compare_surfaces(expected_surface, rendered_surface)
def _conflict_bail(VC_err, version): """ Setuptools was imported prior to invocation, so it is unsafe to unload it. Bail out. """ conflict_tmpl = textwrap.dedent(""" The required version of setuptools (>={version}) is not available, and can't be installed while this script is running. Please install a more recent version first, using 'easy_install -U setuptools'. (Currently using {VC_err.args[0]!r}) """) msg = conflict_tmpl.format(**locals()) sys.stderr.write(msg) sys.exit(2)
def test_info(self, process, process_repr):
    """``info()`` writes the expected text for a class, an instance and an empty process."""

    def rendered_info(target):
        # Capture what ``info`` writes into an in-memory buffer.
        stream = StringIO()
        target.info(buf=stream)
        return stream.getvalue()

    for target in (ExampleProcess, process):
        assert rendered_info(target) == process_repr

    class EmptyProcess(Process):
        pass

    # NOTE(review): the indentation inside this literal is significant and
    # must match the output of ``info`` exactly -- confirm.
    expected = dedent("""\
    Variables:
        *empty*
    Meta:
        time_dependent: True""")

    assert rendered_info(EmptyProcess) == expected
def test_unicode_is_allowed(self):
    """A non-ASCII operation description is carried through to the rendered text."""
    operation = {
        'description': '\u041f',
        'responses': {
            '200': {'description': 'ok'}
        }
    }
    spec = {'paths': {'/resource_a': {'get': operation}}}

    rendered = '\n'.join(openapi.openapi2httpdomain(spec))

    # NOTE(review): indentation and blank lines inside this literal must
    # match the renderer's output exactly -- confirm.
    expected = textwrap.dedent('''
        .. http:get:: /resource_a
           :synopsis: null

           \u041f

           :status 200:
              ok
    ''').lstrip()
    assert rendered == expected
def upload(self, step, buildIdFile, tgzFile):
    """Return a shell snippet that uploads an artifact with curl.

    An empty string is returned when ``self.canUploadJenkins()`` is false.
    ``step`` is not used by this method.

    :param buildIdFile: file whose bytes are hex-dumped to form the upload id.
    :param tgzFile: file passed to ``curl -T``.
    """
    # only upload if requested
    if not self.canUploadJenkins():
        return ""

    substitutions = dict(
        URL=self.__url.geturl(),
        BUILDID=quote(buildIdFile),
        RESULT=quote(tgzFile),
        # Unless errors are ignored, a failed upload terminates the script.
        FAIL="" if self._ignoreErrors() else "; exit 1",
        GEN=ARCHIVE_GENERATION,
        SUFFIX=ARTIFACT_SUFFIX,
    )

    # upload with curl if file does not exist yet on server
    template = """\
        # upload artifact
        cd $WORKSPACE
        BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
        BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
        if ! curl --output /dev/null --silent --head --fail "$BOB_UPLOAD_URL" ; then
            BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {RESULT} "$BOB_UPLOAD_URL" || true)
            if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
            fi
        fi"""
    # The template is formatted first and dedented afterwards.
    return "\n" + textwrap.dedent(template.format(**substitutions))
def uploadJenkinsLiveBuildId(self, step, liveBuildId, buildId):
    """Return a shell snippet that uploads a live build-id with curl.

    An empty string is returned when ``self.canUploadJenkins()`` is false.
    ``step`` is not used by this method.

    :param liveBuildId: file whose bytes are hex-dumped to form the upload id.
    :param buildId: file passed to ``curl -T``.
    """
    # only upload if requested
    if not self.canUploadJenkins():
        return ""

    substitutions = dict(
        URL=self.__url.geturl(),
        LIVEBUILDID=quote(liveBuildId),
        BUILDID=quote(buildId),
        # Unless errors are ignored, a failed upload terminates the script.
        FAIL="" if self._ignoreErrors() else "; exit 1",
        GEN=ARCHIVE_GENERATION,
        SUFFIX=BUILDID_SUFFIX,
    )

    # upload with curl if file does not exist yet on server
    template = """\
        # upload live build-id
        cd $WORKSPACE
        BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {LIVEBUILDID}){GEN}"
        BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
        BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {BUILDID} "$BOB_UPLOAD_URL" || true)
        if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
            echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
        fi
        """
    # The template is formatted first and dedented afterwards.
    return "\n" + textwrap.dedent(template.format(**substitutions))
def _extract_scripts_from_project(setup_filename='setup.py'): """Parse setup.py and return scripts""" if not os.path.isfile(setup_filename): return '' mock_setup = textwrap.dedent('''\ def setup(*args, **kwargs): __setup_calls__.append((args, kwargs)) ''') parsed_mock_setup = ast.parse(mock_setup, filename=setup_filename) with open(setup_filename, 'rt') as setup_file: parsed = ast.parse(setup_file.read()) for index, node in enumerate(parsed.body[:]): if (not isinstance(node, ast.Expr) or not isinstance(node.value, ast.Call) or node.value.func.id != 'setup'): continue parsed.body[index:index] = parsed_mock_setup.body break fixed = ast.fix_missing_locations(parsed) codeobj = compile(fixed, setup_filename, 'exec') local_vars = {} global_vars = {'__setup_calls__': []} exec(codeobj, global_vars, local_vars) _, kwargs = global_vars['__setup_calls__'][0] return ','.join([os.path.basename(f) for f in kwargs.get('scripts', [])])
def test_colors(tmpdir):
    """Test colors.

    Writes a small logging script into ``tmpdir``, runs it and checks the
    ANSI colour codes in its output.

    :param tmpdir: pytest fixture.
    """
    logger_name = ColorFormatter.SPECIAL_SCOPE + '.sample'
    template = dedent("""\
        import logging
        from sphinxcontrib.versioning.setup_logging import setup_logging

        setup_logging(verbose=False, colors=True)
        logger = logging.getLogger("{logger}")
        logger.critical("Critical")
        logger.error("Error")
        logger.warning("Warning")
        logger.info("Info")
        logger.debug("Debug")  # Not printed since verbose = False.
        """)
    tmpdir.join('script.py').write(template.format(logger=logger_name))

    output = pytest.run(tmpdir, [sys.executable, 'script.py'])

    expected_fragments = (
        '\033[31m=> Critical\033[39m\n',
        '\033[31m=> Error\033[39m\n',
        '\033[33m=> Warning\033[39m\n',
        '\033[36m=> Info\033[39m',
    )
    for fragment in expected_fragments:
        assert fragment in output
    assert 'Debug' not in output
def format_description(self, description):
    """Return ``description`` dedented, re-indented and prefixed with a label.

    The label is ``'Commands'`` when ``self.parser`` has a ``main``
    attribute and ``'Description'`` otherwise. A falsy ``description``
    yields an empty string.
    """
    # leave full control over description to us
    if not description:
        return ''

    label = 'Commands' if hasattr(self.parser, 'main') else 'Description'

    # Some doc strings have initial newlines and some have final newlines
    # and spaces; normalise both ends before dedenting.
    body = description.lstrip('\n').rstrip()
    # dedent, then reindent
    body = self.indent_lines(textwrap.dedent(body), " ")
    return '%s:\n%s\n' % (label, body)
def write_script(
        scriptdir, rev_id, content, encoding='ascii', sourceless=False):
    """Overwrite the script file of revision ``rev_id`` and reload it.

    ``content`` is dedented, encoded with ``encoding`` when one is given and
    written to the path of the existing revision. The reloaded script then
    replaces the entry in ``scriptdir.revision_map``.

    :raises Exception: when the rewritten script has a different
        ``down_revision`` than the revision currently in the map.
    """
    existing = scriptdir.revision_map.get_revision(rev_id)
    script_path = existing.path

    data = textwrap.dedent(content)
    if encoding:
        data = data.encode(encoding)
    with open(script_path, 'wb') as handle:
        handle.write(data)

    # Remove the compiled file belonging to the script, if there is one.
    compiled_path = util.pyc_file_from_path(script_path)
    if os.access(compiled_path, os.F_OK):
        os.unlink(compiled_path)

    script = Script._from_path(scriptdir, script_path)
    current = scriptdir.revision_map.get_revision(script.revision)
    if current.down_revision != script.down_revision:
        raise Exception("Can't change down_revision "
                        "on a refresh operation.")
    scriptdir.revision_map.add_revision(script, _replace=True)

    if sourceless:
        make_sourceless(script_path)
def create_sdist():
    """
    Yield the path of an sdist with a setup_requires dependency
    (of something that doesn't exist).

    The archive is created inside ``tempdir_context()`` and the path is
    yielded while that context is still active.
    """
    setup_py = textwrap.dedent("""
        import setuptools
        setuptools.setup(
            name="setuptools-test-fetcher",
            version="1.0",
            setup_requires = ['does-not-exist'],
        )
        """).lstrip()

    with tempdir_context() as workdir:
        sdist_path = os.path.join(
            workdir, 'setuptools-test-fetcher-1.0.tar.gz')
        make_trivial_sdist(sdist_path, setup_py)
        yield sdist_path
def test_exit_cleanly(self):
    """The handler function is called on clean interpreter exit."""
    script = textwrap.dedent(
        """
        import os, imp
        mod = imp.load_source("mod", r"{}")

        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")

        mod.register_exit_fun(foo)
        """.format(os.path.abspath(__file__), TESTFN)
    )
    exit_code = pyrun(script)
    self.assertEqual(exit_code, 0)

    # The handler appends one byte to TESTFN each time it runs.
    with open(TESTFN, "rb") as handle:
        written = handle.read()
    self.assertEqual(written, b"1")
def test_exception(self):
    """The handler function is called when the script dies from an exception."""
    this_module = os.path.abspath(__file__)
    source = """
        import os, imp, sys
        mod = imp.load_source("mod", r"{}")

        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")

        mod.register_exit_fun(foo)
        sys.stderr = os.devnull
        1 / 0
        """.format(this_module, TESTFN)

    status = pyrun(textwrap.dedent(source))

    # The child script ends with an unhandled ZeroDivisionError.
    self.assertEqual(status, 1)
    with open(TESTFN, "rb") as handle:
        self.assertEqual(handle.read(), b"1")
def test_kinterrupt(self):
    """Simulate CTRL + C and make sure the exit function is called."""
    script_args = (os.path.abspath(__file__), TESTFN)
    script = textwrap.dedent(
        """
        import os, imp, sys
        mod = imp.load_source("mod", r"{}")

        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")

        mod.register_exit_fun(foo)
        sys.stderr = os.devnull
        raise KeyboardInterrupt
        """.format(*script_args)
    )

    result = pyrun(script)
    self.assertEqual(result, 1)

    with open(TESTFN, "rb") as handle:
        content = handle.read()
    self.assertEqual(content, b"1")
def test_called_once(self):
    """The registered function is called exactly once."""
    child_source = """
        import os, imp
        mod = imp.load_source("mod", r"{}")

        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")

        mod.register_exit_fun(foo)
        """.format(os.path.abspath(__file__), TESTFN)
    child_status = pyrun(textwrap.dedent(child_source))
    self.assertEqual(child_status, 0)

    # The file is opened in append mode by the handler, so a second call
    # would leave more than one byte behind.
    with open(TESTFN, "rb") as stream:
        self.assertEqual(stream.read(), b"1")
def test_cascade(self):
    """Of two registered functions, the one registered last is executed first."""
    module_path = os.path.abspath(__file__)
    script = textwrap.dedent(
        """
        import functools, os, imp
        mod = imp.load_source("mod", r"{}")

        def foo(s):
            with open(r"{}", "ab") as f:
                f.write(s)

        mod.register_exit_fun(functools.partial(foo, b'1'))
        mod.register_exit_fun(functools.partial(foo, b'2'))
        """.format(module_path, TESTFN)
    )
    self.assertEqual(pyrun(script), 0)

    # b'2' was registered last and therefore must have been written first.
    with open(TESTFN, "rb") as handle:
        recorded = handle.read()
    self.assertEqual(recorded, b"21")
def test_all_exit_sigs_with_sig(self):
    """Exit functions also run when the process is terminated by a signal.

    For every signal in ``TEST_SIGNALS`` a child script installs a plain
    signal handler, registers two exit functions and then sends the signal
    to itself.
    """
    template = """
        import functools, os, signal, imp
        mod = imp.load_source("mod", r"{modname}")

        def foo(s):
            with open(r"{testfn}", "ab") as f:
                f.write(s)

        signal.signal({sig}, functools.partial(foo, b'0'))
        mod.register_exit_fun(functools.partial(foo, b'1'))
        mod.register_exit_fun(functools.partial(foo, b'2'))
        os.kill(os.getpid(), {sig})
        """
    for sig in TEST_SIGNALS:
        script = textwrap.dedent(template.format(
            modname=os.path.abspath(__file__), testfn=TESTFN, sig=sig))
        status = pyrun(script)
        self.assertEqual(status, sig)

        # Exit functions ran in reverse registration order, followed by the
        # handler that had been installed with signal.signal().
        with open(TESTFN, "rb") as handle:
            self.assertEqual(handle.read(), b"210")
        # Start the next signal with a fresh file.
        safe_remove(TESTFN)
def test_as_deco(self):
    """``register_exit_fun`` also works when applied as a decorator."""
    source = """
        import imp
        mod = imp.load_source("mod", r"{}")

        @mod.register_exit_fun
        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")
        """
    filled = source.format(os.path.abspath(__file__), TESTFN)
    outcome = pyrun(textwrap.dedent(filled))
    self.assertEqual(outcome, 0)

    with open(TESTFN, "rb") as handle:
        data = handle.read()
    self.assertEqual(data, b"1")
def create_example1(tmpdir):
    """Create the ``example1`` project files inside ``tmpdir``.

    Writes ``setup.py`` and ``tox.ini`` and ensures that
    ``example1/__init__.py`` exists.
    """
    setup_py = d("""
        from setuptools import setup

        def main():
            setup(
                name='example1',
                description='example1 project for testing detox',
                version='0.4',
                packages=['example1',],
            )
        if __name__ == '__main__':
            main()
    """)
    tox_ini = d("""
        [testenv:py]
    """)

    tmpdir.join("setup.py").write(setup_py)
    tmpdir.join("tox.ini").write(tox_ini)
    tmpdir.join("example1", "__init__.py").ensure()
def format_description(self, description):
    """Format ``description`` as a labelled, re-indented block.

    Returns ``''`` for a falsy description. Otherwise the text is stripped
    of leading newlines and trailing whitespace, dedented, passed through
    ``self.indent_lines`` and placed under a ``Commands:`` label (when
    ``self.parser`` has a ``main`` attribute) or a ``Description:`` label.
    """
    # leave full control over description to us
    if not description:
        return ''

    if hasattr(self.parser, 'main'):
        heading = 'Commands'
    else:
        heading = 'Description'

    # some doc strings have initial newlines, some don't;
    # some have final newlines and spaces, some don't
    trimmed = description.lstrip('\n').rstrip()

    # dedent, then reindent
    dedented = textwrap.dedent(trimmed)
    reindented = self.indent_lines(dedented, " ")

    return '%s:\n%s\n' % (heading, reindented)
def print_nav():
    """
    Print the navigation prompt.

    The prompt is a fixed, ANSI-coloured menu written to standard output.
    """
    # NOTE(review): the runs of '?' look like non-ASCII text that was lost
    # in an encoding conversion -- confirm against the original source.
    msg = """\n\033[1;32m### ?????????? ### \033[0m

    1) ?? \033[32mID\033[0m ???? ? ??\033[32m?? IP,???,??\033[0m ??????(????).
    2) ?? \033[32m/\033[0m + \033[32mIP, ??? or ?? \033[0m??. ?: /ip
    3) ?? \033[32mP/p\033[0m ?????????.
    4) ?? \033[32mG/g\033[0m ??????????.
    5) ?? \033[32mG/g\033[0m\033[0m + \033[32m?ID\033[0m ???????. ?: g1
    6) ?? \033[32mE/e\033[0m ??????.
    7) ?? \033[32mU/u\033[0m ??????.
    8) ?? \033[32mD/d\033[0m ??????.
    9) ?? \033[32mH/h\033[0m ??.
    0) ?? \033[32mQ/q\033[0m ??.
    """
    # Parenthesised single-argument form: valid on Python 3 and prints the
    # same text as the print statement on Python 2.
    print(textwrap.dedent(msg))
def assert_load(self, main_includes, included_files, include_cwd=None):
    """
    Load a config whose include list is ``main_includes`` and check that the
    resulting dict is ``{'orange': 1}``.

    :type main_includes: list
    :type included_files: dict
    :type include_cwd: str or None
    """
    __tracebackhide__ = True  # pylint: disable=unused-variable

    include_lines = [' - {}'.format(include) for include in main_includes]
    # NOTE(review): the indentation inside this YAML literal is significant
    # -- confirm.
    template = textwrap.dedent("""
        {_default_}: Fruits
        {_include_}:
        {main_includes}

        Fruits:
          orange: 1
        """)
    yaml = substitute_yaml(template, main_includes='\n'.join(include_lines))

    with patch_open_read(included_files):
        config = octoconf.loads(yaml, include_cwd=include_cwd)

    assert {'orange': 1} == config.get_dict()
async def test_query(self):
    """``client.query`` posts the rendered SPARQL query to the query endpoint.

    Declared ``async`` because the body awaits the client calls; ``await``
    is a syntax error inside a plain ``def``.
    """
    triples = Triples([("john", RDF.type, "doe"), ("john", "p", "o")])
    res = await self.client.query("""
        SELECT *
        FROM {{graph}}
        WHERE {
            {{}}
        }
        """, triples)

    self.assertEqual(res['path'], self.client_kwargs['endpoint'])
    self.assertIn('post', res)
    self.assertIn('query', res['post'])
    # NOTE(review): indentation and blank lines inside this literal must
    # match the client's rendering exactly -- confirm.
    self.assertEqual(res['post']['query'], dedent("""\
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

        SELECT *
        FROM <http://mu.semte.ch/test-application>
        WHERE {
            john rdf:type "doe" ;
                p "o" .
        }"""))

    with self.assertRaises(SPARQLRequestFailed):
        await self.client.query("failure")
async def test_update(self):
    """``client.update`` posts the rendered SPARQL update to the update endpoint.

    Declared ``async`` because the body awaits the client calls; ``await``
    is a syntax error inside a plain ``def``.
    """
    triples = Triples([("john", RDF.type, "doe"), ("john", "p", "o")])
    res = await self.client.update("""
        WITH {{graph}}
        INSERT DATA {
            {{}}
        }
        """, triples)

    self.assertEqual(res['path'], self.client_kwargs['update_endpoint'])
    self.assertIn('post', res)
    self.assertIn('update', res['post'])
    # NOTE(review): indentation and blank lines inside this literal must
    # match the client's rendering exactly -- confirm.
    self.assertEqual(res['post']['update'], dedent("""\
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

        WITH <http://mu.semte.ch/test-application>
        INSERT DATA {
            john rdf:type "doe" ;
                p "o" .
        }"""))

    with self.assertRaises(SPARQLRequestFailed):
        await self.client.update("failure")
def test_node_in_node(self):
    """``str()`` of a node holding child nodes also renders the children."""
    john = Node(IRI("john"), [
        ("foo", "bar"),
    ])
    jane = Node(IRI("jane"), [
        ("foo", "bar"),
    ])
    parent = Node("parent", [
        ("child1", john),
        ("child2", jane),
        ("foo", "bar"),
    ])

    # NOTE(review): indentation and blank lines inside this literal must
    # match the node serialisation exactly -- confirm.
    expected = dedent("""\
        parent child1 <john> ;
            child2 <jane> ;
            foo "bar" .

        <jane> foo "bar" .

        <john> foo "bar" .""")
    self.assertEqual(str(parent), expected)