我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.linesep()。
def _install_message(self, message): """Format a message and blindly write to self._file.""" from_line = None if isinstance(message, str) and message.startswith('From '): newline = message.find('\n') if newline != -1: from_line = message[:newline] message = message[newline + 1:] else: from_line = message message = '' elif isinstance(message, _mboxMMDFMessage): from_line = 'From ' + message.get_from() elif isinstance(message, email.message.Message): from_line = message.get_unixfrom() # May be None. if from_line is None: from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime()) start = self._file.tell() self._file.write(from_line + os.linesep) self._dump_message(message, self._file, self._mangle_from_) stop = self._file.tell() return (start, stop)
def _generate_toc(self): """Generate key-to-(start, stop) table of contents.""" starts, stops = [], [] self._file.seek(0) while True: line_pos = self._file.tell() line = self._file.readline() if line.startswith('From '): if len(stops) < len(starts): stops.append(line_pos - len(os.linesep)) starts.append(line_pos) elif line == '': stops.append(line_pos) break self._toc = dict(enumerate(zip(starts, stops))) self._next_key = len(self._toc) self._file_length = self._file.tell()
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip '1,' line specifying labels. original_headers = StringIO.StringIO() while True: line = self._file.readline() if line == '*** EOOH ***' + os.linesep or line == '': break original_headers.write(line.replace(os.linesep, '\n')) visible_headers = StringIO.StringIO() while True: line = self._file.readline() if line == os.linesep or line == '': break visible_headers.write(line.replace(os.linesep, '\n')) body = self._file.read(stop - self._file.tell()).replace(os.linesep, '\n') msg = BabylMessage(original_headers.getvalue() + body) msg.set_visible(visible_headers.getvalue()) if key in self._labels: msg.set_labels(self._labels[key]) return msg
def get_string(self, key): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip '1,' line specifying labels. original_headers = StringIO.StringIO() while True: line = self._file.readline() if line == '*** EOOH ***' + os.linesep or line == '': break original_headers.write(line.replace(os.linesep, '\n')) while True: line = self._file.readline() if line == os.linesep or line == '': break return original_headers.getvalue() + \ self._file.read(stop - self._file.tell()).replace(os.linesep, '\n')
def fix_script(path): """Replace #!python with #!/path/to/python Return True if file was changed.""" # XXX RECORD hashes will need to be updated if os.path.isfile(path): script = open(path, 'rb') try: firstline = script.readline() if not firstline.startswith(binary('#!python')): return False exename = sys.executable.encode(sys.getfilesystemencoding()) firstline = binary('#!') + exename + binary(os.linesep) rest = script.read() finally: script.close() script = open(path, 'wb') try: script.write(firstline) script.write(rest) finally: script.close() return True
def main(initial_args=None): if initial_args is None: initial_args = sys.argv[1:] autocomplete() try: cmd_name, cmd_args = parseopts(initial_args) except PipError: e = sys.exc_info()[1] sys.stderr.write("ERROR: %s" % e) sys.stderr.write(os.linesep) sys.exit(1) command = commands[cmd_name]() return command.main(cmd_args)
def main(args=None): if args is None: args = sys.argv[1:] # Configure our deprecation warnings to be sent through loggers deprecation.install_warning_logger() autocomplete() try: cmd_name, cmd_args = parseopts(args) except PipError as exc: sys.stderr.write("ERROR: %s" % exc) sys.stderr.write(os.linesep) sys.exit(1) # Needed for locale.getpreferredencoding(False) to work # in pip._internal.utils.encoding.auto_decode try: locale.setlocale(locale.LC_ALL, '') except locale.Error as e: # setlocale can apparently crash if locale are uninitialized logger.debug("Ignoring error %s when setting locale", e) command = commands_dict[cmd_name](isolated=check_isolated(cmd_args)) return command.main(cmd_args)
def remove(path, pth_file=None): pth_file = pth_file or PATH try: with open(pth_file, "r") as f: lines = f.read().splitlines()[2:] except IOError: raise ValueError("not found: {}".format(path)) try: lines.remove(path) except ValueError: raise ValueError("not found: {}".format(path)) lines.insert(0, HEADER) with open(pth_file, "w") as f: f.write(os.linesep.join(lines))
def _printf(fmt, *args, **print3opts): '''Formatted print to sys.stdout or given stream. *print3opts* -- print keyword arguments, like Python 3.+ ''' if print3opts: # like Python 3.0 f = print3opts.get('file', None) or sys.stdout if args: f.write(fmt % args) else: f.write(fmt) f.write(print3opts.get('end', linesep)) if print3opts.get('flush', False): f.flush() elif args: print(fmt % args) else: print(fmt)
def set_exit_code(bld): """ If any of the tests fail waf will exit with that exit code. This is useful if you have an automated build system which need to report on errors from the tests. You may use it like this: def build(bld): bld(features='cxx cxxprogram test', source='main.c', target='app') from waflib.Tools import waf_unit_test bld.add_post_fun(waf_unit_test.set_exit_code) """ lst = getattr(bld, 'utest_results', []) for (f, code, out, err) in lst: if code: msg = [] if out: msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8'))) if err: msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8'))) bld.fatal(os.linesep.join(msg))
def write_json(self, data, pretty=True): """ Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard:: def build(bld): bld.path.find_node('xyz.json').write_json(199) :type data: object :param data: The data to write to disk :type pretty: boolean :param pretty: Determines if the JSON will be nicely space separated """ import json # Python 2.6 and up indent = 2 separators = (',', ': ') sort_keys = pretty newline = os.linesep if not pretty: indent = None separators = (',', ':') newline = '' output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline self.write(output, encoding='utf-8')
def __init__(self, to=sys.stdout, only=(), *args, **kwargs): super(DebugStream, self).__init__(*args, **kwargs) def safe_str(chunk): if isinstance(chunk, bytes): chunk = chunk.decode("utf-8") elif not isinstance(chunk, str): chunk = str(chunk) return chunk class Bugger(object): __before__ = __after__ = lambda *args: None def __getattr__(self, event): def inner(*args, **kwargs): to.write(event.upper() + " ") to.write("; ".join(map(safe_str, args))) to.write(" ") to.write(", ".join("{0}: {1}".format(k, safe_str(v)) for k, v in kwargs.items())) to.write(os.linesep) return inner self.attach(Bugger(), only=only)
def generate(): global sound statement = coremeraco.constructRegularStatement() + os.linesep textArea.insert(Tkinter.END, statement) textArea.yview(Tkinter.END) if readOption and readOutputState.get(): if sound: sound.stop() sound = playsnd.playsnd() threading.Thread(target=sound.play, args=(procfest.text2wave(statement),)).start() if saveOutputState.get(): try: outputFileHandler = open(saveOutputDestination.get(), 'a') # 'a' means append mode outputFileHandler.write(statement) outputFileHandler.close() except IOError as Argument: tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "ioErrorMessage") + str(Argument)) except Exception as Argument: tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "genericErrorMessage") + str(Argument))
def test_append_metadata_file(tmpdir): initial_data = 'some data' my_metadata = { 'some': 'data', 'for': 'this metadata' } output_file = tmpdir.join('metadata') output_file.write_binary((initial_data + os.linesep).encode('utf-8')) with metadata.MetadataWriter(suppress_output=False)(str(output_file)) as writer: writer.write_metadata(**my_metadata) lines = output_file.readlines() assert len(lines) == 2 assert lines[0].strip() == initial_data assert json.loads(lines[1].strip()) == my_metadata
def test_overwrite_metdata_file(tmpdir): initial_data = 'some data' my_metadata = { 'some': 'data', 'for': 'this metadata' } output_file = tmpdir.join('metadata') output_file.write(initial_data + os.linesep) overwrite_writer = metadata.MetadataWriter(suppress_output=False)(str(output_file)) overwrite_writer.force_overwrite() with overwrite_writer as writer: writer.write_metadata(**my_metadata) lines = output_file.readlines() assert len(lines) == 1 assert json.loads(lines[0].strip()) == my_metadata
def write_metadata(self, **metadata): # type: (**Any) -> Optional[int] """Writes metadata to the output stream if output is not suppressed. :param **metadata: JSON-serializeable metadata kwargs to write """ if self.suppress_output: return 0 # wrote 0 bytes metadata_line = json.dumps(metadata, sort_keys=True) + os.linesep metadata_output = '' # type: Union[str, bytes] if 'b' in self._output_mode: metadata_output = metadata_line.encode('utf-8') else: metadata_output = metadata_line return self._output_stream.write(metadata_output)
def _collect(self, room_name, compress): event = yield idiokit.next() while True: current = datetime.utcnow().day with _open_archive(self.archive_dir, time.time(), room_name) as archive: self.log.info("Opened archive {0!r}".format(archive.name)) while current == datetime.utcnow().day: json_dict = dict((key, event.values(key)) for key in event.keys()) archive.write(json.dumps(json_dict) + os.linesep) event = yield idiokit.next() yield compress.queue(0.0, _rename(archive.name))
def output(self): if self._params['delimiter'] is None: delim = ',' else: delim = self._params['delimiter'] if self._params['linesep'] is None: eol = os.linesep else: eol = self._params['linesep'] lines = eol.join([delim.join(map(str, row)) for row in self._params['data_matrix']]) if self._params['header_list']: header = delim.join(self._params['header_list']) else: header = '' return header + eol + lines
def main(args=None): if args is None: args = sys.argv[1:] # Configure our deprecation warnings to be sent through loggers deprecation.install_warning_logger() autocomplete() try: cmd_name, cmd_args = parseopts(args) except PipError as exc: sys.stderr.write("ERROR: %s" % exc) sys.stderr.write(os.linesep) sys.exit(1) # Needed for locale.getpreferredencoding(False) to work # in pip.utils.encoding.auto_decode locale.setlocale(locale.LC_ALL, '') command = commands_dict[cmd_name](isolated=check_isolated(cmd_args)) return command.main(cmd_args) # ########################################################### # # Writing freeze files
def keyPressEvent(self, qkeyevent): if qkeyevent.matches(QKeySequence.Copy): selected_rows = [x.row() for x in self.selectionModel().selectedRows()] logger.info('Copy to clipboard requested [%r rows]' % len(selected_rows)) out_string = '' for row in selected_rows: out_string += self.get_row_as_string(row) + os.linesep if out_string: QApplication.clipboard().setText(out_string) else: super(BasicTable, self).keyPressEvent(qkeyevent) if qkeyevent.matches(QKeySequence.InsertParagraphSeparator): if self.hasFocus(): self.on_enter_pressed([(x.row(), x.column()) for x in self.selectedIndexes()])
def print_help(): """Print help encoded as initial script's comment between #< and #>.""" show_line = False fp = open(sys.argv[0], 'r') for line in fp: if line[0:2] == '#<': # start token show_line = True continue elif line[0:2] == '#>': # end token return if show_line == True: print(line.rstrip(os.linesep)[1:]) fp.close()
def gui_ask_for_api(): """Gtk dialog for API key insert.""" message = gtk.MessageDialog(type=gtk.MESSAGE_QUESTION, buttons=gtk.BUTTONS_OK_CANCEL) message.set_markup(colorize.MSG_ASK_API.replace(colorize.URL,"<u>" + colorize.URL +"</u>")) entry = gtk.Entry(max=64) entry.set_text("Enter your API key") entry.show() message.vbox.pack_end(entry) entry.connect("activate", lambda _: d.response(gtk.RESPONSE_OK)) message.set_default_response(gtk.RESPONSE_OK) message.run() api_key = entry.get_text().decode('utf8') fp = open(colorize.HOME + colorize.API_KEY_FILE, 'w') fp.write("YOUR_API_KEY={0}{1}".format(api_key, os.linesep)) fp.close() # process buttong click immediately message.destroy() while gtk.events_pending(): gtk.main_iteration()
def testOutputSignal(self): # Use SIGKILL here because it's guaranteed to be delivered. Using # SIGHUP might not work in, e.g., a buildbot slave run under the # 'nohup' command. exe = sys.executable scriptFile = self.makeSourceFile([ "import sys, os, signal", "sys.stdout.write('stdout bytes\\n')", "sys.stderr.write('stderr bytes\\n')", "sys.stdout.flush()", "sys.stderr.flush()", "os.kill(os.getpid(), signal.SIGKILL)" ]) def gotOutputAndValue(err): (out, err, sig) = err.value # XXX Sigh wtf self.assertEquals(out, "stdout bytes" + os.linesep) self.assertEquals(err, "stderr bytes" + os.linesep) self.assertEquals(sig, signal.SIGKILL) d = utils.getProcessOutputAndValue(exe, ['-u', scriptFile]) return d.addErrback(gotOutputAndValue)
def hard(args): ref = collections.defaultdict(list) with open(args.ref, "rb") as f: b = f.read(1) i = 0 while b: ref[b].append(i) i = i + 1 b = f.read(1) if len(ref) < 256: raise Exception output = [] with open(args.input, "rb") as f: b = f.read(1) while b: output.append(ref[b][random.randrange(len(ref[b]))]) b = f.read(1) with open(args.output, "w") as f: for item in output: f.write(str(item) + os.linesep)
def __parse_data(self, proto_parse_class, data, file_hd): """ Args: proto_parse_class: data: file_hd: Returns: """ proto_parse_ob = proto_parse_class(data) data_yield = proto_parse_ob.parse_data(sep="\x00") for d in data_yield: if d: if file_hd: file_hd.write("%r%s" % (d, os.linesep)) if self.std_output_enable: if not isinstance(d, dict): d = d.toDict() print json.dumps(d, indent=4)
def __str__(self): res = "DynArgDescriptor03 object" + os.linesep res += " argument_type[0]='{0}'".format(self._type) if self._vector_size > 1: res += "*"+str(self._vector_size) res += os.linesep res += " access_descriptor[1]='{0}'".format(self._access_descriptor) \ + os.linesep if self._type == "gh_field": res += " function_space[2]='{0}'".format(self._function_space1) \ + os.linesep elif self._type in VALID_OPERATOR_NAMES: res += " function_space_to[2]='{0}'".\ format(self._function_space1) + os.linesep res += " function_space_from[3]='{0}'".\ format(self._function_space2) + os.linesep elif self._type in VALID_SCALAR_NAMES: pass # we have nothing to add if we're a scalar else: # we should never get to here raise ParseError("Internal error in DynArgDescriptor03.__str__") return res
def amend_commit_message(self, cherry_pick_branch): """ prefix the commit message with (X.Y) """ commit_prefix = "" if self.prefix_commit: commit_prefix = f"[{get_base_branch(cherry_pick_branch)}] " updated_commit_message = f"{commit_prefix}{self.get_commit_message(self.commit_sha1)}{os.linesep}(cherry picked from commit {self.commit_sha1})" updated_commit_message = updated_commit_message.replace('#', 'GH-') if self.dry_run: click.echo(f" dry-run: git commit --amend -m '{updated_commit_message}'") else: try: subprocess.check_output(["git", "commit", "--amend", "-m", updated_commit_message], stderr=subprocess.STDOUT) except subprocess.CalledProcessError as cpe: click.echo("Failed to amend the commit message \u2639") click.echo(cpe.output) return updated_commit_message
def convert_and_parse_xml(src_model_fname): dst_model_fname = os.path.basename(src_model_fname).split('.')[0] + '.xml.mdl' with open(dst_model_fname, 'wb') as wfile: wfile.write('<MODEL>\n') with open(src_model_fname, 'rb') as rfile: for line in rfile.readlines(): newline = line if '<CNT>' in line: newline = line.strip() + '</CNT>' elif '<MEAN>' in line: newline = line.strip() + '</MEAN>' elif pn_re.findall(line): newline = pn_re.sub(r'@ \2 \3 \4 \5 @',line) wfile.write(newline.strip() + os.linesep) wfile.write('</MODEL>\n') xmldoc = minidom.parse(dst_model_fname) os.remove(dst_model_fname) return xmldoc
def start(self, args, config): super(DisplayPlugin, self).start(args, config) if args.pattern_help: self.__print_pattern_help() return if not _have_grako: console.printError(u"Unknown module 'grako'" + os.linesep + u"Please install grako! " + u"E.g. $ pip install grako") self.__return_code = 2 return if args.pattern_string is not None: self.__pattern = Pattern(args.pattern_string) if args.pattern_file is not None: pfile = open(args.pattern_file, "r") self.__pattern = Pattern(''.join(pfile.read().splitlines())) pfile.close() self.__output_ending = "" if args.no_newline else os.linesep
def __print_pattern_help(self): # FIXME: Force some order print(console.formatText("ID3 Tags:", b=True)) self.__print_complex_pattern_help(TagPattern) print(os.linesep) print(console.formatText("Functions:", b=True)) self.__print_complex_pattern_help(FunctionPattern) print(os.linesep) print(console.formatText("Special characters:", b=True)) print(console.formatText("\tescape seq. character")) for i in range(len(TextPattern.SPECIAL_CHARACTERS)): print(("\t\\%s" + (" " * 12) + "%s") % (TextPattern.SPECIAL_CHARACTERS[i], TextPattern.SPECIAL_CHARACTERS_DESCRIPTIONS[i]))
def _wrap_lines(msg): """Format lines nicely to 80 chars. :param str msg: Original message :returns: Formatted message respecting newlines in message :rtype: str """ lines = msg.splitlines() fixed_l = [] for line in lines: fixed_l.append(textwrap.fill( line, 80, break_long_words=False, break_on_hyphens=False)) return os.linesep.join(fixed_l)
def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains): """Ask user to confirm update cert certname to contain new_domains. """ if config.renew_with_new_domains: return msg = ("You are updating certificate {0} to include domains: {1}{br}{br}" "It previously included domains: {2}{br}{br}" "Did you intend to make this change?".format( certname, ", ".join(new_domains), ", ".join(old_domains), br=os.linesep)) obj = zope.component.getUtility(interfaces.IDisplay) if not obj.yesno(msg, "Update cert", "Cancel", default=True): raise errors.ConfigurationError("Specified mismatched cert name and domains.")
def test_get_version(self, mock_script): mock_script.return_value = ( "Server Version: Apache/2.4.2 (Debian)", "") self.assertEqual(self.config.get_version(), (2, 4, 2)) mock_script.return_value = ( "Server Version: Apache/2 (Linux)", "") self.assertEqual(self.config.get_version(), (2,)) mock_script.return_value = ( "Server Version: Apache (Debian)", "") self.assertRaises(errors.PluginError, self.config.get_version) mock_script.return_value = ( "Server Version: Apache/2.3{0} Apache/2.4.7".format( os.linesep), "") self.assertRaises(errors.PluginError, self.config.get_version) mock_script.side_effect = errors.SubprocessError("Can't find program") self.assertRaises(errors.PluginError, self.config.get_version)
def update_init_file(version: tuple): import os import re print('Updating __init__.py to have the correct version.') version_line_re = re.compile(r'^\s+[\'"]version[\'"]: (\([0-9,]+\)),') with open('__init__.py', 'r') as infile, \ open('__init__.py~whl~installer~', 'w') as outfile: for line in infile: if version_line_re.match(line): outfile.write(" 'version': %s,%s" % (version, os.linesep)) else: outfile.write(line) os.unlink('__init__.py') os.rename('__init__.py~whl~installer~', '__init__.py')
def _copy_partial_doc(cls): for base in cls.mro(): if base.__module__.startswith('sklearn'): break lines = base.__doc__.split(os.linesep) header, rest = lines[0], lines[1:] insert = """ This class wraps scikit-learn's {classname}. When a dask-array is passed to our ``fit`` method, the array is passed block-wise to the scikit-learn class' ``partial_fit`` method. This will allow you to fit the estimator on larger-than memory datasets sequentially (block-wise), but without an parallelism, or any ability to distribute across a cluster.""".format( classname=base.__name__) doc = '\n'.join([header + insert] + rest) cls.__doc__ = doc return cls
def replace_line_in_file(f, start_exp, to_replace): lines = read_lines_of_file(f) for i in range(len(lines)): if lines[i].startswith(start_exp): lines[i] = to_replace + os.linesep write_lines_into_file(Res.project_path + f, lines)
def ENMLtoText(contentENML): soup = BeautifulSoup(contentENML.decode('utf-8')) for section in soup.select('li > p'): section.replace_with( section.contents[0] ) for section in soup.select('li > br'): if section.next_sibling: next_sibling = section.next_sibling.next_sibling if next_sibling: if next_sibling.find('li'): section.extract() else: section.extract() Editor.checklistInENMLtoSoup(soup) for section in soup.findAll('en-todo', checked='true'): section.replace_with('[x]') for section in soup.findAll('en-todo'): section.replace_with('[ ]') content = html2text.html2text(str(soup).decode('utf-8'), '', 0) content = re.sub(r' *\n', os.linesep, content) return content.encode('utf-8')
def parseopts(args): parser = create_main_parser() # Note: parser calls disable_interspersed_args(), so the result of this # call is to split the initial args into the general options before the # subcommand and everything else. # For example: # args: ['--timeout=5', 'install', '--user', 'INITools'] # general_options: ['--timeout==5'] # args_else: ['install', '--user', 'INITools'] general_options, args_else = parser.parse_args(args) # --version if general_options.version: sys.stdout.write(parser.version) sys.stdout.write(os.linesep) sys.exit() # pip || pip help -> print_help() if not args_else or (args_else[0] == 'help' and len(args_else) == 1): parser.print_help() sys.exit() # the subcommand name cmd_name = args_else[0] if cmd_name not in commands_dict: guess = get_similar_commands(cmd_name) msg = ['unknown command "%s"' % cmd_name] if guess: msg.append('maybe you meant "%s"' % guess) raise CommandError(' - '.join(msg)) # all the args without the subcommand cmd_args = args[:] cmd_args.remove(cmd_name) return cmd_name, cmd_args
def main(args=None): if args is None: args = sys.argv[1:] # Configure our deprecation warnings to be sent through loggers deprecation.install_warning_logger() autocomplete() try: cmd_name, cmd_args = parseopts(args) except PipError as exc: sys.stderr.write("ERROR: %s" % exc) sys.stderr.write(os.linesep) sys.exit(1) # Needed for locale.getpreferredencoding(False) to work # in pip.utils.encoding.auto_decode try: locale.setlocale(locale.LC_ALL, '') except locale.Error as e: # setlocale can apparently crash if locale are uninitialized logger.debug("Ignoring error %s when setting locale", e) command = commands_dict[cmd_name](isolated=check_isolated(cmd_args)) return command.main(cmd_args) # ########################################################### # # Writing freeze files
def gather_dependencies(): import os with open('requirements.txt', 'r') as f_in: return [l for l in f_in.read().rsplit(os.linesep) if l and not l.startswith("#")]