Python os 模块,linesep() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.linesep()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _install_message(self, message):
        """Format a message and blindly write to self._file."""
        from_line = None
        if isinstance(message, str) and message.startswith('From '):
            newline = message.find('\n')
            if newline != -1:
                from_line = message[:newline]
                message = message[newline + 1:]
            else:
                from_line = message
                message = ''
        elif isinstance(message, _mboxMMDFMessage):
            from_line = 'From ' + message.get_from()
        elif isinstance(message, email.message.Message):
            from_line = message.get_unixfrom()  # May be None.
        if from_line is None:
            from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
        start = self._file.tell()
        self._file.write(from_line + os.linesep)
        self._dump_message(message, self._file, self._mangle_from_)
        stop = self._file.tell()
        return (start, stop)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _generate_toc(self):
        """Generate key-to-(start, stop) table of contents."""
        starts, stops = [], []
        self._file.seek(0)
        while True:
            line_pos = self._file.tell()
            line = self._file.readline()
            if line.startswith('From '):
                if len(stops) < len(starts):
                    stops.append(line_pos - len(os.linesep))
                starts.append(line_pos)
            elif line == '':
                stops.append(line_pos)
                break
        self._toc = dict(enumerate(zip(starts, stops)))
        self._next_key = len(self._toc)
        self._file_length = self._file.tell()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_message(self, key):
        """Return a Message representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        visible_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
            visible_headers.write(line.replace(os.linesep, '\n'))
        body = self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
        msg = BabylMessage(original_headers.getvalue() + body)
        msg.set_visible(visible_headers.getvalue())
        if key in self._labels:
            msg.set_labels(self._labels[key])
        return msg
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_string(self, key):
        """Return a string representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
        return original_headers.getvalue() + \
               self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def fix_script(path):
    """Replace #!python with #!/path/to/python
    Return True if file was changed."""
    # XXX RECORD hashes will need to be updated
    if os.path.isfile(path):
        script = open(path, 'rb')
        try:
            firstline = script.readline()
            if not firstline.startswith(binary('#!python')):
                return False
            exename = sys.executable.encode(sys.getfilesystemencoding())
            firstline = binary('#!') + exename + binary(os.linesep)
            rest = script.read()
        finally:
            script.close()
        script = open(path, 'wb')
        try:
            script.write(firstline)
            script.write(rest)
        finally:
            script.close()
        return True
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def main(initial_args=None):
    if initial_args is None:
        initial_args = sys.argv[1:]

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(initial_args)
    except PipError:
        e = sys.exc_info()[1]
        sys.stderr.write("ERROR: %s" % e)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    command = commands[cmd_name]()
    return command.main(cmd_args)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def main(args=None):
    if args is None:
        args = sys.argv[1:]

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(args)
    except PipError as exc:
        sys.stderr.write("ERROR: %s" % exc)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip._internal.utils.encoding.auto_decode
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error as e:
        # setlocale can apparently crash if locale are uninitialized
        logger.debug("Ignoring error %s when setting locale", e)
    command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
    return command.main(cmd_args)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def remove(path, pth_file=None):
    pth_file = pth_file or PATH
    try:
        with open(pth_file, "r") as f:
            lines = f.read().splitlines()[2:]
    except IOError:
        raise ValueError("not found: {}".format(path))

    try:
        lines.remove(path)
    except ValueError:
        raise ValueError("not found: {}".format(path))

    lines.insert(0, HEADER)
    with open(pth_file, "w") as f:
        f.write(os.linesep.join(lines))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def _printf(fmt, *args, **print3opts):
    '''Formatted print to sys.stdout or given stream.

        *print3opts* -- print keyword arguments, like Python 3.+
    '''
    if print3opts:  # like Python 3.0
        f = print3opts.get('file', None) or sys.stdout
        if args:
            f.write(fmt % args)
        else:
            f.write(fmt)
        f.write(print3opts.get('end', linesep))
        if print3opts.get('flush', False):
            f.flush()
    elif args:
        print(fmt % args)
    else:
        print(fmt)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def set_exit_code(bld):
    """
    If any of the tests fail waf will exit with that exit code.
    This is useful if you have an automated build system which need
    to report on errors from the tests.
    You may use it like this:

        def build(bld):
            bld(features='cxx cxxprogram test', source='main.c', target='app')
            from waflib.Tools import waf_unit_test
            bld.add_post_fun(waf_unit_test.set_exit_code)
    """
    lst = getattr(bld, 'utest_results', [])
    for (f, code, out, err) in lst:
        if code:
            msg = []
            if out:
                msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
            if err:
                msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
            bld.fatal(os.linesep.join(msg))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def write_json(self, data, pretty=True):
        """
        Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard::

            def build(bld):
                bld.path.find_node('xyz.json').write_json(199)

        :type  data: object
        :param data: The data to write to disk
        :type  pretty: boolean
        :param pretty: Determines if the JSON will be nicely space separated
        """
        import json # Python 2.6 and up
        indent = 2
        separators = (',', ': ')
        sort_keys = pretty
        newline = os.linesep
        if not pretty:
            indent = None
            separators = (',', ':')
            newline = ''
        output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline
        self.write(output, encoding='utf-8')
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def set_exit_code(bld):
    """
    If any of the tests fail waf will exit with that exit code.
    This is useful if you have an automated build system which need
    to report on errors from the tests.
    You may use it like this:

        def build(bld):
            bld(features='cxx cxxprogram test', source='main.c', target='app')
            from waflib.Tools import waf_unit_test
            bld.add_post_fun(waf_unit_test.set_exit_code)
    """
    lst = getattr(bld, 'utest_results', [])
    for (f, code, out, err) in lst:
        if code:
            msg = []
            if out:
                msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
            if err:
                msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
            bld.fatal(os.linesep.join(msg))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def write_json(self, data, pretty=True):
        """
        Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard::

            def build(bld):
                bld.path.find_node('xyz.json').write_json(199)

        :type  data: object
        :param data: The data to write to disk
        :type  pretty: boolean
        :param pretty: Determines if the JSON will be nicely space separated
        """
        import json # Python 2.6 and up
        indent = 2
        separators = (',', ': ')
        sort_keys = pretty
        newline = os.linesep
        if not pretty:
            indent = None
            separators = (',', ':')
            newline = ''
        output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline
        self.write(output, encoding='utf-8')
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def set_exit_code(bld):
    """
    If any of the tests fail waf will exit with that exit code.
    This is useful if you have an automated build system which need
    to report on errors from the tests.
    You may use it like this:

        def build(bld):
            bld(features='cxx cxxprogram test', source='main.c', target='app')
            from waflib.Tools import waf_unit_test
            bld.add_post_fun(waf_unit_test.set_exit_code)
    """
    lst = getattr(bld, 'utest_results', [])
    for (f, code, out, err) in lst:
        if code:
            msg = []
            if out:
                msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
            if err:
                msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
            bld.fatal(os.linesep.join(msg))
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def __init__(self, to=sys.stdout, only=(), *args, **kwargs):
        super(DebugStream, self).__init__(*args, **kwargs)

        def safe_str(chunk):
            if isinstance(chunk, bytes):
                chunk = chunk.decode("utf-8")
            elif not isinstance(chunk, str):
                chunk = str(chunk)

            return chunk

        class Bugger(object):
            __before__ = __after__ = lambda *args: None

            def __getattr__(self, event):
                def inner(*args, **kwargs):
                    to.write(event.upper() + " ")
                    to.write("; ".join(map(safe_str, args)))
                    to.write(" ")
                    to.write(", ".join("{0}: {1}".format(k, safe_str(v))
                                       for k, v in kwargs.items()))
                    to.write(os.linesep)
                return inner

        self.attach(Bugger(), only=only)
