Python textwrap 模块,dedent() 实例源码


项目:python-    作者:secondtonone1
def _warn_legacy_version(self):
        """Emit a ``PEP440Warning`` when this object's version is a legacy one.

        Nothing is emitted when ``self._parsed_version`` is not a
        ``packaging.version.LegacyVersion`` or when ``self.version`` is
        empty.  The message placeholders (``project_name``, ``version``) are
        filled from ``vars(self)``.
        """
        LV = packaging.version.LegacyVersion
        is_legacy = isinstance(self._parsed_version, LV)
        if not is_legacy:
            return

        # While an empty version is technically a legacy version and
        # is not a valid PEP 440 version, it's also unlikely to
        # actually come from someone and instead it is more likely that
        # it comes from setuptools attempting to parse a filename and
        # including it in the list. So for that we'll gate this warning
        # on if the version is anything at all or not.
        if not self.version:
            return

        # The template is written as an indented block for readability and
        # then collapsed onto a single line.
        tmpl = textwrap.dedent("""
            '{project_name} ({version})' is being parsed as a legacy,
            non PEP 440,
            version. You may find odd behavior and sort order.
            In particular it will be sorted as less than 0.0. It
            is recommended to migrate to PEP 440 compatible
            versions.
            """).strip().replace('\n', ' ')

        warnings.warn(tmpl.format(**vars(self)), PEP440Warning)
项目:markdown-xblock    作者:hastexo
def parse_xml(cls, node, runtime, keys, id_generator):
        """Parse the source XML in a way that preserves indenting, needed for markdown.

        Every attribute of ``node`` whose name is in ``block.fields`` is
        converted with that field's ``from_string`` and set on the block;
        other attributes are ignored.  The node's text has one leading
        newline removed and is dedented before being stored as
        ``block.content`` (only when the result is non-empty).

        ``id_generator`` is accepted but not used here.

        Returns the block built by ``runtime.construct_xblock_from_class``.
        """
        block = runtime.construct_xblock_from_class(cls, keys)

        # Load the data
        for name, value in node.items():
            if name in block.fields:
                value = (block.fields[name]).from_string(value)
                setattr(block, name, value)

        # Load content
        text = node.text
        if text:
            # Fix up whitespace: drop the newline that directly follows the
            # opening tag, then remove the common leading indentation.
            if text.startswith("\n"):
                text = text[1:]
            text = textwrap.dedent(text)
            if text:
                block.content = text

        return block
