Python fileinput 模块,hook_encoded() 实例源码

我们从 Python 开源项目中,提取了以下 29 个代码示例,用于说明如何使用 fileinput.hook_encoded()。

项目:Qyoutube-dl    作者:lzambella    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:optimalvibes    作者:littlemika    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:tvalacarta    作者:tvalacarta    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_file_opening_hook(self):
        """FileInput must reject invalid ``openhook`` arguments with ValueError.

        Two constructor calls are checked: ``openhook`` combined with
        ``inplace``, and an ``openhook`` that is not callable.
        """
        # cannot use openhook and inplace mode
        try:
            FileInput(inplace=1, openhook=lambda f, m: None)
        except ValueError:
            pass
        else:
            self.fail("FileInput should raise if both inplace "
                             "and openhook arguments are given")

        # a non-callable openhook must be rejected as well
        try:
            FileInput(openhook=1)
        except ValueError:
            pass
        else:
            self.fail("FileInput should check openhook for being callable")

        # XXX The rot13 codec was removed.
        #     So this test needs to be changed to use something else.
        #     (Or perhaps the API needs to change so we can just pass
        #     an encoding rather than using a hook?)
##         try:
##             t1 = writeTmp(1, ["A\nB"], mode="wb")
##             fi = FileInput(files=t1, openhook=hook_encoded("rot13"))
##             lines = list(fi)
##             self.assertEqual(lines, ["N\n", "O"])
##         finally:
##             remove_tempfiles(t1)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_file_opening_hook(self):
        """Check FileInput's ``openhook`` validation and decoding via hook_encoded.

        Verifies that ValueError is raised when ``openhook`` is combined with
        ``inplace`` or is not callable, then reads a UTF-7 temp file through
        ``hook_encoded("utf-7")`` and compares the decoded lines.
        """
        # cannot use openhook and inplace mode
        try:
            FileInput(inplace=1, openhook=lambda f,m: None)
        except ValueError:
            pass
        else:
            self.fail("FileInput should raise if both inplace "
                             "and openhook arguments are given")

        try:
            FileInput(openhook=1)
        except ValueError:
            pass
        else:
            self.fail("FileInput should check openhook for being callable")

        # UTF-7 is a convenient, seldom used encoding
        # Create the temp file before entering ``try`` so the ``finally``
        # clause never references an unbound ``t1`` (which would mask the
        # real error) when creation itself fails.
        t1 = writeTmp(1, ['+AEE-\n+AEI-'], mode="wb")
        try:
            fi = FileInput(files=t1, openhook=hook_encoded("utf-7"))
            try:
                lines = list(fi)
            finally:
                # Release the file handle even if decoding raises.
                fi.close()
            self.assertEqual(lines, [u'A\n', u'B'])
        finally:
            remove_tempfiles(t1)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_readline(self):
        """readline() must decode lazily instead of reading the whole file.

        TESTFN is written with three short lines, a large filler and a final
        byte that is not valid ASCII.  The first three ``readline()`` calls
        must succeed; only reading to the end may raise UnicodeDecodeError.
        """
        with open(TESTFN, 'wb') as f:
            f.write('A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write('123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write('\x80')
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
        try:
            # The most likely failure is a UnicodeDecodeError due to the entire
            # file being read when it shouldn't have been.
            self.assertEqual(fi.readline(), u'A\n')
            self.assertEqual(fi.readline(), u'B\r\n')
            self.assertEqual(fi.readline(), u'C\r')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
        finally:
            # Close even when an assertion fails so the handle is released
            # before the TESTFN cleanup runs.
            fi.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_modes(self):
        """Lines decoded through hook_encoded('utf-7') are the same in every read mode.

        TESTFN is written with mixed line endings and a UTF-7 encoded
        character; modes 'r', 'rU', 'U' and 'rb' are each expected to yield
        the same decoded lines with the original line endings kept.
        """
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            # Read TESTFN in ``mode`` and compare against ``expected_lines``.
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            try:
                lines = list(fi)
            finally:
                # Close even if reading raises, so the handle is not leaked.
                fi.close()
            self.assertEqual(lines, expected_lines)

        check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_file_opening_hook(self):
        """Check FileInput's ``openhook`` validation and decoding via hook_encoded.

        Verifies that ValueError is raised when ``openhook`` is combined with
        ``inplace`` or is not callable, then reads a UTF-7 temp file through
        ``hook_encoded("utf-7")`` and compares the decoded lines.
        """
        # cannot use openhook and inplace mode
        try:
            FileInput(inplace=1, openhook=lambda f,m: None)
        except ValueError:
            pass
        else:
            self.fail("FileInput should raise if both inplace "
                             "and openhook arguments are given")

        try:
            FileInput(openhook=1)
        except ValueError:
            pass
        else:
            self.fail("FileInput should check openhook for being callable")

        # UTF-7 is a convenient, seldom used encoding
        # Create the temp file before entering ``try`` so the ``finally``
        # clause never references an unbound ``t1`` (which would mask the
        # real error) when creation itself fails.
        t1 = writeTmp(1, ['+AEE-\n+AEI-'], mode="wb")
        try:
            fi = FileInput(files=t1, openhook=hook_encoded("utf-7"))
            try:
                lines = list(fi)
            finally:
                # Release the file handle even if decoding raises.
                fi.close()
            self.assertEqual(lines, [u'A\n', u'B'])
        finally:
            remove_tempfiles(t1)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_readline(self):
        """readline() must decode lazily instead of reading the whole file.

        TESTFN is written with three short lines, a large filler and a final
        byte that is not valid ASCII.  The first three ``readline()`` calls
        must succeed; only reading to the end may raise UnicodeDecodeError.
        """
        with open(TESTFN, 'wb') as f:
            f.write('A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write('123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write('\x80')
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
        try:
            # The most likely failure is a UnicodeDecodeError due to the entire
            # file being read when it shouldn't have been.
            self.assertEqual(fi.readline(), u'A\n')
            self.assertEqual(fi.readline(), u'B\r\n')
            self.assertEqual(fi.readline(), u'C\r')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
        finally:
            # Close even when an assertion fails so the handle is released
            # before the TESTFN cleanup runs.
            fi.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_modes(self):
        """Lines decoded through hook_encoded('utf-7') are the same in every read mode.

        TESTFN is written with mixed line endings and a UTF-7 encoded
        character; modes 'r', 'rU', 'U' and 'rb' are each expected to yield
        the same decoded lines with the original line endings kept.
        """
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            # Read TESTFN in ``mode`` and compare against ``expected_lines``.
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            try:
                lines = list(fi)
            finally:
                # Close even if reading raises, so the handle is not leaked.
                fi.close()
            self.assertEqual(lines, expected_lines)

        check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
项目:kodi-plugin.video.ted-talks-chinese    作者:daineseh    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:GUIYoutube    作者:coltking    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test(self):
        """The hook from hook_encoded() must forward its arguments to open().

        ``builtins.open`` is temporarily replaced by an ``InvocationRecorder``
        (presumably a call-recording stub; verify against its definition) and
        the hook is invoked once.  The recorded call must carry the same
        filename and mode objects positionally and ``encoding`` as the only
        keyword argument.
        """
        encoding = object()
        filename = object()
        mode = object()
        hook = fileinput.hook_encoded(encoding)

        recorder = InvocationRecorder()
        saved_open = builtins.open
        builtins.open = recorder
        try:
            hook(filename, mode)
        finally:
            # Always restore the real open(), even if the hook raises.
            builtins.open = saved_open

        self.assertEqual(recorder.invocation_count, 1)

        positional, keywords = recorder.last_invocation
        self.assertIs(positional[0], filename)
        self.assertIs(positional[1], mode)
        self.assertIs(keywords.pop('encoding'), encoding)
        # Nothing besides ``encoding`` may have been passed by keyword.
        self.assertFalse(keywords)
项目:Yv-dl    作者:r0oth3x49    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_file_opening_hook(self):
        """Check FileInput's ``openhook`` validation and decoding via hook_encoded.

        Verifies that ValueError is raised when ``openhook`` is combined with
        ``inplace`` or is not callable, then reads a temp file through
        ``hook_encoded("rot13")`` and compares the decoded lines.
        """
        # cannot use openhook and inplace mode
        try:
            FileInput(inplace=1, openhook=lambda f,m: None)
        except ValueError:
            pass
        else:
            self.fail("FileInput should raise if both inplace "
                             "and openhook arguments are given")

        try:
            FileInput(openhook=1)
        except ValueError:
            pass
        else:
            self.fail("FileInput should check openhook for being callable")

        # Create the temp file before entering ``try`` so the ``finally``
        # clause never references an unbound ``t1`` (which would mask the
        # real error) when creation itself fails.
        t1 = writeTmp(1, ["A\nB"], mode="wb")
        try:
            fi = FileInput(files=t1, openhook=hook_encoded("rot13"))
            try:
                lines = list(fi)
            finally:
                # Release the file handle even if decoding raises.
                fi.close()
            self.assertEqual(lines, ["N\n", "O"])
        finally:
            remove_tempfiles(t1)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_readline(self):
        """readline() must decode lazily instead of reading the whole file.

        TESTFN is written with three short lines, a large filler and a final
        byte that is not valid ASCII.  The first three ``readline()`` calls
        must succeed; only reading to the end may raise UnicodeDecodeError.
        """
        with open(TESTFN, 'wb') as f:
            f.write('A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write('123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write('\x80')
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'), bufsize=8)
        try:
            # The most likely failure is a UnicodeDecodeError due to the entire
            # file being read when it shouldn't have been.
            self.assertEqual(fi.readline(), u'A\n')
            self.assertEqual(fi.readline(), u'B\r\n')
            self.assertEqual(fi.readline(), u'C\r')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
        finally:
            # Close even when an assertion fails so the handle is released
            # before the TESTFN cleanup runs.
            fi.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_modes(self):
        """Lines decoded through hook_encoded('utf-7') are the same in every read mode.

        TESTFN is written with mixed line endings and a UTF-7 encoded
        character; modes 'r', 'rU', 'U' and 'rb' are each expected to yield
        the same decoded lines with the original line endings kept.
        """
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            # Read TESTFN in ``mode`` and compare against ``expected_lines``.
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            try:
                lines = list(fi)
            finally:
                # Close even if reading raises, so the handle is not leaked.
                fi.close()
            self.assertEqual(lines, expected_lines)

        check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_readline(self):
        """readline() must decode lazily instead of reading the whole file.

        TESTFN holds three short lines, a large filler and a trailing byte
        that is not valid ASCII.  The first three ``readline()`` calls must
        succeed, reading to the end must raise UnicodeDecodeError, and
        ``readline()`` must return '' afterwards.
        """
        leading_lines = b'A\nB\r\nC\r'
        # Fill TextIOWrapper buffer.
        filler = b'123456789\n' * 1000
        # Issue #20501: readline() shouldn't read whole file.
        undecodable_tail = b'\x80'
        with open(TESTFN, 'wb') as out:
            out.write(leading_lines + filler + undecodable_tail)
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii'), bufsize=8) as reader:
            try:
                for expected in ('A\n', 'B\n', 'C\n'):
                    self.assertEqual(reader.readline(), expected)
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(reader)
            for _ in range(2):
                self.assertEqual(reader.readline(), '')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test(self):
        """The hook from hook_encoded() must forward its arguments to open().

        ``builtins.open`` is temporarily replaced by an ``InvocationRecorder``
        (presumably a call-recording stub; verify against its definition) and
        the hook is invoked once.  The recorded call must carry the same
        filename and mode objects positionally and ``encoding`` as the only
        keyword argument.
        """
        encoding = object()
        filename = object()
        mode = object()
        hook = fileinput.hook_encoded(encoding)

        recorder = InvocationRecorder()
        saved_open = builtins.open
        builtins.open = recorder
        try:
            hook(filename, mode)
        finally:
            # Always restore the real open(), even if the hook raises.
            builtins.open = saved_open

        self.assertEqual(recorder.invocation_count, 1)

        positional, keywords = recorder.last_invocation
        self.assertIs(positional[0], filename)
        self.assertIs(positional[1], mode)
        self.assertIs(keywords.pop('encoding'), encoding)
        # Nothing besides ``encoding`` may have been passed by keyword.
        self.assertFalse(keywords)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_modes(self):
        """Exercise hook_encoded('utf-7') under each FileInput read mode.

        Mode 'r' must yield lines with newlines translated to '\\n'; 'rU' and
        'U' must do the same while emitting a DeprecationWarning; 'rb' must
        raise ValueError.
        """
        # UTF-7 is a convenient, seldom used encoding
        payload = b'A\nB\r\nC\rD+IKw-'
        with open(TESTFN, 'wb') as out:
            out.write(payload)
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            # Read TESTFN in ``mode`` and compare against ``expected_lines``.
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as reader:
                actual = list(reader)
            self.assertEqual(actual, expected_lines)

        translated = ['A\n', 'B\n', 'C\n', 'D\u20ac']
        check('r', translated)
        for deprecated_mode in ('rU', 'U'):
            with self.assertWarns(DeprecationWarning):
                check(deprecated_mode, translated)
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_file_opening_hook(self):
        """Check FileInput's ``openhook`` validation and decoding via hook_encoded.

        Verifies that ValueError is raised when ``openhook`` is combined with
        ``inplace`` or is not callable, then reads a temp file through
        ``hook_encoded("rot13")`` and compares the decoded lines.
        """
        # cannot use openhook and inplace mode
        try:
            FileInput(inplace=1, openhook=lambda f,m: None)
        except ValueError:
            pass
        else:
            self.fail("FileInput should raise if both inplace "
                             "and openhook arguments are given")

        try:
            FileInput(openhook=1)
        except ValueError:
            pass
        else:
            self.fail("FileInput should check openhook for being callable")

        # Create the temp file before entering ``try`` so the ``finally``
        # clause never references an unbound ``t1`` (which would mask the
        # real error) when creation itself fails.
        t1 = writeTmp(1, ["A\nB"], mode="wb")
        try:
            fi = FileInput(files=t1, openhook=hook_encoded("rot13"))
            try:
                lines = list(fi)
            finally:
                # Release the file handle even if decoding raises.
                fi.close()
            self.assertEqual(lines, ["N\n", "O"])
        finally:
            remove_tempfiles(t1)
项目:get-youtube-subtitle-url-node    作者:joegesualdo    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_readline(self):
        """readline() must decode lazily instead of reading the whole file.

        TESTFN holds three short lines, a large filler and a trailing byte
        that is not valid ASCII.  The first three ``readline()`` calls must
        succeed; only reading to the end may raise UnicodeDecodeError.
        """
        leading_lines = b'A\nB\r\nC\r'
        # Fill TextIOWrapper buffer.
        filler = b'123456789\n' * 1000
        # Issue #20501: readline() shouldn't read whole file.
        undecodable_tail = b'\x80'
        with open(TESTFN, 'wb') as out:
            out.write(leading_lines + filler + undecodable_tail)
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii'), bufsize=8) as reader:
            try:
                for expected in ('A\n', 'B\n', 'C\n'):
                    self.assertEqual(reader.readline(), expected)
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(reader)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test(self):
        """The hook from hook_encoded() must forward its arguments to open().

        ``builtins.open`` is temporarily replaced by an ``InvocationRecorder``
        (presumably a call-recording stub; verify against its definition) and
        the hook is invoked once.  The recorded call must carry the same
        filename and mode objects positionally and ``encoding`` as the only
        keyword argument.
        """
        encoding = object()
        filename = object()
        mode = object()
        hook = fileinput.hook_encoded(encoding)

        recorder = InvocationRecorder()
        saved_open = builtins.open
        builtins.open = recorder
        try:
            hook(filename, mode)
        finally:
            # Always restore the real open(), even if the hook raises.
            builtins.open = saved_open

        self.assertEqual(recorder.invocation_count, 1)

        positional, keywords = recorder.last_invocation
        self.assertIs(positional[0], filename)
        self.assertIs(positional[1], mode)
        self.assertIs(keywords.pop('encoding'), encoding)
        # Nothing besides ``encoding`` may have been passed by keyword.
        self.assertFalse(keywords)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_modes(self):
        """Exercise hook_encoded('utf-7') under each FileInput read mode.

        Mode 'r' must yield lines with newlines translated to '\\n'; 'rU' and
        'U' must do the same while emitting a DeprecationWarning; 'rb' must
        raise ValueError.
        """
        # UTF-7 is a convenient, seldom used encoding
        payload = b'A\nB\r\nC\rD+IKw-'
        with open(TESTFN, 'wb') as out:
            out.write(payload)
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            # Read TESTFN in ``mode`` and compare against ``expected_lines``.
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as reader:
                actual = list(reader)
            self.assertEqual(actual, expected_lines)

        translated = ['A\n', 'B\n', 'C\n', 'D\u20ac']
        check('r', translated)
        for deprecated_mode in ('rU', 'U'):
            with self.assertWarns(DeprecationWarning):
                check(deprecated_mode, translated)
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
项目:ngDownloaderAPI    作者:manishbisht    | 项目源码 | 文件源码
def download_with_info_file(self, info_filename):
        """Run a download from a previously saved info JSON file.

        The file named by ``info_filename`` is read as UTF-8, parsed as JSON,
        passed through ``self.filter_requested_info`` and handed to
        ``self.process_ie_result`` with ``download=True``.  If that raises
        ``DownloadError`` and the info carries a ``webpage_url``, a warning is
        reported and the download is retried from that URL via
        ``self.download``; without a ``webpage_url`` the error is re-raised.

        Returns ``self._download_retcode`` on the normal path, or the result
        of ``self.download([webpage_url])`` on the fallback path.
        """
        reader = fileinput.FileInput(
            [info_filename], mode='r',
            openhook=fileinput.hook_encoded('utf-8'))
        with contextlib.closing(reader) as info_lines:
            # FileInput has no read() method, so json.load cannot be used;
            # join the lines and parse the resulting string instead.
            raw_info = json.loads('\n'.join(info_lines))
            info = self.filter_requested_info(raw_info)
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            fallback_url = info.get('webpage_url')
            if fallback_url is None:
                raise
            self.report_warning('The info failed to download, trying with "%s"' % fallback_url)
            return self.download([fallback_url])
        return self._download_retcode
项目:CVProject    作者:hieuxinhe94    | 项目源码 | 文件源码
def _main(argv):
    """Command-line entry point: translate files (or stdin) line by line.

    Parses ``argv[1:]`` with optparse, builds a ``Goslate`` instance, feeds it
    the decoded input lines and writes each translated line, encoded with the
    requested output encoding, to standard output.

    Args:
        argv: full argument vector; the first element is skipped.

    Returns:
        None.  When no target language is given, an error and the help text
        are printed and the function returns without translating.
    """
    import fileinput
    import optparse

    usage = "usage: %prog [options] <file1 file2 ...>\n<stdin> will be used as input source if no file specified."

    parser = optparse.OptionParser(usage=usage, version="%%prog %s @ Copyright %s" % (__version__, __copyright__))
    parser.add_option('-t', '--target-language', metavar='zh-CN',
                      help='specify target language to translate the source text into')
    parser.add_option('-s', '--source-language', default='auto', metavar='en',
                      help='specify source language, if not provide it will identify the source language automatically')
    parser.add_option('-i', '--input-encoding', default=sys.getfilesystemencoding(), metavar='utf-8',
                      help='specify input encoding, default to current console system encoding')
    parser.add_option('-o', '--output-encoding', default=sys.getfilesystemencoding(), metavar='utf-8',
                      help='specify output encoding, default to current console system encoding')
    parser.add_option('-r', '--roman', action="store_true",
                      help='change translation writing to roman (e.g.: output pinyin instead of Chinese charactors for Chinese. It only valid for some of the target languages)')

    options, args = parser.parse_args(argv[1:])

    if not options.target_language:
        print('Error: missing target language!')
        parser.print_help()
        return

    writing = WRITING_ROMAN if options.roman else WRITING_NATIVE
    gs = Goslate(writing=writing)

    # Read raw bytes and decode them explicitly with the requested input
    # encoding (rather than going through fileinput.hook_encoded).
    inputs = fileinput.input(args, mode='rb')
    inputs = (i.decode(options.input_encoding) for i in inputs)
    outputs = gs.translate(inputs, options.target_language, options.source_language)

    # The encoded output is bytes.  On Python 3 sys.stdout only accepts str,
    # so write through its binary ``buffer``; Python 2's sys.stdout has no
    # ``buffer`` attribute and accepts the encoded bytes directly.
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    for i in outputs:
        out.write((i + u'\n').encode(options.output_encoding))
        out.flush()
项目:Ossian    作者:CSTR-Edinburgh    | 项目源码 | 文件源码
def convert_lexicon(self, files, format='festival'):
        """Convert a Festival-format lexicon into a tab-separated lexicon file.

        Each line of ``files`` is read as UTF-8 and stripped of surrounding
        spaces and newlines.  Empty lines, lines starting with ``;`` and the
        literal line ``MNCL`` are skipped; every other line is parsed with
        ``self.read_festival_lexentry`` into ``(headword, tags, pronun)`` and
        the ``[tags, pronun]`` pairs are grouped by headword.  The result is
        written to ``self.lexicon_fname`` as UTF-8, one
        ``headword<TAB>tags<TAB>pronun`` line per entry with headwords in
        sorted order, and is also stored on ``self.entries``.

        Args:
            files: input file name(s), passed straight to
                ``fileinput.FileInput``.
            format: lexicon format; only ``'festival'`` is supported.

        Raises:
            SystemExit: if ``format`` is not ``'festival'``.
        """
        # Single-argument print() behaves the same on Python 2 and 3.
        print('     convert lexicon...')
        if format != 'festival':
            sys.exit('Unknown lexicon format: %s'%(format))

        entries = {}
        seen_tags = {}  ## for reporting

        # Use a private FileInput (not the module-global fileinput.input()
        # state) and always close it, so an error while parsing neither leaks
        # the file nor leaves global state active for a later call.
        lexicon_lines = fileinput.FileInput(
            files, openhook=fileinput.hook_encoded("utf8"))
        try:
            for line in lexicon_lines:
                line = line.strip(' \n')
                if line.startswith(';') or line == '' or line == 'MNCL':
                    continue ## ignore Scheme comment line and empty lines

                (headword, tags, pronun) = self.read_festival_lexentry(line)

                entries.setdefault(headword, []).append([tags, pronun])
                seen_tags[tags] = ''
        finally:
            lexicon_lines.close()

        print('Tags in lexicon: ')
        print(list(seen_tags.keys()))

        # ``with`` guarantees the output file is closed even if a write fails.
        with codecs.open(self.lexicon_fname, 'w', encoding='utf8') as f:
            for head_word in sorted(entries):
                for (tag, pron) in entries[head_word]:
                    f.write('%s\t%s\t%s\n'%(head_word, tag, pron))

        self.entries = entries