项目:merac-o-matic    作者:RogueAI42    | 项目源码 | 文件源码
def generate():
    global sound
    statement = coremeraco.constructRegularStatement() + os.linesep
    textArea.insert(Tkinter.END, statement)
    textArea.yview(Tkinter.END)
    if readOption and readOutputState.get():
        if sound:
            sound.stop()
        sound = playsnd.playsnd()
        threading.Thread(target=sound.play, args=(procfest.text2wave(statement),)).start()
    if saveOutputState.get():
        try:
            outputFileHandler = open(saveOutputDestination.get(), 'a') # 'a' means append mode
            outputFileHandler.write(statement)
            outputFileHandler.close()
        except IOError as Argument:
            tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "ioErrorMessage") + str(Argument))
        except Exception as Argument:
            tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "genericErrorMessage") + str(Argument))
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_append_metadata_file(tmpdir):
    initial_data = 'some data'
    my_metadata = {
        'some': 'data',
        'for': 'this metadata'
    }
    output_file = tmpdir.join('metadata')
    output_file.write_binary((initial_data + os.linesep).encode('utf-8'))

    with metadata.MetadataWriter(suppress_output=False)(str(output_file)) as writer:
        writer.write_metadata(**my_metadata)

    lines = output_file.readlines()
    assert len(lines) == 2
    assert lines[0].strip() == initial_data
    assert json.loads(lines[1].strip()) == my_metadata
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_overwrite_metdata_file(tmpdir):
    initial_data = 'some data'
    my_metadata = {
        'some': 'data',
        'for': 'this metadata'
    }
    output_file = tmpdir.join('metadata')
    output_file.write(initial_data + os.linesep)

    overwrite_writer = metadata.MetadataWriter(suppress_output=False)(str(output_file))
    overwrite_writer.force_overwrite()

    with overwrite_writer as writer:
        writer.write_metadata(**my_metadata)

    lines = output_file.readlines()
    assert len(lines) == 1
    assert json.loads(lines[0].strip()) == my_metadata
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def write_metadata(self, **metadata):
        # type: (**Any) -> Optional[int]
        """Writes metadata to the output stream if output is not suppressed.

        :param **metadata: JSON-serializeable metadata kwargs to write
        """
        if self.suppress_output:
            return 0  # wrote 0 bytes

        metadata_line = json.dumps(metadata, sort_keys=True) + os.linesep
        metadata_output = ''  # type: Union[str, bytes]
        if 'b' in self._output_mode:
            metadata_output = metadata_line.encode('utf-8')
        else:
            metadata_output = metadata_line
        return self._output_stream.write(metadata_output)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def _collect(self, room_name, compress):
        event = yield idiokit.next()

        while True:
            current = datetime.utcnow().day

            with _open_archive(self.archive_dir, time.time(), room_name) as archive:
                self.log.info("Opened archive {0!r}".format(archive.name))

                while current == datetime.utcnow().day:
                    json_dict = dict((key, event.values(key)) for key in event.keys())
                    archive.write(json.dumps(json_dict) + os.linesep)

                    event = yield idiokit.next()

            yield compress.queue(0.0, _rename(archive.name))