项目:zipline-chinese    作者:zhanghan1990
def inspect(self):
        # NOTE(review): truncated excerpt — the docstring quotes, the opening
        # of the string passed to dedent() and the rest of the call are
        # missing, so this does not parse as shown.
        Return a string representation of the data stored in this array.
        return dedent(
            Adjusted Array ({dtype}):


项目:sappho    作者:lily-mayfield
def test_render(self):
        """Render the CSV tilemap and compare it with a hand-painted surface."""
        csv_text = textwrap.dedent(self.TILEMAP_CSV).strip()
        tilemap = sappho.tiles.TileMap.from_csv_string_and_tilesheet(
            csv_text, self.tilesheet)

        # Reference surface: three 1x2 strips (red, green, blue). It needs
        # the SRCALPHA flag and a depth of 32 to match the surface returned
        # by the render function.
        expected = pygame.surface.Surface((3, 2), pygame.SRCALPHA, 32)
        strip_colors = ((255, 0, 0), (0, 255, 0), (0, 0, 255))
        for column, color in enumerate(strip_colors):
            expected.fill(color, pygame.Rect(column, 0, 1, 2))

        # Render the tilemap and compare the two surfaces.
        rendered = tilemap.to_surface()
        assert compare_surfaces(expected, rendered)
项目:xarray-simlab    作者:benbovy
def test_info(self, process, process_repr):
        # NOTE(review): truncated excerpt — nothing is written to either
        # StringIO before getvalue() is read, the EmptyProcess class body is
        # missing, and the expected text appears cut down to its last line.
        # Presumably a call that writes the info into ``buf`` was dropped;
        # confirm against the upstream test.
        for cls_or_obj in [ExampleProcess, process]:
            buf = StringIO()
            actual = buf.getvalue()
            assert actual == process_repr

        class EmptyProcess(Process):

        expected = dedent("""\
            time_dependent: True""")

        buf = StringIO()
        actual = buf.getvalue()
        assert actual == expected
项目:sphinxcontrib-openapi    作者:ikalnytskyi
def test_unicode_is_allowed(self):
        # NOTE(review): truncated excerpt — the closing braces of ``spec`` and
        # the end of the expected reST string are missing, so this does not
        # parse as shown. The visible part passes a spec with a non-ASCII
        # description to openapi.openapi2httpdomain and joins the result
        # with newlines.
        spec = {
            'paths': {
                '/resource_a': {
                    'get': {
                        'description': '\u041f',
                        'responses': {
                            '200': {'description': 'ok'}

        text = '\n'.join(openapi.openapi2httpdomain(spec))

        assert text == textwrap.dedent('''
            .. http:get:: /resource_a
               :synopsis: null


               :status 200:
项目:bob    作者:BobBuildTool
def upload(self, step, buildIdFile, tgzFile):
        # Returns "" when self.canUploadJenkins() is false; otherwise builds a
        # shell snippet that uploads with curl. In the template, doubled
        # braces ({{http_code}}) are str.format escapes for literal braces.
        # NOTE(review): truncated excerpt — the shell text is missing lines
        # (the inner ``fi`` and whatever sets BOB_UPLOAD_URL), {GEN} has no
        # visible argument, and the .format(...)/dedent(...) calls are never
        # closed, so this does not parse as shown.
        # only upload if requested
        if not self.canUploadJenkins():
            return ""

        # upload with curl if file does not exist yet on server
        return "\n" + textwrap.dedent("""\
            # upload artifact
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
            if ! curl --output /dev/null --silent --head --fail "$BOB_UPLOAD_URL" ; then
                BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {RESULT} "$BOB_UPLOAD_URL" || true)
                if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                    echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
            fi""".format(URL=self.__url.geturl(), BUILDID=quote(buildIdFile), RESULT=quote(tgzFile),
                         FAIL="" if self._ignoreErrors() else "; exit 1",
项目:bob    作者:BobBuildTool
def uploadJenkinsLiveBuildId(self, step, liveBuildId, buildId):
        # Returns "" when self.canUploadJenkins() is false; otherwise builds a
        # shell snippet that uploads with curl. In the template, doubled
        # braces ({{http_code}}) are str.format escapes for literal braces.
        # NOTE(review): truncated excerpt — the shell ``if`` has no ``fi``,
        # nothing visible sets BOB_UPLOAD_URL, {GEN} and {BUILDID} have no
        # visible arguments, and the .format(...)/dedent(...) calls are never
        # closed, so this does not parse as shown.
        # only upload if requested
        if not self.canUploadJenkins():
            return ""

        # upload with curl if file does not exist yet on server
        return "\n" + textwrap.dedent("""\
            # upload live build-id
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {LIVEBUILDID}){GEN}"
            BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {BUILDID} "$BOB_UPLOAD_URL" || true)
            if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
            """.format(URL=self.__url.geturl(), LIVEBUILDID=quote(liveBuildId),
                       FAIL="" if self._ignoreErrors() else "; exit 1",
项目:shub-image    作者:scrapinghub
def _extract_scripts_from_project(setup_filename=''):
    """Parse and return scripts"""
    # NOTE(review): truncated excerpt — the mock_setup string is never closed,
    # the ast.parse(...) call on the file is cut off, and the condition that
    # locates the setup() call is incomplete, so this does not parse as shown.
    # Visible intent: return '' if the file is missing; otherwise splice a
    # recording setup() stub into the parsed setup file, exec it, and return
    # the comma-joined basenames of the first recorded call's ``scripts``.
    # NOTE(review): exec() runs the setup file's code in-process — only
    # acceptable for trusted projects.
    if not os.path.isfile(setup_filename):
        return ''
    mock_setup = textwrap.dedent('''\
    def setup(*args, **kwargs):
        __setup_calls__.append((args, kwargs))
    parsed_mock_setup = ast.parse(mock_setup, filename=setup_filename)
    with open(setup_filename, 'rt') as setup_file:
        parsed = ast.parse(
        for index, node in enumerate(parsed.body[:]):
            if (not isinstance(node, ast.Expr) or
                    not isinstance(node.value, ast.Call) or
           != 'setup'):
            parsed.body[index:index] = parsed_mock_setup.body
    fixed = ast.fix_missing_locations(parsed)
    codeobj = compile(fixed, setup_filename, 'exec')
    local_vars = {}
    global_vars = {'__setup_calls__': []}
    exec(codeobj, global_vars, local_vars)
    _, kwargs = global_vars['__setup_calls__'][0]
    return ','.join([os.path.basename(f) for f in kwargs.get('scripts', [])])
项目:sphinxcontrib-versioning    作者:Robpol86
def test_colors(tmpdir):
    # NOTE(review): truncated excerpt — the docstring is never closed, the
    # generated script has no visible logging calls above debug level even
    # though the assertions expect them, and the line that runs the script
    # (``output =, [...]``) is garbled, so this does not parse as shown.
    """Test colors.

    :param tmpdir: pytest fixture.
    script = dedent("""\
    import logging
    from sphinxcontrib.versioning.setup_logging import setup_logging

    setup_logging(verbose=False, colors=True)
    logger = logging.getLogger("{logger}")
    logger.debug("Debug")  # Not printed since verbose = False.
    """).format(logger=ColorFormatter.SPECIAL_SCOPE + '.sample')

    output =, [sys.executable, ''])
    assert '\033[31m=> Critical\033[39m\n' in output
    assert '\033[31m=> Error\033[39m\n' in output
    assert '\033[33m=> Warning\033[39m\n' in output
    assert '\033[36m=> Info\033[39m' in output
    assert 'Debug' not in output
项目:Flask_Blog    作者:sugarguo
def format_description(self, description):
        """Format ``description`` as a labelled, re-indented help section.

        The label is ``'Commands'`` when ``self.parser`` has a ``main``
        attribute and ``'Description'`` otherwise.  Leading newlines and
        trailing whitespace are stripped, then the text is dedented and
        re-indented through ``self.indent_lines(text, "  ")``.

        Returns the label, a colon and a newline, followed by the indented
        text and a trailing newline; returns ``''`` when ``description`` is
        empty or otherwise falsy.
        """
        # leave full control over description to us
        if not description:
            return ''

        label = 'Commands' if hasattr(self.parser, 'main') else 'Description'
        # some doc strings have initial newlines, some don't
        description = description.lstrip('\n')
        # some doc strings have final newlines and spaces, some don't
        description = description.rstrip()
        # dedent, then reindent
        description = self.indent_lines(textwrap.dedent(description), "  ")
        return '%s:\n%s\n' % (label, description)
项目:Flask_Blog    作者:sugarguo
def write_script(
        scriptdir, rev_id, content, encoding='ascii', sourceless=False):
    # Looks up revision ``rev_id`` in scriptdir.revision_map, dedents
    # ``content`` (encoding it when ``encoding`` is truthy), opens the
    # revision's path for binary writing, reloads the file as a Script,
    # raises if down_revision changed, and replaces the revision map entry.
    # NOTE(review): truncated excerpt — the bodies of ``with open(...)``,
    # ``if os.access(...)`` and ``if sourceless:`` are missing, so this does
    # not parse as shown.
    old = scriptdir.revision_map.get_revision(rev_id)
    path = old.path

    content = textwrap.dedent(content)
    if encoding:
        content = content.encode(encoding)
    with open(path, 'wb') as fp:
    pyc_path = util.pyc_file_from_path(path)
    if os.access(pyc_path, os.F_OK):
    script = Script._from_path(scriptdir, path)
    old = scriptdir.revision_map.get_revision(script.revision)
    if old.down_revision != script.down_revision:
        raise Exception("Can't change down_revision "
                        "on a refresh operation.")
    scriptdir.revision_map.add_revision(script, _replace=True)

    if sourceless:
项目:Flask_Blog    作者:sugarguo
def create_sdist():
        # NOTE(review): truncated excerpt — the docstring quotes and the code
        # that builds the archive at ``dist_path`` are missing (only two
        # fragments of an embedded setup script remain), so this does not
        # parse as shown. The visible part yields a path inside a temporary
        # directory.
        Return an sdist with a setup_requires dependency (of something that
        doesn't exist)
        with tempdir_context() as dir:
            dist_path = os.path.join(dir, 'setuptools-test-fetcher-1.0.tar.gz')
                    import setuptools
                        setup_requires = ['does-not-exist'],
            yield dist_path
项目:code    作者:ActiveState
def test_exit_cleanly(self):
            # NOTE(review): truncated excerpt — the opening triple quote of
            # the child script, the body of foo(), the call that registers
            # it, the closing parentheses and the read in the last assertion
            # are missing, so this does not parse as shown.
            # Make sure handler fun is called on clean interpreter exit.
            ret = pyrun(textwrap.dedent(
                import os, imp
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:

                """.format(os.path.abspath(__file__), TESTFN)
            self.assertEqual(ret, 0)
            with open(TESTFN, "rb") as f:
                self.assertEqual(, b"1")
项目:code    作者:ActiveState
def test_exception(self):
            # NOTE(review): truncated excerpt — the opening triple quote of
            # the child script, the body of foo(), the call that registers
            # it, the closing parentheses and the read in the last assertion
            # are missing, so this does not parse as shown.
            # Make sure handler fun is called on exception.
            ret = pyrun(textwrap.dedent(
                import os, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:

                sys.stderr = os.devnull
                1 / 0
                """.format(os.path.abspath(__file__), TESTFN)
            self.assertEqual(ret, 1)
            with open(TESTFN, "rb") as f:
                self.assertEqual(, b"1")
项目:code    作者:ActiveState
def test_kinterrupt(self):
            # NOTE(review): truncated excerpt — the opening triple quote of
            # the child script, the body of foo(), the call that registers
            # it, the closing parentheses and the read in the last assertion
            # are missing, so this does not parse as shown.
            # Simulates CTRL + C and make sure the exit function is called.
            ret = pyrun(textwrap.dedent(
                import os, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:

                sys.stderr = os.devnull
                raise KeyboardInterrupt
                """.format(os.path.abspath(__file__), TESTFN)
            self.assertEqual(ret, 1)
            with open(TESTFN, "rb") as f:
                self.assertEqual(, b"1")
项目:code    作者:ActiveState
def test_called_once(self):
            # NOTE(review): truncated excerpt — the opening triple quote of
            # the child script, the body of foo(), the registration call(s),
            # the closing parentheses and the read in the last assertion are
            # missing, so this does not parse as shown.
            # Make sure the registered fun is called once.
            ret = pyrun(textwrap.dedent(
                import os, imp
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:

                """.format(os.path.abspath(__file__), TESTFN)
            self.assertEqual(ret, 0)
            with open(TESTFN, "rb") as f:
                self.assertEqual(, b"1")
项目:code    作者:ActiveState
def test_cascade(self):
            # NOTE(review): truncated excerpt — the opening triple quote of
            # the child script, the body of foo(s), the closing parentheses
            # and the read in the last assertion are missing, so this does
            # not parse as shown.
            # Register 2 functions and make sure the last registered
            # function is executed first.
            ret = pyrun(textwrap.dedent(
                import functools, os, imp
                mod = imp.load_source("mod", r"{}")

                def foo(s):
                    with open(r"{}", "ab") as f:

                mod.register_exit_fun(functools.partial(foo, b'1'))
                mod.register_exit_fun(functools.partial(foo, b'2'))
                """.format(os.path.abspath(__file__), TESTFN)
            self.assertEqual(ret, 0)
            with open(TESTFN, "rb") as f:
                self.assertEqual(, b"21")
项目:code    作者:ActiveState
def test_all_exit_sigs_with_sig(self):
            # NOTE(review): truncated excerpt — the opening triple quote of
            # the child script, the body of foo(s), the start of the
            # .format(...) call (including the ``modname`` argument), the
            # closing parentheses and the read in the last assertion are
            # missing, so this does not parse as shown.
            # Same as above but the process is terminated by a signal
            # instead of exiting cleanly.
            for sig in TEST_SIGNALS:
                ret = pyrun(textwrap.dedent(
                    import functools, os, signal, imp
                    mod = imp.load_source("mod", r"{modname}")

                    def foo(s):
                        with open(r"{testfn}", "ab") as f:

                    signal.signal({sig}, functools.partial(foo, b'0'))
                    mod.register_exit_fun(functools.partial(foo, b'1'))
                    mod.register_exit_fun(functools.partial(foo, b'2'))
                    os.kill(os.getpid(), {sig})
                               testfn=TESTFN, sig=sig)
                self.assertEqual(ret, sig)
                with open(TESTFN, "rb") as f:
                    self.assertEqual(, b"210")
项目:code    作者:ActiveState
def test_as_deco(self):
            # NOTE(review): truncated excerpt — the opening triple quote of
            # the child script, the decorator line implied by the test name,
            # the body of foo(), the closing parentheses and the read in the
            # last assertion are missing, so this does not parse as shown.
            ret = pyrun(textwrap.dedent(
                import imp
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:

                """.format(os.path.abspath(__file__), TESTFN)
            self.assertEqual(ret, 0)
            with open(TESTFN, "rb") as f:
                self.assertEqual(, b"1")
项目:detox    作者:tox-dev
def create_example1(tmpdir):
        # NOTE(review): truncated excerpt — only fragments of an embedded
        # setup script remain (its enclosing string and the write call are
        # missing), so this does not parse as shown. The last visible line
        # calls ensure() on a path joined under ``tmpdir``.
        from setuptools import setup

        def main():
                description='example1 project for testing detox',
        if __name__ == '__main__':
    tmpdir.join("example1", "").ensure()
项目:geekcloud    作者:Mr-Linus
def print_nav():
        # NOTE(review): truncated excerpt — the docstring quotes and the
        # closing triple quote of ``msg`` are missing, so this does not parse
        # as shown. The menu text appears to have lost its non-ASCII
        # characters (runs of '?'); recover it from the upstream file rather
        # than editing it here. The final line uses the Python 2 print
        # statement.
        Print prompt
        msg = """\n\033[1;32m###    ??????????   ### \033[0m

        1) ?? \033[32mID\033[0m ???? ? ??\033[32m?? IP,???,??\033[0m ??????(????).
        2) ?? \033[32m/\033[0m + \033[32mIP, ??? or ?? \033[0m??. ?: /ip
        3) ?? \033[32mP/p\033[0m ?????????.
        4) ?? \033[32mG/g\033[0m ??????????.
        5) ?? \033[32mG/g\033[0m\033[0m + \033[32m?ID\033[0m ???????. ?: g1
        6) ?? \033[32mE/e\033[0m ??????.
        7) ?? \033[32mU/u\033[0m ??????.
        8) ?? \033[32mD/d\033[0m ??????.
        9) ?? \033[32mH/h\033[0m ??.
        0) ?? \033[32mQ/q\033[0m ??.
        print textwrap.dedent(msg)
项目:octoconf    作者:andras-tim
def assert_load(self, main_includes, included_files, include_cwd=None):
        """
        Build a YAML document, load it with ``octoconf.loads`` inside the
        ``patch_open_read(included_files)`` context, and assert that the
        resulting dict is ``{'orange': 1}``.

        :type main_includes: list
        :type included_files: dict
        :type include_cwd: str or None
        """
        __tracebackhide__ = True  # pylint: disable=unused-variable

        # One "  - <name>" line per include.
        indented_includes = '\n'.join(['  - {}'.format(include)
                                       for include in main_includes])

        # NOTE(review): the template below looks truncated in this excerpt —
        # ``main_includes`` is passed to substitute_yaml() but no matching
        # placeholder is visible, and ``orange: 1`` has no parent key.
        # Confirm against the upstream test before relying on it.
        yaml = substitute_yaml(textwrap.dedent("""
            {_default_}: Fruits

              orange: 1
            """), main_includes=indented_includes)

        with patch_open_read(included_files):
            config = octoconf.loads(yaml, include_cwd=include_cwd)

        assert {'orange': 1} == config.get_dict()
项目:aiosparql    作者:aio-libs
def test_query(self):
        # NOTE(review): truncated excerpt — ``await`` is used but the function
        # is not declared ``async def``, the query's WHERE block is cut off,
        # and the expected string passed to dedent() is never closed, so this
        # does not parse as shown. The IRIs inside ``<>`` also appear to have
        # been stripped.
        triples = Triples([("john", RDF.type, "doe"), ("john", "p", "o")])
        res = await self.client.query("""
            SELECT *
            FROM {{graph}}
            WHERE {
            """, triples)
        self.assertEqual(res['path'], self.client_kwargs['endpoint'])
        self.assertIn('post', res)
        self.assertIn('query', res['post'])
        self.assertEqual(res['post']['query'], dedent("""\
            PREFIX rdf: <>

            SELECT *
            FROM <>
            WHERE {
                john rdf:type "doe" ;
                    p "o" .
        with self.assertRaises(SPARQLRequestFailed):
            await self.client.query("failure")
项目:aiosparql    作者:aio-libs
def test_update(self):
        # NOTE(review): truncated excerpt — ``await`` is used but the function
        # is not declared ``async def``, the INSERT DATA block is cut off, and
        # the expected string passed to dedent() is never closed, so this
        # does not parse as shown. The IRIs inside ``<>`` also appear to have
        # been stripped.
        triples = Triples([("john", RDF.type, "doe"), ("john", "p", "o")])
        res = await self.client.update("""
            WITH {{graph}}
            INSERT DATA {
            """, triples)
        self.assertEqual(res['path'], self.client_kwargs['update_endpoint'])
        self.assertIn('post', res)
        self.assertIn('update', res['post'])
        self.assertEqual(res['post']['update'], dedent("""\
            PREFIX rdf: <>

            WITH <>
            INSERT DATA {
                john rdf:type "doe" ;
                    p "o" .
        with self.assertRaises(SPARQLRequestFailed):
            await self.client.update("failure")
项目:aiosparql    作者:aio-libs
def test_node_in_node(self):
        """Check ``str()`` of a Node whose predicate list contains other Nodes.

        The expected text shows the parent referring to each child by its IRI
        and each child then printed as its own block after the parent.
        """
        node1 = Node(IRI("john"), [
            ("foo", "bar"),
        ])
        node2 = Node(IRI("jane"), [
            ("foo", "bar"),
        ])
        node3 = Node("parent", [
            ("child1", node1),
            ("child2", node2),
            ("foo", "bar"),
        ])
        # The backslash after the opening quotes keeps the expected text from
        # starting with a blank line.
        self.assertEqual(str(node3), dedent("""\
            parent child1 <john> ;
                child2 <jane> ;
                foo "bar" .

            <jane> foo "bar" .

            <john> foo "bar" ."""))