我们从 Python 开源项目中提取了以下 22 个代码示例，用于说明如何使用 typing.TextIO。
def _get_read_alignments(f: TextIO, reads: ReadMapping) -> AlignmentsT:
    """Collect the pairwise local alignments found on the 'E' lines of `f`.

    Every line of `f` that starts with ``'E'`` is converted with the
    callable returned by ``gfa.gfa2_line_to_la(reads)``. Each resulting
    alignment is stored twice: under ``[a][b]`` as-is and under ``[b][a]``
    as the result of its ``switch()`` method.

    :param f: iterable of text lines (an open GFA2 file)
    :param reads: mapping handed to ``gfa.gfa2_line_to_la``
    :return: nested ``defaultdict(dict)`` keyed by both oriented reads
    """
    logger.info("Pass 2 of alignments GFA2 file to import all pairwise local "
                "alignments...")

    read_alignments = defaultdict(dict)
    to_alignment = gfa.gfa2_line_to_la(reads)

    for line in f:
        if not line.startswith('E'):
            continue

        alignment = to_alignment(line)
        a_read, b_read = alignment.get_oriented_reads()
        read_alignments[a_read][b_read] = alignment
        read_alignments[b_read][a_read] = alignment.switch()

    logger.info("Done.")
    return read_alignments
def create_file_in_config_folder(self, filename: str, mode: int=None) -> TextIO:
    """
    Open a UTF-8 text file for writing inside the generated config folder.

    The file is opened with mode ``"wt"``, so existing content is truncated.

    :param filename: the name of the file in the generated config folder
    :param mode: pass an ``int`` here if you want to modify the files mode (will be umasked);
                 a falsy value leaves the file mode untouched
    :return: an open file descriptor (``TextIO``) object that the *caller must call `.close()` on*
    :raises InvalidArgumentException: if ``os.path.isfile(filename)`` is true
    """
    if os.path.isfile(filename):
        raise InvalidArgumentException("Call create_file_in_config_folder with a filename, not a path")

    self.ensure_config_folder()

    target_path = os.path.join(self.configfolder, filename)
    handle = cast(TextIO, io.open(target_path, mode="wt", encoding="utf-8"))
    if mode:
        os.chmod(target_path, get_umasked_mode(mode))
    return handle
def get_personalized_dictionaries(target: WirelessAccessPoint) -> List[TextIO]:
    """
    Create and return dictionary personalized by available AP details.

    Currently only one rule exists: when the ESSID is ``UPC`` followed by
    exactly seven digits, a shell pipeline (``upc_keys | grep | sed``) is
    opened for reading and returned as a dictionary.

    :type target: WirelessAccessPoint
    :param target: targeted AP

    :rtype: List[TextIO]
    :return: list of opened personalized dictionaries (empty when no rule matches)
    """
    # NOTE(review): the `pipes` module was removed from the standard library in
    # Python 3.13 - this function needs a replacement before upgrading.
    dictionaries = []
    if re.match(r'^UPC\d{7}$', target.essid):
        t = pipes.Template()
        t.prepend('upc_keys {} {}'.format(target.essid, '24'), '.-')
        t.append('grep " -> WPA2 phrase for "', '--')
        # Backslashes are doubled so the literal has no invalid escape
        # sequences (\S, \(, \)); the command passed to the shell is unchanged.
        t.append('sed "s/^ -> WPA2 phrase for \\S* = \'\\(.*\\)\'$/\\1/"', '--')
        d = t.open('dictionary-pipeline', 'r')
        dictionaries.append(d)
    return dictionaries
def fetch_complete(self) -> TextIO:
    """Provide complete feed contents.

    The body is a generator that yields exactly one text stream.
    NOTE(review): presumably wrapped by a context-manager decorator that is
    not visible here - confirm against the full file.

    Returns:
        Text stream of complete feed contents, that should be used in
        with-statement to be closed afterwards.

    Raises:
        requests.HTTPError: When an HTTP error occurs when fetching feed.

    """
    http = default_new_session(self.session)
    response = http.get(self.complete_url)
    # Fail before any decoding is attempted.
    response.raise_for_status()

    with self._decode_contents(response.content) as decoded:
        yield decoded