项目:robograph    作者:csparpa    | 项目源码 | 文件源码
def output(self):
        if self._params['delimiter'] is None:
            delim = ','
        else:
            delim = self._params['delimiter']

        if self._params['linesep'] is None:
            eol = os.linesep
        else:
            eol = self._params['linesep']

        lines = eol.join([delim.join(map(str, row)) for row in self._params['data_matrix']])
        if self._params['header_list']:
            header = delim.join(self._params['header_list'])
        else:
            header = ''
        return header + eol + lines
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def main(args=None):
    if args is None:
        args = sys.argv[1:]

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(args)
    except PipError as exc:
        sys.stderr.write("ERROR: %s" % exc)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip.utils.encoding.auto_decode
    locale.setlocale(locale.LC_ALL, '')
    command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
    return command.main(cmd_args)


# ###########################################################
# # Writing freeze files
项目:gui_tool    作者:UAVCAN    | 项目源码 | 文件源码
def keyPressEvent(self, qkeyevent):
        if qkeyevent.matches(QKeySequence.Copy):
            selected_rows = [x.row() for x in self.selectionModel().selectedRows()]
            logger.info('Copy to clipboard requested [%r rows]' % len(selected_rows))

            out_string = ''
            for row in selected_rows:
                out_string += self.get_row_as_string(row) + os.linesep

            if out_string:
                QApplication.clipboard().setText(out_string)
        else:
            super(BasicTable, self).keyPressEvent(qkeyevent)

        if qkeyevent.matches(QKeySequence.InsertParagraphSeparator):
            if self.hasFocus():
                self.on_enter_pressed([(x.row(), x.column()) for x in self.selectedIndexes()])
