Python fileinput 模块,FileInput() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用fileinput.FileInput()

项目:Qyoutube-dl    作者:lzambella    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:sbds    作者:steemit    | 项目源码 | 文件源码
def load_blocks_from_checkpoints(checkpoints_path, start, end):
    """Load blocks from checkpoints and echo them to the ``'-'`` output file.

    The checkpoint files needed for ``start``..``end`` are read as one
    stream, the first ``initial_checkpoint_offset`` lines are dropped, and
    each remaining line is stripped, UTF-8 encoded and echoed.  When
    ``end - start`` is positive, output stops after that many blocks;
    otherwise every remaining block is written.
    """
    checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
        path=checkpoints_path, start=start, end=end)
    total_blocks_to_load = end - start
    # A non-positive count means "no limit": the 1-based counter below is
    # compared against None, which never matches.
    stop_after = total_blocks_to_load if total_blocks_to_load > 0 else None

    with fileinput.FileInput(
            mode='r',
            files=checkpoint_set.checkpoint_paths,
            openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:

        remaining = toolz.itertoolz.drop(
            checkpoint_set.initial_checkpoint_offset, blocks)
        with click.open_file('-', 'w', encoding='utf8') as f:
            for count, block in enumerate(remaining, 1):
                click.echo(block.strip().encode('utf8'), file=f)
                if count == stop_after:
                    break
项目:sbds    作者:steemit    | 项目源码 | 文件源码
def test_checkpoint_access(ctx, checkpoints_path):
    """Test checkpoints access.

    Opens the checkpoint files required for the range ``start=1, end=0``,
    parses the first block as JSON and reports on stderr whether a truthy
    ``block_num`` could be read.  Exits through ``ctx.exit`` with code 0 on
    success and 127 on failure or on any exception.  If the checkpoint
    stream yields no blocks at all, the function returns without exiting.
    """
    exit_code = None
    try:
        checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
            path=checkpoints_path, start=1, end=0)
        with fileinput.FileInput(
                mode='r',
                files=checkpoint_set.checkpoint_paths,
                openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:

            for block in blocks:
                block_num = json.loads(block)['block_num']
                if block_num:
                    click.echo(
                        'Success: loaded block %s' % block_num, err=True)
                    exit_code = 0
                else:
                    click.echo('Failed to load block', err=True)
                    exit_code = 127
                # Only the first block is inspected.
                break
    except Exception as e:
        click.echo('Fail: %s' % e, err=True)
        exit_code = 127

    # ctx.exit() must run outside the try block: newer Click versions
    # implement it by raising an Exception subclass, which the handler above
    # would otherwise catch and turn a successful exit into "Fail" / 127.
    if exit_code is not None:
        ctx.exit(code=exit_code)
项目:optimalvibes    作者:littlemika    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:tvalacarta    作者:tvalacarta    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:bx-python    作者:bxlab    | 项目源码 | 文件源码
def getlines( self ):
        """Yield lines from the left subtree, then this node, then the right subtree.

        This node's own ``self.lines`` are yielded only when there are at
        least ``minregions`` of them; the subtrees are visited whenever they
        are truthy, each through its own ``getlines()``.
        """
        if self.left:
            for left_line in self.left.getlines():
                yield left_line
        if minregions <= len(self.lines):
            for own_line in self.lines:
                yield own_line
        if self.right:
            for right_line in self.right.getlines():
                yield right_line

## def main():
##     f1 = fileinput.FileInput("big.bed")
##     g1 = GenomicIntervalReader(f1)
##     returntree, extra = find_clusters(g1, mincols=50)
##     print "All found"
##     for chrom, value in returntree.items():
##         for start, end in value.getregions():
##             print chrom+"\t"+str(start)+"\t"+str(end)
##         for line in value.getlines():
##             print "Line:\t"+str(line)

## main()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close in ``finally`` so a failing assertion does not leave the
            # input open while the temp files are being removed.
            if fi is not None:
                fi.close()
            remove_tempfiles(t1, t2, t3, t4)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_opening_mode(self):
        """Check mode validation and universal-newline reading.

        Constructing FileInput with mode "w" must raise ValueError; reading
        a file with mixed line endings in mode "U" must give "\\n"-terminated
        lines.
        """
        # invalid mode, should raise ValueError
        try:
            FileInput(mode="w")
        except ValueError:
            pass
        else:
            self.fail("FileInput should reject invalid mode argument")

        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            reader = FileInput(files=t1, mode="U")
            observed = list(reader)
            self.assertEqual(observed, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_file_opening_hook(self):
        """Check that FileInput validates its ``openhook`` argument.

        Combining ``inplace`` with ``openhook`` and passing a non-callable
        hook must each raise ValueError.
        """
        # cannot use openhook and inplace mode
        try:
            FileInput(inplace=1, openhook=lambda f, m: None)
        except ValueError:
            pass
        else:
            self.fail("FileInput should raise if both inplace "
                             "and openhook arguments are given")

        try:
            FileInput(openhook=1)
        except ValueError:
            pass
        else:
            self.fail("FileInput should check openhook for being callable")
        # XXX The rot13 codec was removed.
        #     So this test needs to be changed to use something else.
        #     (Or perhaps the API needs to change so we can just pass
        #     an encoding rather than using a hook?)
##         try:
##             t1 = writeTmp(1, ["A\nB"], mode="wb")
##             fi = FileInput(files=t1, openhook=hook_encoded("rot13"))
##             lines = list(fi)
##             self.assertEqual(lines, ["N\n", "O"])
##         finally:
##             remove_tempfiles(t1)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        # Pre-bind every name used by the cleanup so that a failure in
        # writeTmp() or FileInput() is not masked by a NameError in
        # ``finally``.
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close even when an assertion fails so the input is not left
            # open while the temp files are being removed.
            if fi is not None:
                fi.close()
            # Only pass names that were actually created.
            remove_tempfiles(*[t for t in (t1, t2, t3, t4) if t is not None])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_readline(self):
        """Check that readline() does not decode the whole file up front.

        The file ends with a byte that is invalid ASCII; the first three
        readline() calls must succeed, and only reading to the end may raise
        UnicodeDecodeError.
        """
        chunks = (
            'A\nB\r\nC\r',
            # Fill TextIOWrapper buffer.
            '123456789\n' * 1000,
            # Issue #20501: readline() shouldn't read whole file.
            '\x80',
        )
        with open(TESTFN, 'wb') as f:
            for chunk in chunks:
                f.write(chunk)
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
        # The most likely failure is a UnicodeDecodeError due to the entire
        # file being read when it shouldn't have been.
        for expected in (u'A\n', u'B\r\n', u'C\r'):
            self.assertEqual(fi.readline(), expected)
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        fi.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_modes(self):
        """Check that every read mode yields the same decoded UTF-7 lines."""
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        # The expected result is identical for all four modes.
        for mode in ('r', 'rU', 'U', 'rb'):
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            lines = list(fi)
            fi.close()
            self.assertEqual(lines, [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        # Pre-bind every name used by the cleanup so that a failure in
        # writeTmp() or FileInput() is not masked by a NameError in
        # ``finally``.
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close even when an assertion fails so the input is not left
            # open while the temp files are being removed.
            if fi is not None:
                fi.close()
            # Only pass names that were actually created.
            remove_tempfiles(*[t for t in (t1, t2, t3, t4) if t is not None])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_readline(self):
        """Check that readline() does not decode the whole file up front.

        The file ends with a byte that is invalid ASCII; the first three
        readline() calls must succeed, and only reading to the end may raise
        UnicodeDecodeError.
        """
        chunks = (
            'A\nB\r\nC\r',
            # Fill TextIOWrapper buffer.
            '123456789\n' * 1000,
            # Issue #20501: readline() shouldn't read whole file.
            '\x80',
        )
        with open(TESTFN, 'wb') as f:
            for chunk in chunks:
                f.write(chunk)
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
        # The most likely failure is a UnicodeDecodeError due to the entire
        # file being read when it shouldn't have been.
        for expected in (u'A\n', u'B\r\n', u'C\r'):
            self.assertEqual(fi.readline(), expected)
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        fi.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_modes(self):
        """Check that every read mode yields the same decoded UTF-7 lines."""
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        # The expected result is identical for all four modes.
        for mode in ('r', 'rU', 'U', 'rb'):
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            lines = list(fi)
            fi.close()
            self.assertEqual(lines, [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
项目:kodi-plugin.video.ted-talks-chinese    作者:daineseh    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:GUIYoutube    作者:coltking    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close in ``finally`` so a failing assertion does not leave the
            # input open while the temp files are being removed.
            if fi is not None:
                fi.close()
            remove_tempfiles(t1, t2, t3, t4)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_opening_mode(self):
        """Check mode validation and universal-newline reading.

        Constructing FileInput with mode "w" must raise ValueError; reading
        a file with mixed line endings in mode "U" must give "\\n"-terminated
        lines.
        """
        # invalid mode, should raise ValueError
        try:
            FileInput(mode="w")
        except ValueError:
            pass
        else:
            self.fail("FileInput should reject invalid mode argument")

        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            reader = FileInput(files=t1, mode="U")
            observed = list(reader)
            self.assertEqual(observed, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        saved_fstat = os.fstat
        raiser = UnconditionallyRaise(OSError)
        try:
            tmp_name = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, tmp_name)
            with FileInput(files=[tmp_name], inplace=True) as fi:
                # Swap in the raising replacement only once the FileInput
                # exists, right before the readline() under test.
                os.fstat = raiser
                fi.readline()
        finally:
            # Always restore the real function, whatever happened above.
            os.fstat = saved_fstat

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(raiser.invoked,
                        "os.fstat() was not invoked")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                # Swap in the raising replacement only once the FileInput
                # exists, right before the readline() under test.
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            # Always restore the real function, whatever happened above.
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        # (the failure message previously named os.fstat() by mistake)
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_fileno_when_ValueError_raised(self):
        """Check that FileInput.fileno() returns -1 when the underlying
           file object's fileno() raises ValueError."""

        class FilenoRaisesValueError(UnconditionallyRaise):
            # Stand-in file object whose fileno() goes through the
            # inherited __call__ of UnconditionallyRaise.
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        raising_file = FilenoRaisesValueError()
        tmp_name = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, tmp_name)
        with FileInput(files=[tmp_name]) as fi:
            saved_file = fi._file
            try:
                fi._file = raising_file
                result = fi.fileno()
            finally:
                fi._file = saved_file # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(raising_file.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")
项目:Yv-dl    作者:r0oth3x49    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:tino-thesis    作者:Tino92    | 项目源码 | 文件源码
def read_and_clean_files(clueweb_file, ann_file, data_dir, ann_dir):
    """
    Read one WARC file and its annotation file, then replace entity mentions.

    Every line of the (possibly compressed) annotation file is parsed with
    ``Annotation.parse_annotation``; the resulting list is given to a
    ``WarcEntry`` together with the opened WARC file, and the result of
    ``WarcEntry.replace_entity_mentions()`` is returned.  Progress and the
    elapsed time are printed to stdout.

    :param clueweb_file: WARC file name, relative to ``data_dir``
    :param ann_file: annotation file name, relative to ``ann_dir``
    :param data_dir: Warc files directory
    :param ann_dir: Annotations directory
    :return: whatever ``replace_entity_mentions()`` returns; per the
        original documentation (not verifiable from this function) records
        shaped like {'record_id': record_id,
        'replaced_record': cleaned_replaced_record,
        'cleaned_record': cleaned_record}
    """
    annotation_input = fileinput.FileInput(os.path.join(ann_dir, ann_file), openhook=fileinput.hook_compressed)
    try:
        annotation_list = [Annotation.parse_annotation(line)
                           for line in annotation_input]
    finally:
        # Release the annotation file even if a line fails to parse.
        annotation_input.close()

    warc_path = os.path.join(data_dir, clueweb_file)
    warc_file = warc.open(warc_path)
    try:
        # Single-argument print(...) produces the same output under
        # Python 2 and 3; the double spaces reproduce the separators the
        # original comma-separated print statements emitted.
        print("Replacing entity mentions for  %s : %s ..." % (clueweb_file, ann_file))
        start = time.time()
        warc_entry = WarcEntry(warc_path, warc_file, annotation_list)
        cleaned_records = warc_entry.replace_entity_mentions()
        end = time.time()
        print("Time used:  %s" % (end - start))
    finally:
        # Close the WARC file on both the success and the error path.
        warc_file.close()
    return cleaned_records
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        # Pre-bind every name used by the cleanup so that a failure in
        # writeTmp() or FileInput() is not masked by a NameError in
        # ``finally``.
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close even when an assertion fails so the input is not left
            # open while the temp files are being removed.
            if fi is not None:
                fi.close()
            # Only pass names that were actually created.
            remove_tempfiles(*[t for t in (t1, t2, t3, t4) if t is not None])
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_readline(self):
        """Check that readline() does not decode the whole file up front.

        The file ends with a byte that is invalid ASCII; the first three
        readline() calls must succeed, and only reading to the end may raise
        UnicodeDecodeError.
        """
        chunks = (
            'A\nB\r\nC\r',
            # Fill TextIOWrapper buffer.
            '123456789\n' * 1000,
            # Issue #20501: readline() shouldn't read whole file.
            '\x80',
        )
        with open(TESTFN, 'wb') as f:
            for chunk in chunks:
                f.write(chunk)
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'), bufsize=8)
        # The most likely failure is a UnicodeDecodeError due to the entire
        # file being read when it shouldn't have been.
        for expected in (u'A\n', u'B\r\n', u'C\r'):
            self.assertEqual(fi.readline(), expected)
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        fi.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_modes(self):
        """Check that every read mode yields the same decoded UTF-7 lines."""
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        # The expected result is identical for all four modes.
        for mode in ('r', 'rU', 'U', 'rb'):
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            lines = list(fi)
            fi.close()
            self.assertEqual(lines, [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close in ``finally`` so a failing assertion does not leave the
            # input open while the temp files are being removed.
            if fi is not None:
                fi.close()
            remove_tempfiles(t1, t2, t3, t4)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_opening_mode(self):
        """Check mode validation and universal-newline reading.

        Constructing FileInput with mode "w" must raise ValueError; opening
        and reading in mode "U" are each run under a DeprecationWarning
        check and must give "\\n"-terminated lines.
        """
        # invalid mode, should raise ValueError
        try:
            FileInput(mode="w")
        except ValueError:
            pass
        else:
            self.fail("FileInput should reject invalid mode argument")

        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                reader = FileInput(files=t1, mode="U")
            with check_warnings(('', DeprecationWarning)):
                observed = list(reader)
            self.assertEqual(observed, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_readline(self):
        """Check that readline() does not decode the whole file up front.

        The file ends with a byte that is invalid ASCII.  The first three
        readline() calls must succeed, reading the rest must raise
        UnicodeDecodeError, and later readline() calls must return ''.
        """
        chunks = (
            b'A\nB\r\nC\r',
            # Fill TextIOWrapper buffer.
            b'123456789\n' * 1000,
            # Issue #20501: readline() shouldn't read whole file.
            b'\x80',
        )
        with open(TESTFN, 'wb') as f:
            for chunk in chunks:
                f.write(chunk)
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii'), bufsize=8) as fi:
            try:
                for expected in ('A\n', 'B\n', 'C\n'):
                    self.assertEqual(fi.readline(), expected)
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            for _ in range(2):
                self.assertEqual(fi.readline(), '')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        saved_unlink = os.unlink
        raiser = UnconditionallyRaise(OSError)
        try:
            tmp_name = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, tmp_name)
            with FileInput(files=[tmp_name], inplace=True) as fi:
                next(fi) # make sure the file is opened
                # Patch only after the file is open, right before the
                # nextfile() call under test.
                os.unlink = raiser
                fi.nextfile()
        finally:
            # Always restore the real function, whatever happened above.
            os.unlink = saved_unlink

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(raiser.invoked,
                        "os.unlink() was not invoked")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        saved_fstat = os.fstat
        raiser = UnconditionallyRaise(OSError)
        try:
            tmp_name = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, tmp_name)
            with FileInput(files=[tmp_name], inplace=True) as fi:
                # Swap in the raising replacement only once the FileInput
                # exists, right before the readline() under test.
                os.fstat = raiser
                fi.readline()
        finally:
            # Always restore the real function, whatever happened above.
            os.fstat = saved_fstat

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(raiser.invoked,
                        "os.fstat() was not invoked")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                # Swap in the raising replacement only once the FileInput
                # exists, right before the readline() under test.
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            # Always restore the real function, whatever happened above.
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        # (the failure message previously named os.fstat() by mistake)
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        # Pre-bind every name used by the cleanup so that a failure in
        # writeTmp() or FileInput() is not masked by a NameError in
        # ``finally``.
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close even when an assertion fails so the input is not left
            # open while the temp files are being removed.
            if fi is not None:
                fi.close()
            # Only pass names that were actually created.
            remove_tempfiles(*[t for t in (t1, t2, t3, t4) if t is not None])
项目:voiceid    作者:sih4sing5hong5    | 项目源码 | 文件源码
def ensure_file_exists(filename):
    """Ensure file exists and is not empty, otherwise raise an IOError.

    On win32, a ``.seg`` file is additionally read line by line.

    :type filename: string
    :param filename: file to check"""
    missing = not os.path.exists(filename)
    if missing:
        raise IOError("File %s doesn't exist or not correctly created"
                      % filename)
    size = os.path.getsize(filename)
    if size <= 0:
        raise IOError("File %s empty" % filename)

    extension = os.path.splitext(filename)[1]
    if extension == '.seg' and sys.platform == 'win32':
        import fileinput
        # NOTE(review): the replaced text is discarded and inplace is off,
        # so this loop only reads the file -- confirm whether the backslash
        # rewrite was meant to be written back.
        for line in fileinput.FileInput(filename, inplace=0):
            line = line.replace("\\\\", "/")
项目:get-youtube-subtitle-url-node    作者:joegesualdo    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file is read as UTF-8, parsed as JSON, passed through
        ``self.filter_requested_info`` and handed to
        ``self.process_ie_result(..., download=True)``.  If that raises
        ``DownloadError`` and the info has a ``webpage_url``, a warning is
        reported and the download is retried from that URL; without a URL
        the error is re-raised.

        Returns ``self._download_retcode``, or the result of
        ``self.download([webpage_url])`` when the fallback path is taken.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as f:
            # FileInput has no read() method, so json.load() is not usable;
            # join the lines and parse the resulting text instead.
            parsed = json.loads('\n'.join(f))
            info = self.filter_requested_info(parsed)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
            return self.download([webpage_url])
        return self._download_retcode
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_zero_byte_files(self):
        """Check that zero-byte files are skipped and counters stay correct.

        Three empty files surround one single-line file.  The first
        readline() must return that line and report the third file; the
        second must return a falsy value, with the cumulative line number
        still 1, the per-file line number 0 and the last file as current.
        """
        t1 = t2 = t3 = t4 = None
        fi = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
        finally:
            # Close in ``finally`` so a failing assertion does not leave the
            # input open while the temp files are being removed.
            if fi is not None:
                fi.close()
            remove_tempfiles(t1, t2, t3, t4)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_opening_mode(self):
        """Check FileInput's handling of the ``mode`` argument.

        An invalid mode ("w") must be rejected with ValueError.  Opening in
        universal-newline mode ("U") is expected to emit a
        DeprecationWarning and to translate every newline style to "\\n".
        """
        try:
            # invalid mode, should raise ValueError
            fi = FileInput(mode="w")
            self.fail("FileInput should reject invalid mode argument")
        except ValueError:
            pass
        t1 = None
        fi = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                fi = FileInput(files=t1, mode="U")
            with check_warnings(('', DeprecationWarning)):
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            # Release the FileInput before deleting its file, even when the
            # iteration or the assertion above failed part-way through.
            try:
                if fi is not None:
                    fi.close()
            finally:
                remove_tempfiles(t1)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_readline(self):
        """Check that readline() reads lazily instead of slurping the file.

        The test file starts with three short lines, continues with a large
        block of filler and ends with a byte that is not valid ASCII.  The
        first three readline() calls must succeed without hitting that byte;
        only reading the remainder may raise UnicodeDecodeError.
        """
        payload = b''.join([
            b'A\nB\r\nC\r',
            # Fill TextIOWrapper buffer.
            b'123456789\n' * 1000,
            # Issue #20501: readline() shouldn't read whole file.
            b'\x80',
        ])
        with open(TESTFN, 'wb') as out:
            out.write(payload)
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii'), bufsize=8) as reader:
            try:
                for expected in ('A\n', 'B\n', 'C\n'):
                    self.assertEqual(reader.readline(), expected)
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Draining the rest of the input reaches the invalid byte.
                list(reader)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_nextfile_oserror_deleting_backup(self):
        """Exercise FileInput.nextfile() while os.unlink() raises OSError.

        os.unlink is temporarily swapped for a callable that always raises
        OSError, so the attempt to delete the backup file fails.  The error
        is expected to be swallowed silently by nextfile().
        """
        real_unlink = os.unlink
        failing_unlink = UnconditionallyRaise(OSError)
        try:
            tmp_path = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, tmp_path)
            with FileInput(files=[tmp_path], inplace=True) as reader:
                next(reader)  # forces the file to actually be opened
                os.unlink = failing_unlink
                reader.nextfile()
        finally:
            # Always put the genuine os.unlink back.
            os.unlink = real_unlink

        # Guard against a vacuous pass: the patched function must have run.
        self.assertTrue(failing_unlink.invoked,
                        "os.unlink() was not invoked")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_readline_os_fstat_raises_OSError(self):
        """Exercise FileInput.readline() while os.fstat() raises OSError.

        os.fstat is temporarily swapped for a callable that always raises
        OSError; readline() in in-place mode is expected to discard that
        error silently.
        """
        real_fstat = os.fstat
        failing_fstat = UnconditionallyRaise(OSError)
        try:
            tmp_path = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, tmp_path)
            with FileInput(files=[tmp_path], inplace=True) as reader:
                os.fstat = failing_fstat
                reader.readline()
        finally:
            # Always put the genuine os.fstat back.
            os.fstat = real_fstat

        # Guard against a vacuous pass: the patched function must have run.
        self.assertTrue(failing_fstat.invoked,
                        "os.fstat() was not invoked")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                # Patch only once the FileInput exists, so that just the
                # readline() call below sees the failing os.chmod.
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            # Always restore the real os.chmod.
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        # (the message names os.chmod, the function this test patches)
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_fileno_when_ValueError_raised(self):
        """Check that fileno() returns -1 when the file's fileno() fails.

        The FileInput's internal file object is temporarily replaced by a
        stand-in whose fileno() always raises ValueError; FileInput.fileno()
        is expected to turn that into a return value of -1.
        """
        class RaisingFile(UnconditionallyRaise):
            """Stand-in file object whose fileno() raises ValueError."""
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self()

        raising_file = RaisingFile()
        tmp_path = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, tmp_path)
        with FileInput(files=[tmp_path]) as reader:
            saved_file = reader._file
            try:
                reader._file = raising_file
                fileno_result = reader.fileno()
            finally:
                # Restore the real file so that it is closed properly.
                reader._file = saved_file

        # Guard against a vacuous pass: the stand-in must have been called.
        self.assertTrue(raising_file.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(fileno_result, -1, "fileno() should return -1")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_modes(self):
        """Check which open modes work together with hook_encoded().

        A UTF-7 encoded file with mixed newline styles is read in several
        modes: 'r' must decode and normalise newlines, 'rU' and 'U' must do
        the same while emitting a DeprecationWarning, and 'rb' must be
        rejected with ValueError.
        """
        with open(TESTFN, 'wb') as out:
            # UTF-7 is a convenient, seldom used encoding
            out.write(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            # Read the whole file in the given mode and compare the lines.
            reader = FileInput(files=TESTFN, mode=mode,
                               openhook=hook_encoded('utf-7'))
            with reader as fi:
                actual_lines = list(fi)
            self.assertEqual(actual_lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        for deprecated_mode in ('rU', 'U'):
            with self.assertWarns(DeprecationWarning):
                check(deprecated_mode, ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
项目:ngDownloaderAPI    作者:manishbisht    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Download using the metadata stored in a JSON info file.

        ``info_filename`` is opened as UTF-8 text and parsed as JSON; the
        result goes through ``self.filter_requested_info`` and is then given
        to ``self.process_ie_result`` with ``download=True``.

        When that raises ``DownloadError``, the info's ``'webpage_url'``
        value (if not None) is used for a retry through ``self.download``
        after a warning, and that call's result is returned; with no such
        value the error propagates.  Otherwise ``self._download_retcode``
        is returned.
        """
        source = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        try:
            # FileInput has no read() method, so json.load() is not an
            # option; build the text from the individual lines instead.
            text = '\n'.join(source)
            info = self.filter_requested_info(json.loads(text))
        finally:
            source.close()

        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            retry_url = info.get('webpage_url')
            if retry_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % retry_url)
            return self.download([retry_url])
        return self._download_retcode
项目:iocage    作者:iocage    | 项目源码 | 文件源码
def __upgrade_replace_text__(path, text, replace):
        """Replace every occurrence of ``text`` with ``replace`` in a file.

        The file at ``path`` is rewritten in place; the previous content is
        kept next to it in a backup file with a ``.bak`` suffix.
        """
        editor = fileinput.FileInput(path, inplace=True, backup=".bak")
        with editor as _file:
            for original_line in _file:
                updated_line = original_line.replace(text, replace)
                # In in-place mode standard output is redirected into the
                # file; the line keeps its own newline, hence end=''.
                print(updated_line, end='')
项目:radwatch-analysis    作者:bearing    | 项目源码 | 文件源码
def calibration_correction(measurement, channel, energy):
    """
    Write a copy of a spectrum file with a re-fitted energy calibration.

    The file ``measurement`` is copied to a sibling whose name ends in
    '_recal.Spe'.  A least squares straight-line fit of ``energy`` against
    ``channel`` gives a new offset and slope, and the text of the old
    calibration (the first two values of the copy's ``energy_cal``) is
    replaced in the copy by the new offset and slope.  Each line of the copy
    is rewritten with its trailing whitespace stripped.

    Returns the path of the recalibrated copy.
    """
    recal_path = os.path.splitext(measurement)[0] + '_recal.Spe'
    cal_file = sh.copyfile(measurement, recal_path)

    spectrum = SPEFile.SPEFile(cal_file)
    spectrum.read()
    old_offset = str(float(spectrum.energy_cal[0]))
    old_slope = str(float(spectrum.energy_cal[1]))
    old_cal = old_offset + ' ' + old_slope

    # Design matrix for energy = slope * channel + offset.
    design = np.vstack([channel, np.ones(len(channel))]).T
    coefficients = np.linalg.lstsq(design, energy)[0]
    eslope = float(coefficients[0])
    e0 = float(coefficients[1])
    new_cal = str(e0) + ' ' + str(eslope)

    # In in-place mode print() writes into the file being iterated.
    with fileinput.FileInput(cal_file, inplace=1) as spe_lines:
        for spe_line in spe_lines:
            print(spe_line.replace(old_cal, new_cal).rstrip())
    return cal_file
项目:bx-python    作者:bxlab    | 项目源码 | 文件源码
def complement( reader, lens ):
    """Yield the intervals left after inverting the reader's binned bitsets.

    ``reader`` is wrapped in a BitsetSafeReaderWrapper, which handles any
    ValueError, IndexError and OverflowError raised for problem lines while
    the bitsets are built by skipping those lines.  Every per-chromosome
    bitset is inverted, and each run of set bits between 0 and the
    chromosome's entry in ``lens`` (``MAX`` when absent) is yielded as a
    GenomicInterval on the "+" strand.

    An IndexError raised while emitting a chromosome's intervals is counted
    in the wrapper's ``skipped`` attribute, and recorded in ``skipped_lines``
    while fewer than 10 have been counted.
    """
    complement_reader = BitsetSafeReaderWrapper( reader, lens=lens )
    bitsets = complement_reader.binned_bitsets( upstream_pad=0, downstream_pad=0, lens=lens )
    # NOT them all
    for chrom_bits in bitsets.values():
        chrom_bits.invert()
    # Read remaining intervals and subtract
    for chrom, bitset in bitsets.items():
        out_intervals = bits_set_in_range( bitset, 0, lens.get( chrom, MAX ) )
        try:
            # Write the intervals
            for start, end in out_intervals:
                last_col = max( complement_reader.chrom_col,
                                complement_reader.start_col,
                                complement_reader.end_col )
                fields = ["."] * ( last_col + 1 )
                # default the column to a + if it exists
                if 0 <= complement_reader.strand_col < len( fields ):
                    fields[complement_reader.strand_col] = "+"
                fields[complement_reader.chrom_col] = chrom
                fields[complement_reader.start_col] = start
                fields[complement_reader.end_col] = end
                yield GenomicInterval( complement_reader, fields,
                                       complement_reader.chrom_col,
                                       complement_reader.start_col,
                                       complement_reader.end_col,
                                       complement_reader.strand_col, "+" )
        except IndexError as e:
            complement_reader.skipped += 1
            # no reason to stuff an entire bad file into memory
            if complement_reader.skipped < 10:
                complement_reader.skipped_lines.append( ( complement_reader.linenum, complement_reader.current_line, str( e ) ) )


# def main():
#     # test it all out
#     f1 = fileinput.FileInput("dataset_7.dat")
#     g1 = GenomicIntervalReader(f1)
#     for interval in complement(g1,{"chr":16000000}):
#         print "\t".join(interval)
# 
# if __name__ == "__main__":
#     main()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_files_that_dont_end_with_newline(self):
        """Check line handling for files whose last line has no newline.

        Two temporary files, each ending without a trailing newline, are read
        through one FileInput.  The final line of each file must come back
        unchanged, and the line counters must reflect three lines in the last
        file and six lines overall.
        """
        first = second = None
        try:
            first = writeTmp(1, ["A\nB\nC"])
            second = writeTmp(2, ["D\nE\nF"])
            reader = FileInput(files=(first, second))
            collected = list(reader)
            self.assertEqual(collected, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(reader.filelineno(), 3)
            self.assertEqual(reader.lineno(), 6)
        finally:
            remove_tempfiles(first, second)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         try:
##             t1 = writeTmp(1, ["A\nB"])
##             encoding = sys.getfilesystemencoding()
##             if encoding is None:
##                 encoding = 'ascii'
##             fi = FileInput(files=str(t1, encoding))
##             lines = list(fi)
##             self.assertEqual(lines, ["A\n", "B"])
##         finally:
##             remove_tempfiles(t1)