def load(cls: Type['ModPack'], stream: TextIO) -> 'ModPack':
    """Load mod-pack data from a file stream.

    The parsed YAML document is validated against the ``'pack'`` schema
    taken from the cerberus schema registry before the pack is built.

    Keyword arguments:
        stream: The text stream to load the data from.

    Returns:
        Loaded mod-pack.

    Raises:
        exceptions.InvalidStream: When the document fails validation; the
            translated message and the validator errors are its arguments.
    """
    schema = cerberus.schema_registry.get('pack')
    validator = cerberus.Validator(schema)

    # NOTE(review): yaml.load without an explicit Loader can construct
    # arbitrary objects on untrusted input - confirm the stream is trusted.
    document = yaml.load(stream)

    if not validator.validate(document):
        raise exceptions.InvalidStream(
            _('Modpack file contains invalid data'),
            validator.errors,
        )

    data = validator.document
    game = data['game']
    files = data['files']
    return cls(
        game=game,
        path=Path(files['path']),
        mods=OrderedDict((entry.mod.id, entry) for entry in files['mods']),
        dependencies=OrderedDict(
            (entry.mod.id, entry) for entry in files['dependencies']
        ),
    )
def __init__(self, filename: str, terminal: TextIO) -> None:
    """Remember the terminal stream and open `filename` for appending.

    :param filename: path of the log file; missing parent directories are
        created first
    :param terminal: text stream stored on ``self.terminal``
    """
    self.terminal = terminal
    parent_directory = os.path.dirname(filename)
    # os.path.dirname() is '' for a bare file name and os.makedirs('') raises
    # FileNotFoundError, so only create directories when there is one.
    if parent_directory:
        os.makedirs(parent_directory, exist_ok=True)
    self.log = open(filename, 'a')
def gfa2_parse_segments(f: TextIO) -> ReadMapping:
    """Build a mapping of read id to read from the 'S' lines of `f`.

    Lines that do not start with ``'S'`` are skipped. If two lines produce
    the same id, the later one wins.
    """
    reads = {}
    for line in f:
        if line.startswith('S'):
            read = gfa2_segment_to_read(line)
            reads[read.id] = read
    return reads
def write_graph(f: TextIO, g: AssemblyGraph, version=2, **kwargs):
    """Write graph `g` to `f` using the writer for the requested GFA version.

    ``version == 1`` selects ``gfa1_write_graph``; every other value selects
    ``gfa2_write_graph``. Extra keyword arguments are forwarded unchanged and
    the writer's return value is returned.
    """
    writer = gfa1_write_graph if version == 1 else gfa2_write_graph
    return writer(f, g, **kwargs)
def gfa1_write_graph(f: TextIO, g: AssemblyGraph,
                     with_orig_segments: SegmentMapping=None):
    """Write graph `g` to `f` as a header, 'S' lines and 'L' lines.

    A node's segment name is its string form without the last character;
    that last character is used as the strand on 'L' lines. Each segment
    name is written once. When `with_orig_segments` contains the segment
    name, the length is taken from that entry instead of from the node.
    """
    f.write(gfa_header("1.0"))

    written = set()
    for node in g.nodes_iter():
        segment_name = str(node)[:-1]
        if segment_name not in written:
            written.add(segment_name)

            length_source = node
            if with_orig_segments and segment_name in with_orig_segments:
                length_source = with_orig_segments[segment_name]

            f.write(gfa_line("S", segment_name, "*",
                             "LN:i:{}".format(length_source.length)))

    for u, v, d in g.edges_iter(data=True):
        u_text = str(u)
        v_text = str(v)

        # Fake CIGAR string just indicating overlap length
        overlap = str(d['overlap_len']) + "M"

        f.write(gfa_line("L", u_text[:-1], u_text[-1:],
                         v_text[:-1], v_text[-1:], overlap))