项目:colorize    作者:BruXy    | 项目源码 | 文件源码
def print_help():
    """Print help encoded as initial script's comment between #< and #>."""
    show_line = False
    fp = open(sys.argv[0], 'r')

    for line in fp:
        if line[0:2] == '#<':  # start token
            show_line = True
            continue
        elif line[0:2] == '#>':  # end token
            return

        if show_line == True:
            print(line.rstrip(os.linesep)[1:])

    fp.close()
项目:colorize    作者:BruXy    | 项目源码 | 文件源码
def gui_ask_for_api():
    """Gtk dialog for API key insert."""
    message = gtk.MessageDialog(type=gtk.MESSAGE_QUESTION, buttons=gtk.BUTTONS_OK_CANCEL)
    message.set_markup(colorize.MSG_ASK_API.replace(colorize.URL,"<u>" + colorize.URL +"</u>"))

    entry = gtk.Entry(max=64)
    entry.set_text("Enter your API key")
    entry.show()
    message.vbox.pack_end(entry)
    entry.connect("activate", lambda _: d.response(gtk.RESPONSE_OK))
    message.set_default_response(gtk.RESPONSE_OK)
    message.run()

    api_key = entry.get_text().decode('utf8')
    fp = open(colorize.HOME + colorize.API_KEY_FILE, 'w')
    fp.write("YOUR_API_KEY={0}{1}".format(api_key, os.linesep))
    fp.close()

    # process buttong click immediately
    message.destroy()
    while gtk.events_pending():
        gtk.main_iteration()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testOutputSignal(self):
        # Use SIGKILL here because it's guaranteed to be delivered. Using
        # SIGHUP might not work in, e.g., a buildbot slave run under the
        # 'nohup' command.
        exe = sys.executable
        scriptFile = self.makeSourceFile([
            "import sys, os, signal",
            "sys.stdout.write('stdout bytes\\n')",
            "sys.stderr.write('stderr bytes\\n')",
            "sys.stdout.flush()",
            "sys.stderr.flush()",
            "os.kill(os.getpid(), signal.SIGKILL)"
            ])

        def gotOutputAndValue(err):
            (out, err, sig) = err.value # XXX Sigh wtf
            self.assertEquals(out, "stdout bytes" + os.linesep)
            self.assertEquals(err, "stderr bytes" + os.linesep)
            self.assertEquals(sig, signal.SIGKILL)

        d = utils.getProcessOutputAndValue(exe, ['-u', scriptFile])
        return d.addErrback(gotOutputAndValue)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def fix_script(path):
    """Replace #!python with #!/path/to/python
    Return True if file was changed."""
    # XXX RECORD hashes will need to be updated
    if os.path.isfile(path):
        script = open(path, 'rb')
        try:
            firstline = script.readline()
            if not firstline.startswith(binary('#!python')):
                return False
            exename = sys.executable.encode(sys.getfilesystemencoding())
            firstline = binary('#!') + exename + binary(os.linesep)
            rest = script.read()
        finally:
            script.close()
        script = open(path, 'wb')
        try:
            script.write(firstline)
            script.write(rest)
        finally:
            script.close()
        return True
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def main(initial_args=None):
    if initial_args is None:
        initial_args = sys.argv[1:]

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(initial_args)
    except PipError:
        e = sys.exc_info()[1]
        sys.stderr.write("ERROR: %s" % e)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    command = commands[cmd_name]()
    return command.main(cmd_args)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _install_message(self, message):
        """Format a message and blindly write to self._file."""
        from_line = None
        if isinstance(message, str) and message.startswith('From '):
            newline = message.find('\n')
            if newline != -1:
                from_line = message[:newline]
                message = message[newline + 1:]
            else:
                from_line = message
                message = ''
        elif isinstance(message, _mboxMMDFMessage):
            from_line = 'From ' + message.get_from()
        elif isinstance(message, email.message.Message):
            from_line = message.get_unixfrom()  # May be None.
        if from_line is None:
            from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
        start = self._file.tell()
        self._file.write(from_line + os.linesep)
        self._dump_message(message, self._file, self._mangle_from_)
        stop = self._file.tell()
        return (start, stop)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_message(self, key):
        """Return a Message representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        visible_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
            visible_headers.write(line.replace(os.linesep, '\n'))
        body = self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
        msg = BabylMessage(original_headers.getvalue() + body)
        msg.set_visible(visible_headers.getvalue())
        if key in self._labels:
            msg.set_labels(self._labels[key])
        return msg
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_string(self, key):
        """Return a string representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
        return original_headers.getvalue() + \
               self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
