The following 50 code examples, extracted from open-source Python projects, illustrate how to use fileinput.FileInput().
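Before the examples, here is a minimal sketch of the usage pattern most of them follow: chain one or more files into a single line iterator, optionally decoding each file through an openhook such as fileinput.hook_encoded(). The file names below are hypothetical placeholders, not taken from any of the projects.

import fileinput

# Minimal sketch (file names are placeholders): iterate over several
# files as one stream, decoding each file as UTF-8 via the openhook.
with fileinput.FileInput(files=('a.txt', 'b.txt'),
                         openhook=fileinput.hook_encoded('utf-8')) as fi:
    for line in fi:
        # filename() and filelineno() report which file and which line
        # of that file the current line came from.
        print(fi.filename(), fi.filelineno(), line.rstrip('\n'))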
def download_with_info_file(self, info_filename):
    with contextlib.closing(fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))) as f:
        # FileInput doesn't have a read method, we can't call json.load
        info = self.filter_requested_info(json.loads('\n'.join(f)))
    try:
        self.process_ie_result(info, download=True)
    except DownloadError:
        webpage_url = info.get('webpage_url')
        if webpage_url is not None:
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        else:
            raise
    return self._download_retcode
def load_blocks_from_checkpoints(checkpoints_path, start, end):
    """Load blocks from checkpoints"""
    checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
        path=checkpoints_path, start=start, end=end)
    total_blocks_to_load = end - start
    with fileinput.FileInput(
            mode='r',
            files=checkpoint_set.checkpoint_paths,
            openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:
        blocks = toolz.itertoolz.drop(checkpoint_set.initial_checkpoint_offset,
                                      blocks)
        if total_blocks_to_load > 0:
            with click.open_file('-', 'w', encoding='utf8') as f:
                for i, block in enumerate(blocks, 1):
                    click.echo(block.strip().encode('utf8'), file=f)
                    if i == total_blocks_to_load:
                        break
        else:
            with click.open_file('-', 'w', encoding='utf8') as f:
                for block in blocks:
                    click.echo(block.strip().encode('utf8'), file=f)
def test_checkpoint_access(ctx, checkpoints_path):
    """Test checkpoints access"""
    try:
        checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
            path=checkpoints_path, start=1, end=0)
        with fileinput.FileInput(
                mode='r',
                files=checkpoint_set.checkpoint_paths,
                openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:
            for block in blocks:
                block_num = json.loads(block)['block_num']
                if block_num:
                    click.echo(
                        'Success: loaded block %s' % block_num, err=True)
                    ctx.exit(code=0)
                else:
                    click.echo('Failed to load block', err=True)
                    ctx.exit(code=127)
    except Exception as e:
        click.echo('Fail: %s' % e, err=True)
        ctx.exit(code=127)
def getlines( self ):
    if self.left:
        for line in self.left.getlines():
            yield line
    if len(self.lines) >= minregions:
        for line in self.lines:
            yield line
    if self.right:
        for line in self.right.getlines():
            yield line

## def main():
##     f1 = fileinput.FileInput("big.bed")
##     g1 = GenomicIntervalReader(f1)
##     returntree, extra = find_clusters(g1, mincols=50)
##     print "All found"
##     for chrom, value in returntree.items():
##         for start, end in value.getregions():
##             print chrom+"\t"+str(start)+"\t"+str(end)
##         for line in value.getlines():
##             print "Line:\t"+str(line)
## main()
def test_zero_byte_files(self):
    t1 = t2 = t3 = t4 = None
    try:
        t1 = writeTmp(1, [""])
        t2 = writeTmp(2, [""])
        t3 = writeTmp(3, ["The only line there is.\n"])
        t4 = writeTmp(4, [""])
        fi = FileInput(files=(t1, t2, t3, t4))

        line = fi.readline()
        self.assertEqual(line, 'The only line there is.\n')
        self.assertEqual(fi.lineno(), 1)
        self.assertEqual(fi.filelineno(), 1)
        self.assertEqual(fi.filename(), t3)

        line = fi.readline()
        self.assertFalse(line)
        self.assertEqual(fi.lineno(), 1)
        self.assertEqual(fi.filelineno(), 0)
        self.assertEqual(fi.filename(), t4)
        fi.close()
    finally:
        remove_tempfiles(t1, t2, t3, t4)
def test_opening_mode(self):
    try:
        # invalid mode, should raise ValueError
        fi = FileInput(mode="w")
        self.fail("FileInput should reject invalid mode argument")
    except ValueError:
        pass
    t1 = None
    try:
        # try opening in universal newline mode
        t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
        fi = FileInput(files=t1, mode="U")
        lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
    finally:
        remove_tempfiles(t1)
def test_file_opening_hook(self):
    try:
        # cannot use openhook and inplace mode
        fi = FileInput(inplace=1, openhook=lambda f, m: None)
        self.fail("FileInput should raise if both inplace "
                  "and openhook arguments are given")
    except ValueError:
        pass
    try:
        fi = FileInput(openhook=1)
        self.fail("FileInput should check openhook for being callable")
    except ValueError:
        pass
    # XXX The rot13 codec was removed.
    #     So this test needs to be changed to use something else.
    #     (Or perhaps the API needs to change so we can just pass
    #     an encoding rather than using a hook?)
##     try:
##         t1 = writeTmp(1, ["A\nB"], mode="wb")
##         fi = FileInput(files=t1, openhook=hook_encoded("rot13"))
##         lines = list(fi)
##         self.assertEqual(lines, ["N\n", "O"])
##     finally:
##         remove_tempfiles(t1)
def test_zero_byte_files(self):
    try:
        t1 = writeTmp(1, [""])
        t2 = writeTmp(2, [""])
        t3 = writeTmp(3, ["The only line there is.\n"])
        t4 = writeTmp(4, [""])
        fi = FileInput(files=(t1, t2, t3, t4))

        line = fi.readline()
        self.assertEqual(line, 'The only line there is.\n')
        self.assertEqual(fi.lineno(), 1)
        self.assertEqual(fi.filelineno(), 1)
        self.assertEqual(fi.filename(), t3)

        line = fi.readline()
        self.assertFalse(line)
        self.assertEqual(fi.lineno(), 1)
        self.assertEqual(fi.filelineno(), 0)
        self.assertEqual(fi.filename(), t4)
        fi.close()
    finally:
        remove_tempfiles(t1, t2, t3, t4)
def test_readline(self):
    with open(TESTFN, 'wb') as f:
        f.write('A\nB\r\nC\r')
        # Fill TextIOWrapper buffer.
        f.write('123456789\n' * 1000)
        # Issue #20501: readline() shouldn't read whole file.
        f.write('\x80')
    self.addCleanup(safe_unlink, TESTFN)

    fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
    # The most likely failure is a UnicodeDecodeError due to the entire
    # file being read when it shouldn't have been.
    self.assertEqual(fi.readline(), u'A\n')
    self.assertEqual(fi.readline(), u'B\r\n')
    self.assertEqual(fi.readline(), u'C\r')
    with self.assertRaises(UnicodeDecodeError):
        # Read to the end of file.
        list(fi)
    fi.close()
def test_modes(self):
    with open(TESTFN, 'wb') as f:
        # UTF-7 is a convenient, seldom used encoding
        f.write('A\nB\r\nC\rD+IKw-')
    self.addCleanup(safe_unlink, TESTFN)

    def check(mode, expected_lines):
        fi = FileInput(files=TESTFN, mode=mode,
                       openhook=hook_encoded('utf-7'))
        lines = list(fi)
        fi.close()
        self.assertEqual(lines, expected_lines)

    check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
    check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
    check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
    check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
def test_readline_os_fstat_raises_OSError(self):
    """Tests invoking FileInput.readline() when os.fstat() raises OSError.
       This exception should be silently discarded."""

    os_fstat_orig = os.fstat
    os_fstat_replacement = UnconditionallyRaise(OSError)
    try:
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t], inplace=True) as fi:
            os.fstat = os_fstat_replacement
            fi.readline()
    finally:
        os.fstat = os_fstat_orig

    # sanity check to make sure that our test scenario was actually hit
    self.assertTrue(os_fstat_replacement.invoked,
                    "os.fstat() was not invoked")
def test_readline_os_chmod_raises_OSError(self):
    """Tests invoking FileInput.readline() when os.chmod() raises OSError.
       This exception should be silently discarded."""

    os_chmod_orig = os.chmod
    os_chmod_replacement = UnconditionallyRaise(OSError)
    try:
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t], inplace=True) as fi:
            os.chmod = os_chmod_replacement
            fi.readline()
    finally:
        os.chmod = os_chmod_orig

    # sanity check to make sure that our test scenario was actually hit
    self.assertTrue(os_chmod_replacement.invoked,
                    "os.chmod() was not invoked")
def test_fileno_when_ValueError_raised(self):
    class FilenoRaisesValueError(UnconditionallyRaise):
        def __init__(self):
            UnconditionallyRaise.__init__(self, ValueError)

        def fileno(self):
            self.__call__()

    unconditionally_raise_ValueError = FilenoRaisesValueError()
    t = writeTmp(1, ["\n"])
    self.addCleanup(remove_tempfiles, t)
    with FileInput(files=[t]) as fi:
        file_backup = fi._file
        try:
            fi._file = unconditionally_raise_ValueError
            result = fi.fileno()
        finally:
            fi._file = file_backup  # make sure the file gets cleaned up

    # sanity check to make sure that our test scenario was actually hit
    self.assertTrue(unconditionally_raise_ValueError.invoked,
                    "_file.fileno() was not invoked")

    self.assertEqual(result, -1, "fileno() should return -1")
def read_and_clean_files(clueweb_file, ann_file, data_dir, ann_dir):
    """
    Read file from data_dir and ann_dir, replace entity mentions and clean
    records in that file
    :param clueweb_file:
    :param ann_file:
    :param data_dir: Warc files directory
    :param ann_dir: Annotations directory
    :return: {'record_id': record_id,
              'replaced_record': cleaned_replaced_record,
              'cleaned_record': cleaned_record}
    """
    annotation_input = fileinput.FileInput(os.path.join(ann_dir, ann_file),
                                           openhook=fileinput.hook_compressed)
    annotation_list = []
    for line in annotation_input:
        annotation_list.append(Annotation.parse_annotation(line))

    warc_path = os.path.join(data_dir, clueweb_file)
    warc_file = warc.open(warc_path)

    print "Replacing entity mentions for ", clueweb_file, ":", ann_file, "..."
    start = time.time()
    warc_entry = WarcEntry(warc_path, warc_file, annotation_list)
    cleaned_records = warc_entry.replace_entity_mentions()
    end = time.time()
    print "Time used: ", end - start

    warc_file.close()
    return cleaned_records
def test_readline(self):
    with open(TESTFN, 'wb') as f:
        f.write('A\nB\r\nC\r')
        # Fill TextIOWrapper buffer.
        f.write('123456789\n' * 1000)
        # Issue #20501: readline() shouldn't read whole file.
        f.write('\x80')
    self.addCleanup(safe_unlink, TESTFN)

    fi = FileInput(files=TESTFN,
                   openhook=hook_encoded('ascii'), bufsize=8)
    # The most likely failure is a UnicodeDecodeError due to the entire
    # file being read when it shouldn't have been.
    self.assertEqual(fi.readline(), u'A\n')
    self.assertEqual(fi.readline(), u'B\r\n')
    self.assertEqual(fi.readline(), u'C\r')
    with self.assertRaises(UnicodeDecodeError):
        # Read to the end of file.
        list(fi)
    fi.close()
def test_opening_mode(self):
    try:
        # invalid mode, should raise ValueError
        fi = FileInput(mode="w")
        self.fail("FileInput should reject invalid mode argument")
    except ValueError:
        pass
    t1 = None
    try:
        # try opening in universal newline mode
        t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
        with check_warnings(('', DeprecationWarning)):
            fi = FileInput(files=t1, mode="U")
        with check_warnings(('', DeprecationWarning)):
            lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
    finally:
        remove_tempfiles(t1)
def test_readline(self):
    with open(TESTFN, 'wb') as f:
        f.write(b'A\nB\r\nC\r')
        # Fill TextIOWrapper buffer.
        f.write(b'123456789\n' * 1000)
        # Issue #20501: readline() shouldn't read whole file.
        f.write(b'\x80')
    self.addCleanup(safe_unlink, TESTFN)

    with FileInput(files=TESTFN,
                   openhook=hook_encoded('ascii'), bufsize=8) as fi:
        try:
            self.assertEqual(fi.readline(), 'A\n')
            self.assertEqual(fi.readline(), 'B\n')
            self.assertEqual(fi.readline(), 'C\n')
        except UnicodeDecodeError:
            self.fail('Read to end of file')
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        self.assertEqual(fi.readline(), '')
        self.assertEqual(fi.readline(), '')
def test_nextfile_oserror_deleting_backup(self):
    """Tests invoking FileInput.nextfile() when the attempt to delete
       the backup file would raise OSError.  This error is expected to be
       silently ignored"""

    os_unlink_orig = os.unlink
    os_unlink_replacement = UnconditionallyRaise(OSError)
    try:
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t], inplace=True) as fi:
            next(fi)  # make sure the file is opened
            os.unlink = os_unlink_replacement
            fi.nextfile()
    finally:
        os.unlink = os_unlink_orig

    # sanity check to make sure that our test scenario was actually hit
    self.assertTrue(os_unlink_replacement.invoked,
                    "os.unlink() was not invoked")
def ensure_file_exists(filename):
    """Ensure file exists and is not empty, otherwise raise an IOError.

    :type filename: string
    :param filename: file to check"""
    if not os.path.exists(filename):
        raise IOError("File %s doesn't exist or not correctly created" % filename)
    if not (os.path.getsize(filename) > 0):
        raise IOError("File %s empty" % filename)
    (shortname, extension) = os.path.splitext(filename)
    if sys.platform == 'win32' and extension == '.seg':
        import fileinput
        for line in fileinput.FileInput(filename, inplace=0):
            line = line.replace("\\\\", "/")
def test_readline(self):
    with open(TESTFN, 'wb') as f:
        f.write(b'A\nB\r\nC\r')
        # Fill TextIOWrapper buffer.
        f.write(b'123456789\n' * 1000)
        # Issue #20501: readline() shouldn't read whole file.
        f.write(b'\x80')
    self.addCleanup(safe_unlink, TESTFN)

    with FileInput(files=TESTFN,
                   openhook=hook_encoded('ascii'), bufsize=8) as fi:
        try:
            self.assertEqual(fi.readline(), 'A\n')
            self.assertEqual(fi.readline(), 'B\n')
            self.assertEqual(fi.readline(), 'C\n')
        except UnicodeDecodeError:
            self.fail('Read to end of file')
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
def test_modes(self):
    with open(TESTFN, 'wb') as f:
        # UTF-7 is a convenient, seldom used encoding
        f.write(b'A\nB\r\nC\rD+IKw-')
    self.addCleanup(safe_unlink, TESTFN)

    def check(mode, expected_lines):
        with FileInput(files=TESTFN, mode=mode,
                       openhook=hook_encoded('utf-7')) as fi:
            lines = list(fi)
        self.assertEqual(lines, expected_lines)

    check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
    with self.assertWarns(DeprecationWarning):
        check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
    with self.assertWarns(DeprecationWarning):
        check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
    with self.assertRaises(ValueError):
        check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
def __upgrade_replace_text__(path, text, replace):
    with fileinput.FileInput(path, inplace=True, backup=".bak") as _file:
        for line in _file:
            print(line.replace(text, replace), end='')
def calibration_correction(measurement, channel, energy):
    """
    calibration_correction implements a corrected energy calibration based
    on the array of channels and energies given as input. It performs a
    least squares regression fit of the channels and energies and
    implements the new energy calibration in a newly generated .Spe file.
    The new spectra file will contain the same information with only the
    old calibration changed.
    """
    cal_file = sh.copyfile(measurement,
                           os.path.splitext(measurement)[0] + '_recal.Spe')
    fix_measurement = SPEFile.SPEFile(cal_file)
    fix_measurement.read()
    old_cal = (str(float(fix_measurement.energy_cal[0])) + ' ' +
               str(float(fix_measurement.energy_cal[1])))

    a_matrix = np.vstack([channel, np.ones(len(channel))]).T
    calibration_line = np.linalg.lstsq(a_matrix, energy)
    e0 = float(calibration_line[0][1])
    eslope = float(calibration_line[0][0])
    new_cal = str(float(e0)) + ' ' + str(float(eslope))

    with fileinput.FileInput(cal_file, inplace=1) as file:
        for line in file:
            print(line.replace(old_cal, new_cal).rstrip())
    return(cal_file)
def complement(reader, lens):
    # Handle any ValueError, IndexError and OverflowError exceptions that may
    # be thrown when the bitsets are being created by skipping the problem lines
    complement_reader = BitsetSafeReaderWrapper(reader, lens=lens)
    bitsets = complement_reader.binned_bitsets(upstream_pad=0,
                                               downstream_pad=0, lens=lens)
    # NOT them all
    for key, value in bitsets.items():
        value.invert()
    # Read remaining intervals and subtract
    for chrom in bitsets:
        bitset = bitsets[chrom]
        out_intervals = bits_set_in_range(bitset, 0, lens.get(chrom, MAX))
        try:
            # Write the intervals
            for start, end in out_intervals:
                fields = ["." for x in range(max(complement_reader.chrom_col,
                                                 complement_reader.start_col,
                                                 complement_reader.end_col) + 1)]
                # default the column to a + if it exists
                if complement_reader.strand_col < len(fields) and complement_reader.strand_col >= 0:
                    fields[complement_reader.strand_col] = "+"
                fields[complement_reader.chrom_col] = chrom
                fields[complement_reader.start_col] = start
                fields[complement_reader.end_col] = end
                new_interval = GenomicInterval(complement_reader, fields,
                                               complement_reader.chrom_col,
                                               complement_reader.start_col,
                                               complement_reader.end_col,
                                               complement_reader.strand_col, "+")
                yield new_interval
        except IndexError as e:
            complement_reader.skipped += 1
            # no reason to stuff an entire bad file into memory
            if complement_reader.skipped < 10:
                complement_reader.skipped_lines.append((complement_reader.linenum,
                                                        complement_reader.current_line,
                                                        str(e)))
            continue

# def main():
#     # test it all out
#     f1 = fileinput.FileInput("dataset_7.dat")
#     g1 = GenomicIntervalReader(f1)
#     for interval in complement(g1, {"chr": 16000000}):
#         print "\t".join(interval)
#
# if __name__ == "__main__":
#     main()
def test_files_that_dont_end_with_newline(self):
    t1 = t2 = None
    try:
        t1 = writeTmp(1, ["A\nB\nC"])
        t2 = writeTmp(2, ["D\nE\nF"])
        fi = FileInput(files=(t1, t2))
        lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
        self.assertEqual(fi.filelineno(), 3)
        self.assertEqual(fi.lineno(), 6)
    finally:
        remove_tempfiles(t1, t2)

## def test_unicode_filenames(self):
##     # XXX A unicode string is always returned by writeTmp.
##     #     So is this needed?
##     try:
##         t1 = writeTmp(1, ["A\nB"])
##         encoding = sys.getfilesystemencoding()
##         if encoding is None:
##             encoding = 'ascii'
##         fi = FileInput(files=str(t1, encoding))
##         lines = list(fi)
##         self.assertEqual(lines, ["A\n", "B"])
##     finally:
##         remove_tempfiles(t1)