def set_commit_editmsg(msg: str) -> Iterator[TextIO]:
    """Write `msg` to ``COMMIT_EDITMSG`` and yield the file object once.

    The file is created in the current working directory and removed again
    (if it still exists) when the generator is resumed or closed.

    NOTE(review): reconstructed with the ``try`` placed after the ``with``
    block, so the yielded handle is already closed - confirm against the
    original layout. Presumably used through a context-manager decorator
    that is not visible here.
    """
    filename = 'COMMIT_EDITMSG'

    with open(filename, mode='w') as handle:
        handle.write(msg)

    try:
        yield cast(TextIO, handle)
    finally:
        still_there = os.path.isfile(filename)
        if still_there:
            os.remove(filename)
async def subprocess_based_service(cmd: List[str], service_url: str, log_file: TextIO) -> WebDriver:
    """Start `cmd` as a subprocess and wait until its HTTP service answers.

    The service is polled with ``GET <service_url>/status``. A failed request
    is retried after 0.5 seconds; the 31st failure raises an exception.

    :param cmd: program and arguments passed to ``asyncio.create_subprocess_exec``
    :param service_url: base URL of the service started by `cmd`
    :param log_file: target for the subprocess' stdout and stderr;
        ``os.devnull`` is replaced by ``DEVNULL``
    :return: a ``WebDriver`` built from the connection and the list of closers
    :raises Exception: ``'not starting?'`` when the status endpoint never answers

    On any failure, including cancellation, the registered closers are
    awaited in reverse order before the error is re-raised.
    """
    check_event_loop()
    closers = []
    try:
        if log_file is os.devnull:
            log_file = DEVNULL
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=log_file,
            stderr=log_file,
        )
        closers.append(partial(stop_process, process))
        session = ClientSession()
        closers.append(session.close)
        count = 0
        while True:
            try:
                await tasked(session.request(
                    url=service_url + '/status',
                    method='GET'
                ))
                break
            except Exception:
                # Only ordinary errors are retried; cancellation and
                # KeyboardInterrupt must propagate instead of being swallowed.
                # TODO: make this better
                count += 1
                if count > 30:
                    raise Exception('not starting?')
                await asyncio.sleep(0.5)
        return WebDriver(
            Connection(session, service_url),
            closers,
        )
    except BaseException:
        # Undo whatever was started so far, newest first, then re-raise.
        for closer in reversed(closers):
            await closer()
        raise
def update(self, print_stream: Optional[TextIO]=None, print_prefix: Optional[str]='MITMf 1> '):
    """
    Update state of running process from process' feedback. Read new output from stdout and stderr,
    check if process is alive.

    :type print_stream: Optional[TextIO]
    :param print_stream: Print information about HTTP traffic from MITMf's stdout to provided stream.
    :type print_prefix: Optional[str]
    :param print_prefix: Prepend provided string in the beginning of every line printed to `print_stream`.
    """
    super().update()

    # Poll before reading any output. The return code is only evaluated at the
    # very end, so feedback produced by a process that has already exited is
    # still consumed and can still change state first.
    self.poll()

    # Walk every new stdout line.
    if self.stdout_r and not self.stdout_r.closed:
        for line in self.stdout_r:
            if self.state == self.State.STARTED and line == '|_ SMB server online\n':
                self.state = self.State.SPOOFING
            elif self.state == self.State.SPOOFING and line != '\n' and print_stream:
                print(print_prefix + line, end='', file=print_stream)

    # Walk every new stderr line; only the server banner is expected there.
    if self.stderr_r and not self.stderr_r.closed:
        for line in self.stderr_r:
            if line != ' * Running on http://127.0.0.1:9999/ (Press CTRL+C to quit)\n':
                # NOTE: stderr should be now empty
                logger.warning("Unexpected stderr of 'mitmf': '{}'. {}".format(line, str(self)))

    # An exited process is marked as terminated only now, after all available
    # feedback has been read and any state changes it caused were applied.
    if self.returncode is not None:
        self.state = self.State.TERMINATED