项目:idem    作者:mazerty    | 项目源码 | 文件源码
def hard(args):
    ref = collections.defaultdict(list)

    with open(args.ref, "rb") as f:
        b = f.read(1)
        i = 0
        while b:
            ref[b].append(i)
            i = i + 1
            b = f.read(1)

    if len(ref) < 256:
        raise Exception

    output = []

    with open(args.input, "rb") as f:
        b = f.read(1)
        while b:
            output.append(ref[b][random.randrange(len(ref[b]))])
            b = f.read(1)

    with open(args.output, "w") as f:
        for item in output:
            f.write(str(item) + os.linesep)
项目:packet_analysis    作者:tanjiti    | 项目源码 | 文件源码
def __parse_data(self, proto_parse_class, data, file_hd):
        """

        Args:
            proto_parse_class:
            data:
            file_hd:

        Returns:

        """

        proto_parse_ob = proto_parse_class(data)
        data_yield = proto_parse_ob.parse_data(sep="\x00")
        for d in data_yield:
            if d:
                if file_hd:
                    file_hd.write("%r%s" % (d, os.linesep))
                if self.std_output_enable:
                    if not isinstance(d, dict):
                        d = d.toDict()
                    print json.dumps(d, indent=4)
项目:PSyclone    作者:stfc    | 项目源码 | 文件源码
def __str__(self):
        res = "DynArgDescriptor03 object" + os.linesep
        res += "  argument_type[0]='{0}'".format(self._type)
        if self._vector_size > 1:
            res += "*"+str(self._vector_size)
        res += os.linesep
        res += "  access_descriptor[1]='{0}'".format(self._access_descriptor) \
               + os.linesep
        if self._type == "gh_field":
            res += "  function_space[2]='{0}'".format(self._function_space1) \
                   + os.linesep
        elif self._type in VALID_OPERATOR_NAMES:
            res += "  function_space_to[2]='{0}'".\
                   format(self._function_space1) + os.linesep
            res += "  function_space_from[3]='{0}'".\
                   format(self._function_space2) + os.linesep
        elif self._type in VALID_SCALAR_NAMES:
            pass  # we have nothing to add if we're a scalar
        else:  # we should never get to here
            raise ParseError("Internal error in DynArgDescriptor03.__str__")
        return res
项目:core-workflow    作者:python    | 项目源码 | 文件源码
def amend_commit_message(self, cherry_pick_branch):
        """ prefix the commit message with (X.Y) """

        commit_prefix = ""
        if self.prefix_commit:
            commit_prefix = f"[{get_base_branch(cherry_pick_branch)}] "
        updated_commit_message = f"{commit_prefix}{self.get_commit_message(self.commit_sha1)}{os.linesep}(cherry picked from commit {self.commit_sha1})"
        updated_commit_message = updated_commit_message.replace('#', 'GH-')
        if self.dry_run:
            click.echo(f"  dry-run: git commit --amend -m '{updated_commit_message}'")
        else:
            try:
                subprocess.check_output(["git", "commit", "--amend", "-m",
                                         updated_commit_message],
                                         stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as cpe:
                click.echo("Failed to amend the commit message  \u2639")
                click.echo(cpe.output)
        return updated_commit_message
项目:Generative-ConvACs    作者:HUJI-Deep    | 项目源码 | 文件源码
def convert_and_parse_xml(src_model_fname):
    dst_model_fname = os.path.basename(src_model_fname).split('.')[0] + '.xml.mdl'
    with open(dst_model_fname, 'wb') as wfile:
        wfile.write('<MODEL>\n')
        with open(src_model_fname, 'rb') as rfile:
            for line in rfile.readlines():
                newline = line
                if '<CNT>' in line:
                    newline = line.strip() + '</CNT>'
                elif '<MEAN>' in line:
                    newline = line.strip() + '</MEAN>'

                elif pn_re.findall(line):
                    newline = pn_re.sub(r'@ \2 \3 \4 \5 @',line)

                wfile.write(newline.strip() + os.linesep)
            wfile.write('</MODEL>\n')

    xmldoc = minidom.parse(dst_model_fname)
    os.remove(dst_model_fname)
    return xmldoc
项目:eyeD3    作者:nicfit    | 项目源码 | 文件源码
def start(self, args, config):
        super(DisplayPlugin, self).start(args, config)

        if args.pattern_help:
            self.__print_pattern_help()
            return

        if not _have_grako:
            console.printError(u"Unknown module 'grako'" + os.linesep +
                               u"Please install grako! " +
                               u"E.g. $ pip install grako")
            self.__return_code = 2
            return

        if args.pattern_string is not None:
            self.__pattern = Pattern(args.pattern_string)
        if args.pattern_file is not None:
            pfile = open(args.pattern_file, "r")
            self.__pattern = Pattern(''.join(pfile.read().splitlines()))
            pfile.close()
        self.__output_ending = "" if args.no_newline else os.linesep
项目:eyeD3    作者:nicfit    | 项目源码 | 文件源码
def __print_pattern_help(self):
        # FIXME: Force some order
        print(console.formatText("ID3 Tags:", b=True))
        self.__print_complex_pattern_help(TagPattern)
        print(os.linesep)

        print(console.formatText("Functions:", b=True))
        self.__print_complex_pattern_help(FunctionPattern)
        print(os.linesep)

        print(console.formatText("Special characters:", b=True))
        print(console.formatText("\tescape seq.   character"))
        for i in range(len(TextPattern.SPECIAL_CHARACTERS)):
            print(("\t\\%s" + (" " * 12) + "%s") %
                  (TextPattern.SPECIAL_CHARACTERS[i],
                   TextPattern.SPECIAL_CHARACTERS_DESCRIPTIONS[i]))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _wrap_lines(msg):
    """Format lines nicely to 80 chars.

    :param str msg: Original message

    :returns: Formatted message respecting newlines in message
    :rtype: str

    """
    lines = msg.splitlines()
    fixed_l = []

    for line in lines:
        fixed_l.append(textwrap.fill(
            line,
            80,
            break_long_words=False,
            break_on_hyphens=False))

    return os.linesep.join(fixed_l)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains):
    """Ask user to confirm update cert certname to contain new_domains.
    """
    if config.renew_with_new_domains:
        return

    msg = ("You are updating certificate {0} to include domains: {1}{br}{br}"
           "It previously included domains: {2}{br}{br}"
           "Did you intend to make this change?".format(
               certname,
               ", ".join(new_domains),
               ", ".join(old_domains),
               br=os.linesep))
    obj = zope.component.getUtility(interfaces.IDisplay)
    if not obj.yesno(msg, "Update cert", "Cancel", default=True):
        raise errors.ConfigurationError("Specified mismatched cert name and domains.")
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_get_version(self, mock_script):
        mock_script.return_value = (
            "Server Version: Apache/2.4.2 (Debian)", "")
        self.assertEqual(self.config.get_version(), (2, 4, 2))

        mock_script.return_value = (
            "Server Version: Apache/2 (Linux)", "")
        self.assertEqual(self.config.get_version(), (2,))

        mock_script.return_value = (
            "Server Version: Apache (Debian)", "")
        self.assertRaises(errors.PluginError, self.config.get_version)

        mock_script.return_value = (
            "Server Version: Apache/2.3{0} Apache/2.4.7".format(
                os.linesep), "")
        self.assertRaises(errors.PluginError, self.config.get_version)

        mock_script.side_effect = errors.SubprocessError("Can't find program")
        self.assertRaises(errors.PluginError, self.config.get_version)
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def update_init_file(version: tuple):
    import os
    import re

    print('Updating __init__.py to have the correct version.')
    version_line_re = re.compile(r'^\s+[\'"]version[\'"]: (\([0-9,]+\)),')

    with open('__init__.py', 'r') as infile, \
         open('__init__.py~whl~installer~', 'w') as outfile:

        for line in infile:
            if version_line_re.match(line):
                outfile.write("    'version': %s,%s" % (version, os.linesep))
            else:
                outfile.write(line)

    os.unlink('__init__.py')
    os.rename('__init__.py~whl~installer~', '__init__.py')