def resolve(self, include_file: str, fpname: str) -> t.TextIO:
    """Resolve include_file and return a readable file like object.

    The include is parsed as a URL: the network location is used as the
    package requirement, and the resource path is the dotted package name
    split into components followed by the URL path without leading slashes.

    :param include_file: File to be include.
    :param fpname: Main configuration filename.
    :return: Return a readable file-like object for the specified resource.
    :raises exc.InvalidResourceScheme: if the scheme is not in ``_VALID_SCHEMAS_``
    :raises exc.NonExistingInclude: if the resource manager cannot find the resource
    """
    url = urlparse(include_file)

    if url.scheme not in _VALID_SCHEMAS_:
        message = "Supported resources: {resources}. Got {include} in {fpname}".format(
            resources=', '.join(_VALID_SCHEMAS_),
            include=include_file,
            fpname=fpname
        )
        raise exc.InvalidResourceScheme(message)

    package = url.netloc
    components = package.split('.') + [url.path.lstrip('/')]
    path = os.path.join(*components)
    requirement = pkg_resources.Requirement.parse(package)

    if _resource_manager.resource_exists(requirement, path):
        return _resource_manager.resource_stream(requirement, path)

    raise exc.NonExistingInclude(
        "Could not find {include}".format(include=include_file)
    )
def __init__(self, linelength: int = None, encoding: str = None, lineformat: str = None, output: TextIO = None) -> None:
    """Override only the settings that were explicitly supplied.

    An argument left as ``None`` is not assigned, so whatever value the
    attribute already resolves to stays in effect.
    """
    supplied = (
        ('linelength', linelength),
        ('encoding', encoding),
        ('lineformat', lineformat),
        ('output', output),
    )
    for attribute, value in supplied:
        if value is not None:
            setattr(self, attribute, value)
def extract_deps(fd: TextIO) -> Generator[str, None, None]:
    """Yield the dependency lines of `fd`.

    Empty strings and lines starting with ``#`` or ``git+`` are dropped;
    every other line is yielded unchanged, including its line ending.
    """
    skipped_prefixes = ('#', 'git+')
    for line in fd:
        if not line:
            continue
        if line.startswith(skipped_prefixes):
            continue
        yield line


# Setup values preparation
def _decode_contents(feed: bytes) -> TextIO:
    """Decode the provided data from bz2 to text.

    The :arg:`feed` is assumed to be bz2-encoded text data in utf-8
    encoding. The body is a generator that yields one text stream; both the
    stream and the underlying byte buffer are closed once the generator is
    resumed or closed.

    Keyword arguments:
        feed: The data to be decoded.

    Returns:
        Decoded text stream.

    """
    with BytesIO(feed) as raw_buffer:
        with bz2.open(raw_buffer, mode='rt', encoding='utf-8') as text_stream:
            yield text_stream
def dump(self: 'ModPack', stream: TextIO) -> None:
    """Serialize self to a file stream.

    The document has a ``game`` key followed by a ``files`` mapping holding
    ``path`` (as a string), ``mods`` and ``dependencies`` (as lists of the
    mapping values), in that order.

    Keyword arguments:
        stream: The text stream to serialize into.
    """
    game = self.game
    files = OrderedDict([
        ('path', str(self.path)),
        ('mods', list(self.mods.values())),
        ('dependencies', list(self.dependencies.values())),
    ])
    document = OrderedDict([('game', game), ('files', files)])
    yaml.dump(document, stream)
def dump(self, file: TextIO) -> None:
    """Store credentials for future use.

    The instance is converted with ``attr.asdict`` and written as YAML.

    Keyword arguments:
        file: Open YAML text stream to write to.
    """
    payload = attr.asdict(self)
    yaml.dump(payload, file)
def map_input(self, data: typing.TextIO) -> typing.Dict[str, typing.Any]:
    """Decode the JSON document in `data` and return the decoded value."""
    # The cast only informs the type checker; the value is returned as decoded.
    return typing.cast(typing.Dict[str, typing.Any], json.load(data))