项目:dask-ml    作者:dask    | 项目源码 | 文件源码
def _copy_partial_doc(cls):
    for base in cls.mro():
        if base.__module__.startswith('sklearn'):
            break
    lines = base.__doc__.split(os.linesep)
    header, rest = lines[0], lines[1:]

    insert = """

    This class wraps scikit-learn's {classname}. When a dask-array is passed
    to our ``fit`` method, the array is passed block-wise to the scikit-learn
    class' ``partial_fit`` method. This will allow you to fit the estimator
    on larger-than memory datasets sequentially (block-wise), but without an
    parallelism, or any ability to distribute across a cluster.""".format(
        classname=base.__name__)

    doc = '\n'.join([header + insert] + rest)

    cls.__doc__ = doc
    return cls
项目:PhonePerformanceMeasure    作者:KyleCe    | 项目源码 | 文件源码
def replace_line_in_file(f, start_exp, to_replace):
    lines = read_lines_of_file(f)
    for i in range(len(lines)):
        if lines[i].startswith(start_exp):
            lines[i] = to_replace + os.linesep
    write_lines_into_file(Res.project_path + f, lines)
项目:txt2evernote    作者:Xunius    | 项目源码 | 文件源码
def ENMLtoText(contentENML):
        soup = BeautifulSoup(contentENML.decode('utf-8'))

        for section in soup.select('li > p'):
            section.replace_with( section.contents[0] )

        for section in soup.select('li > br'):
            if section.next_sibling:
                next_sibling = section.next_sibling.next_sibling
                if next_sibling:
                    if next_sibling.find('li'):
                        section.extract()
                else:
                    section.extract()

        Editor.checklistInENMLtoSoup(soup)

        for section in soup.findAll('en-todo', checked='true'):
            section.replace_with('[x]')

        for section in soup.findAll('en-todo'):
            section.replace_with('[ ]')

        content = html2text.html2text(str(soup).decode('utf-8'), '', 0)
        content = re.sub(r' *\n', os.linesep, content)

        return content.encode('utf-8')
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def parseopts(args):
    parser = create_main_parser()

    # Note: parser calls disable_interspersed_args(), so the result of this
    # call is to split the initial args into the general options before the
    # subcommand and everything else.
    # For example:
    #  args: ['--timeout=5', 'install', '--user', 'INITools']
    #  general_options: ['--timeout==5']
    #  args_else: ['install', '--user', 'INITools']
    general_options, args_else = parser.parse_args(args)

    # --version
    if general_options.version:
        sys.stdout.write(parser.version)
        sys.stdout.write(os.linesep)
        sys.exit()

    # pip || pip help -> print_help()
    if not args_else or (args_else[0] == 'help' and len(args_else) == 1):
        parser.print_help()
        sys.exit()

    # the subcommand name
    cmd_name = args_else[0]

    if cmd_name not in commands_dict:
        guess = get_similar_commands(cmd_name)

        msg = ['unknown command "%s"' % cmd_name]
        if guess:
            msg.append('maybe you meant "%s"' % guess)

        raise CommandError(' - '.join(msg))

    # all the args without the subcommand
    cmd_args = args[:]
    cmd_args.remove(cmd_name)

    return cmd_name, cmd_args
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def main(args=None):
    if args is None:
        args = sys.argv[1:]

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(args)
    except PipError as exc:
        sys.stderr.write("ERROR: %s" % exc)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip.utils.encoding.auto_decode
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error as e:
        # setlocale can apparently crash if locale are uninitialized
        logger.debug("Ignoring error %s when setting locale", e)
    command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
    return command.main(cmd_args)


# ###########################################################
# # Writing freeze files
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def parseopts(args):
    parser = create_main_parser()

    # Note: parser calls disable_interspersed_args(), so the result of this
    # call is to split the initial args into the general options before the
    # subcommand and everything else.
    # For example:
    #  args: ['--timeout=5', 'install', '--user', 'INITools']
    #  general_options: ['--timeout==5']
    #  args_else: ['install', '--user', 'INITools']
    general_options, args_else = parser.parse_args(args)

    # --version
    if general_options.version:
        sys.stdout.write(parser.version)
        sys.stdout.write(os.linesep)
        sys.exit()

    # pip || pip help -> print_help()
    if not args_else or (args_else[0] == 'help' and len(args_else) == 1):
        parser.print_help()
        sys.exit()

    # the subcommand name
    cmd_name = args_else[0]

    if cmd_name not in commands_dict:
        guess = get_similar_commands(cmd_name)

        msg = ['unknown command "%s"' % cmd_name]
        if guess:
            msg.append('maybe you meant "%s"' % guess)

        raise CommandError(' - '.join(msg))

    # all the args without the subcommand
    cmd_args = args[:]
    cmd_args.remove(cmd_name)

    return cmd_name, cmd_args
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def main(args=None):
    if args is None:
        args = sys.argv[1:]

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    try:
        cmd_name, cmd_args = parseopts(args)
    except PipError as exc:
        sys.stderr.write("ERROR: %s" % exc)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip.utils.encoding.auto_decode
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error as e:
        # setlocale can apparently crash if locale are uninitialized
        logger.debug("Ignoring error %s when setting locale", e)
    command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
    return command.main(cmd_args)


# ###########################################################
# # Writing freeze files
项目:BioDownloader    作者:biomadeira    | 项目源码 | 文件源码
def gather_dependencies():
    import os
    with open('requirements.txt', 'r') as f_in:
        return [l for l in f_in.read().rsplit(os.linesep) 
                if l and not l.startswith("#")]