def map_input(self, data: typing.TextIO) -> typing.Dict[str, typing.Any]:
    """Parse the whole text of `data` with ``ucl.load`` and tag it as legacy.

    The parsed mapping gets ``"legacy"`` set to ``True`` before it is returned.
    """
    raw_text = data.read()
    parsed = ucl.load(raw_text)  # type: typing.Dict[str, typing.Any]
    parsed["legacy"] = True
    return parsed
def write_to_conll_eval_file(prediction_file: TextIO,
                             gold_file: TextIO,
                             verb_index: Optional[int],
                             sentence: List[str],
                             prediction: List[str],
                             gold_labels: List[str]):
    """
    Prints predicate argument predictions and gold labels for a single verbal
    predicate in a sentence to two provided file references.

    Each token produces one line per file: the first column (left-justified
    to 15 characters) is the verb at ``verb_index`` or ``"-"`` for every other
    token, the second column (right-justified to 15 characters) is the
    converted label. A blank line is written to both files at the end.

    Parameters
    ----------
    prediction_file : TextIO, required.
        A file reference to print predictions to.
    gold_file : TextIO, required.
        A file reference to print gold labels to.
    verb_index : Optional[int], required.
        The index of the verbal predicate in the sentence which
        the gold labels are the arguments for, or None if the sentence
        contains no verbal predicate.
    sentence : List[str], required.
        The word tokens.
    prediction : List[str], required.
        The predicted BIO labels.
    gold_labels : List[str], required.
        The gold BIO labels.
    """
    verb_only_sentence = ["-"] * len(sentence)
    # Compare against None explicitly: index 0 is a valid verb position and
    # must not be treated as "no predicate".
    if verb_index is not None:
        verb_only_sentence[verb_index] = sentence[verb_index]

    conll_format_predictions = convert_bio_tags_to_conll_format(prediction)
    conll_format_gold_labels = convert_bio_tags_to_conll_format(gold_labels)

    for word, predicted, gold in zip(verb_only_sentence,
                                     conll_format_predictions,
                                     conll_format_gold_labels):
        prediction_file.write(word.ljust(15))
        prediction_file.write(predicted.rjust(15) + "\n")
        gold_file.write(word.ljust(15))
        gold_file.write(gold.rjust(15) + "\n")
    prediction_file.write("\n")
    gold_file.write("\n")
def gfa2_parse_segments_with_fragments(f: TextIO):
    """Parse all segments and fragments from a GFA2 file and store them in
    a dict.

    In PHASM fragments are used to denote merged reads.

    Only lines starting with ``S`` or ``F`` are considered. A segment without
    fragments becomes a plain read. A segment with fragments becomes a
    ``MergedFragment`` that lists the fragments' external ids, ordered by the
    start of their segment range, plus the length of every fragment except
    the last one.
    """
    segment_lines = {}
    fragments = defaultdict(list)

    for line in f:
        if not line.startswith(('S', 'F')):
            continue

        columns = line.strip().split('\t')
        line_type = columns[0].strip()
        segment_name = columns[1].strip()

        if line_type == 'S':
            segment_lines[segment_name] = line
        elif line_type == 'F':
            fragments[segment_name].append(gfa2_parse_fragment(line))

    reads = {}
    for segment, line in segment_lines.items():
        read = gfa2_segment_to_read(line)

        if segment not in fragments:
            reads[segment] = read
            continue

        length = len(read)
        ordered = sorted(fragments[segment], key=lambda elem: elem[2][0])

        fragment_reads = []
        prefix_lengths = []
        for _, external_id, _segment_range, fragment_range in ordered:
            fragment_reads.append(external_id)
            prefix_lengths.append(fragment_range[1] - fragment_range[0])

        # The last fragment's length is not part of the prefix lengths.
        del prefix_lengths[-1]

        reads[segment] = MergedFragment(read.id, length, fragment_reads,
                                        prefix_lengths)

    return reads