Python six 模块,print_() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.print_()

项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_timezone_in_plugin(capsys):
    """Poll a plugin that declares ``TIMEZONE`` on the plugin class itself.

    The crontab is polled with the clock frozen at ``2016-01-01 00:00:01``
    and the captured stdout must differ from ``'2016-01-01'``.
    """
    frozen_at = '2016-01-01 00:00:01'

    class ActivateImpl(MockedImpl):
        # The timezone is configured directly on the plugin.
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = ['0 0 * * * .print_datetime']

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            stamp = polled_time.strftime('%Y-%m-%d')
            six.print_(stamp, end='')

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time(frozen_at):
        plugin.poll_crontab()
        captured = capsys.readouterr()
        assert captured[0] != '2016-01-01'
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_timezone_in_config(capsys):
    """Poll a plugin whose ``TIMEZONE`` lives on an attached ``bot_config``.

    The config object is attached after ``activate()``; the crontab is then
    polled with the clock frozen at ``2016-01-01 00:00:01`` and the captured
    stdout must differ from ``'2016-01-01'``.
    """
    frozen_at = '2016-01-01 00:00:01'

    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = ['0 0 * * * .print_datetime']

        def print_datetime(self, polled_time):
            stamp = polled_time.strftime('%Y-%m-%d')
            six.print_(stamp, end='')

    plugin = ActivateImpl()
    plugin.activate()
    plugin.bot_config = MockConfig()
    with freeze_time(frozen_at):
        plugin.poll_crontab()
        captured = capsys.readouterr()
        assert captured[0] != '2016-01-01'
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_timezone_in_plugin(capsys):
    """Poll a plugin that declares ``TIMEZONE`` on the plugin class itself.

    The crontab is polled with the clock frozen at ``2016-01-01 00:00:01``
    and the captured stdout must differ from ``'2016-01-01'``.
    """
    frozen_at = '2016-01-01 00:00:01'

    class ActivateImpl(MockedImpl):
        # The timezone is configured directly on the plugin.
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = ['0 0 * * * .print_datetime']

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            stamp = polled_time.strftime('%Y-%m-%d')
            six.print_(stamp, end='')

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time(frozen_at):
        plugin.poll_crontab()
        captured = capsys.readouterr()
        assert captured[0] != '2016-01-01'
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_timezone_in_config(capsys):
    """Poll a plugin whose ``TIMEZONE`` lives on an attached ``bot_config``.

    The config object is attached after ``activate()``; the crontab is then
    polled with the clock frozen at ``2016-01-01 00:00:01`` and the captured
    stdout must differ from ``'2016-01-01'``.
    """
    frozen_at = '2016-01-01 00:00:01'

    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = ['0 0 * * * .print_datetime']

        def print_datetime(self, polled_time):
            stamp = polled_time.strftime('%Y-%m-%d')
            six.print_(stamp, end='')

    plugin = ActivateImpl()
    plugin.activate()
    plugin.bot_config = MockConfig()
    with freeze_time(frozen_at):
        plugin.poll_crontab()
        captured = capsys.readouterr()
        assert captured[0] != '2016-01-01'
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def echo(args, file=None):
    '''
    Print an argument list (in the form accepted by ``subprocess.call``) to a stream.

    A single string is treated as a one-element list. The stream is flushed
    after printing.

    :param args: A string or list of strings
    :param file: A file-like object to write to. Defaults to ``sys.stdout``
    '''
    stream = sys.stdout if file is None else file
    text_kinds = six.string_types + (six.text_type,)
    words = [args] if isinstance(args, text_kinds) else args
    six.print_(*words, file=stream, flush=True)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def sign(cls, args):
        """Sign the payload read from standard input and print the signature.

        The key is loaded from ``args.key`` using ``args.alg.kty``. When
        ``args.compact`` is set, ``'alg'`` is added to the protected headers
        and the compact serialization is printed; otherwise the pretty JSON
        serialization is printed.

        :param args: parsed command-line arguments; ``args.protect`` is
            replaced by a list when it is ``None`` and may be appended to.
        """
        # Close the key file even when reading or loading the key fails.
        try:
            key = args.alg.kty.load(args.key.read())
        finally:
            args.key.close()
        if args.protect is None:
            args.protect = []
        if args.compact:
            args.protect.append('alg')

        sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
                       protect=set(args.protect))

        if args.compact:
            six.print_(sig.to_compact().decode('utf-8'))
        else:  # JSON
            six.print_(sig.json_dumps_pretty())
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def verify(cls, args):
        """Verify a signature read from standard input and echo its payload.

        The signature is parsed from the compact form when ``args.compact``
        is set, otherwise from JSON. When ``args.key`` is given, the public
        key derived from it is used for verification; otherwise ``key=None``
        is passed to ``sig.verify``.

        :param args: parsed command-line arguments.
        :returns: ``-1`` when the JSON input cannot be parsed, otherwise
            ``not sig.verify(key=key)`` (falsy when verification succeeds).
        """
        if args.compact:
            sig = JWS.from_compact(sys.stdin.read().encode())
        else:  # JSON
            try:
                sig = JWS.json_loads(sys.stdin.read())
            except errors.Error as error:
                six.print_(error)
                return -1

        if args.key is not None:
            # Close the key file even when the check or the key loading fails.
            try:
                assert args.kty is not None
                key = args.kty.load(args.key.read()).public_key()
            finally:
                args.key.close()
        else:
            key = None

        sys.stdout.write(sig.payload)
        return not sig.verify(key=key)
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def _ingest_pairs(self, pairs, oid2nid, frame_size, limit, single_sided):
        """Ingest pairs frame by frame, printing per-phase timings.

        For every frame one progress line is printed of the form
        ``Fetching pairs a:b of n ... Xs, Parsing ... Ys, Writing ... Zs, Done
        with a:b in Ts``, where each duration belongs to the phase named
        before it.

        :param pairs: object whose ``read(start=, stop=)`` returns a raw frame.
        :param oid2nid: mapping whose ``get`` is vectorized and handed to
            ``self._translate_frame``.
        :param frame_size: number of pairs to read per frame.
        :param limit: upper bound of the range of frame start offsets.
        :param single_sided: forwarded to ``self._translate_frame``.
        """
        oid2nid_v = np.vectorize(oid2nid.get)
        # whole pairs set does not fit in memory, so split it in frames with `frame_size` number of pairs.
        for start in range(0, limit, frame_size):
            stop = frame_size + start
            t1 = process_time()
            six.print_('Fetching pairs {0}:{1} of {2} ... '.format(start, stop, limit), end='', flush=True)
            raw_frame = pairs.read(start=start, stop=stop)
            t2 = process_time()
            # Stay on the same line: the next message starts with this phase's duration.
            six.print_('{0}s, Parsing ... '.format(int(t2 - t1)), end='', flush=True)
            frame = self._translate_frame(raw_frame, oid2nid_v, single_sided)
            t3 = process_time()
            # The template previously had no placeholder, so the parse time was dropped.
            six.print_('{0}s, Writing ... '.format(int(t3 - t2)), end='', flush=True)
            # NOTE(review): the original comment here mentioned alternating direction to reuse
            # cached chunks of the previous frame; that logic is not visible in this function.
            self._ingest_pairs_frame(frame)
            # Drop the reference to the frame before the next one is read.
            del frame
            t4 = process_time()
            six.print_('{0}s, Done with {1}:{2} in {3}s'.format(int(t4 - t3), start, stop, int(t4 - t1)), flush=True)
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def to_pairs(self, pairs):
        """Copy the labels and the scores of this matrix into ``pairs``.

        Only entries whose column index is greater than the row index are
        appended, one batch per row that has any.

        Args:
            pairs (SimilarityMatrix):

        """
        six.print_('copy labels', flush=True)
        self.build_label_cache()
        pairs.labels.update(self.cache_l2i)

        six.print_('copy matrix to pairs', flush=True)
        nr_rows = self.scores.shape[0]
        progress = ProgressBar()
        for row_id in progress(six.moves.range(0, nr_rows)):
            row = self.scores[row_id, ...]
            filled_cols = row.nonzero()[0]
            upper = [(row_id, col_id, row[col_id])
                     for col_id in filled_cols
                     if row_id < col_id]
            if not upper:
                continue
            pairs.pairs.table.append(upper)
项目:lookml-gen    作者:symphonyrm    | 项目源码 | 文件源码
def test_pdt_view():
    """Generate LookML for a view backed by a derived table and compare it
    with the stored ``expected_output/pdt_view.lkml`` file."""
    testname = 'pdt_view'
    derived = view.DerivedTable(
        sql="SELECT id, count(*) c FROM table GROUP BY id",
        sql_trigger_value='DATE()',
        indexes=['id'])
    v = view.View(testname)
    v.derived_table = derived
    v.add_field(field.Dimension('id', type='number', primary_key=True))
    v.add_field(field.Dimension('c', type='number'))
    v.add_field(field.Measure('sum_c', sql='${TABLE}.c', type='sum'))

    buf = six.StringIO()
    v.generate_lookml(buf, format_options=test_format_options)
    lookml = buf.getvalue()
    six.print_(lookml)

    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def sign(cls, args):
        """Sign the payload read from standard input and print the signature.

        The key is loaded from ``args.key`` using ``args.alg.kty``. When
        ``args.compact`` is set, ``'alg'`` is added to the protected headers
        and the compact serialization is printed; otherwise the pretty JSON
        serialization is printed.

        :param args: parsed command-line arguments; ``args.protect`` is
            replaced by a list when it is ``None`` and may be appended to.
        """
        # Close the key file even when reading or loading the key fails.
        try:
            key = args.alg.kty.load(args.key.read())
        finally:
            args.key.close()
        if args.protect is None:
            args.protect = []
        if args.compact:
            args.protect.append('alg')

        sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
                       protect=set(args.protect))

        if args.compact:
            six.print_(sig.to_compact().decode('utf-8'))
        else:  # JSON
            six.print_(sig.json_dumps_pretty())
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def verify(cls, args):
        """Verify a signature read from standard input and echo its payload.

        The signature is parsed from the compact form when ``args.compact``
        is set, otherwise from JSON. When ``args.key`` is given, the public
        key derived from it is used for verification; otherwise ``key=None``
        is passed to ``sig.verify``.

        :param args: parsed command-line arguments.
        :returns: ``-1`` when the JSON input cannot be parsed, otherwise
            ``not sig.verify(key=key)`` (falsy when verification succeeds).
        """
        if args.compact:
            sig = JWS.from_compact(sys.stdin.read().encode())
        else:  # JSON
            try:
                sig = JWS.json_loads(sys.stdin.read())
            except errors.Error as error:
                six.print_(error)
                return -1

        if args.key is not None:
            # Close the key file even when the check or the key loading fails.
            try:
                assert args.kty is not None
                key = args.kty.load(args.key.read()).public_key()
            finally:
                args.key.close()
        else:
            key = None

        sys.stdout.write(sig.payload)
        return not sig.verify(key=key)
项目:botoflow    作者:boto    | 项目源码 | 文件源码
def single_poll(self, next_page_token=None):
        """Poll once for a decision task and return the client's response.

        :param next_page_token: when not ``None``, sent as ``nextPageToken``.
        :returns: the result of ``poll_for_decision_task``.
        :raises KeyboardInterrupt: re-raised after waiting out the remainder
            of a 60 second window measured from the start of the poll.
        """
        poll_time = time.time()
        try:
            kwargs = {'domain': self.domain,
                      'taskList': {'name': self.task_list},
                      'identity': self.identity}
            if next_page_token is not None:
                kwargs['nextPageToken'] = next_page_token
            # old botocore throws TypeError when unable to establish SWF connection
            return self.worker.client.poll_for_decision_task(**kwargs)

        except KeyboardInterrupt:
            # sleep before actually exiting as the connection is not yet closed
            # on the other end
            # Clamp at zero: time.sleep() raises ValueError for a negative
            # value, which would replace the KeyboardInterrupt when the poll
            # has already lasted 60 seconds or more.
            sleep_time = max(0, 60 - (time.time() - poll_time))
            six.print_("Exiting in {0}...".format(sleep_time), file=sys.stderr)
            time.sleep(sleep_time)
            raise
项目:klusta    作者:kwikteam    | 项目源码 | 文件源码
def cleanup(self, _warn=False):
        """Remove the directory ``self.name`` unless already closed.

        A ``TypeError``/``AttributeError`` whose text contains ``"None"`` is
        reported on stderr instead of being raised, and the object is left
        unclosed. With ``_warn`` set, a ``Warning`` is issued through
        ``self._warn`` after a successful removal.
        """
        if not self.name or self._closed:
            return
        try:
            self._rmtree(self.name)
        except (TypeError, AttributeError) as ex:
            # Issue #10188: Emit a warning on stderr
            # if the directory could not be cleaned
            # up due to missing globals
            if "None" not in str(ex):
                raise
            report = "ERROR: {!r} while cleaning up {!r}".format(ex, self)
            six.print_(report, file=_sys.stderr)
            return
        self._closed = True
        if not _warn:
            return
        # This should be a ResourceWarning, but it is not available in
        # Python 2.x.
        self._warn("Implicitly cleaning up {!r}".format(self), Warning)
项目:pyomo    作者:Pyomo    | 项目源码 | 文件源码
def print_model_suffixes(model):
    """Print all suffix values for all model components in a table.

    The header row lists the names of the active import suffixes. It is
    followed by one row per ``model.x[i]``, one row per ``model.con[i]``
    (``i`` iterating ``model.s``), a row for ``model.obj`` and a blank line.
    """
    def _print_row(component):
        # One table row: component name, then its value under every suffix.
        six.print_(component.name + "\t", end='')
        for name, suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (suffix.get(component)), end='')
        six.print_("")

    # Header row: an empty first cell followed by the suffix names.
    six.print_("\t", end='')
    for name, suffix in active_import_suffix_generator(model):
        six.print_("%10s" % (name), end='')
    six.print_("")

    for i in model.s:
        _print_row(model.x[i])
    for i in model.s:
        _print_row(model.con[i])
    _print_row(model.obj)
    # Trailing blank line after the table.
    six.print_("")
项目:rqalpha-mod-futu    作者:FutunnOpen    | 项目源码 | 文件源码
def on_recv_rsp(self, rsp_str):
        """Handle a pushed k-line response and cache the latest bar.

        The parent handler decodes ``rsp_str`` into ``(ret_code, ret_data)``.
        On error a message is printed. On an empty frame the ``'cur_kline'``
        cache is reset. Otherwise the last row is normalized and cached under
        the code found in ``ret_data['code'][0]``.

        :param rsp_str: raw response passed through to the parent handler.
        :returns: ``(ret_code, bar_data)`` when a bar was cached; ``None`` on
            the error path and on the empty-frame path.
        """
        ret_code, ret_data = super(DataCache, self).on_recv_rsp(rsp_str)
        if ret_code == RET_ERROR or isinstance(ret_data, str):
            # The translatable template names its placeholder ``bar_data``, so
            # the value must be supplied under that keyword; passing it as
            # ``ret_data`` raised KeyError instead of printing the error.
            six.print_(_(u"push kline data error:{bar_data}").format(bar_data=ret_data))
        else:
            if ret_data.empty:
                self._cache['cur_kline'] = {}
            else:
                # Only the last row of the pushed frame is kept.
                bar_data = ret_data.iloc[-1:].copy()
                # Drop the columns that are not part of the cached bar.
                del bar_data['code'], bar_data['k_type']
                # Turn each time_key string into an int by stripping '-', ' ' and ':'.
                # NOTE(review): rows are addressed by label ``i`` counted from 0,
                # while ``iloc[-1:]`` keeps the original label of the last row;
                # presumably ret_data holds a single row here, confirm.
                for i in range(len(bar_data['time_key'])):
                    bar_data.loc[i, 'time_key'] = int(
                        bar_data['time_key'][i].replace('-', '').replace(' ', '').replace(':', ''))

                # Rename the columns to 'datetime' and 'total_turnover'.
                bar_data.rename(columns={'time_key': 'datetime', 'turnover': 'total_turnover'},
                                inplace=True)
                # Store the volume column as float64.
                bar_data['volume'] = bar_data['volume'].astype('float64')

                self._cache['cur_kline'][ret_data['code'][0]]=bar_data
                return ret_code, self._cache['cur_kline'][ret_data['code'][0]]
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def print_datetime(plugin, polled_time):
    """Print ``polled_time`` as ``YYYY-MM-DD`` without a trailing newline.

    ``plugin`` is accepted but not used.
    """
    stamp = polled_time.strftime('%Y-%m-%d')
    six.print_(stamp, end='')
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def print_datetime_with_str(plugin, polled_time, prefix):
    """Print ``prefix`` followed by ``polled_time`` as ``YYYY-MM-DD``.

    No trailing newline is written. ``plugin`` is accepted but not used.
    """
    stamp = polled_time.strftime('%Y-%m-%d')
    six.print_(prefix + stamp, end='')
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_activate_instance_method(capsys):
    """Check that a crontab entry of the form ``.method`` runs the plugin's
    own method: polling at ``2016-01-01 00:00:01`` must print that date."""
    frozen_at = '2016-01-01 00:00:01'

    class ActivateImpl(MockedImpl):
        CRONTAB = ['0 0 * * * .print_datetime']

        def print_datetime(self, polled_time):
            stamp = polled_time.strftime('%Y-%m-%d')
            six.print_(stamp, end='')

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time(frozen_at):
        plugin.poll_crontab()
        captured = capsys.readouterr()
        assert captured[0] == '2016-01-01'
项目:psyplot    作者:Chilipp    | 项目源码 | 文件源码
def show_plot_methods(self):
        """Print the plot methods of this instance.

        Every item of ``self._plot_methods`` is rendered as its key on one
        line and its value indented on the next. The text is passed to
        ``PlotterInterface._print_func`` or, when that is ``None``, to
        ``six.print_``; the result of that call is returned.
        """
        configured = PlotterInterface._print_func
        emit = six.print_ if configured is None else configured
        entries = ["%s\n    %s" % pair
                   for pair in six.iteritems(self._plot_methods)]
        return emit("\n".join(entries))
项目:bistiming    作者:ianlini    | 项目源码 | 文件源码
def __init__(self, description="", logger=None, logging_level=logging.INFO,
                 verbose_start=True, verbose_end=True, end_in_new_line=True,
                 prefix="..."):
        """Store the timer configuration and reset the timing fields.

        Messages go to ``logger.log`` at ``logging_level`` when a logger is
        given and to ``six.print_`` otherwise. ``description`` is stored with
        ``prefix`` prepended.
        """
        # Output sink: plain printing unless a logger was supplied.
        self.log = six.print_ if logger is None else partial(logger.log, logging_level)
        self.description = prefix + description

        # Verbosity switches.
        self.verbose_start = verbose_start
        self.verbose_end = verbose_end
        self.end_in_new_line = end_in_new_line

        # Nothing has been measured yet.
        self.start_time = None
        self.end_time = None
        self.elapsed_time = None
项目:ci-diff-helper    作者:dhermes    | 项目源码 | 文件源码
def _rate_limit_info(response):
    """Write the rate limit details of a response to stderr.

    The remaining, limit and reset header values are formatted with
    ``_RATE_LIMIT_TEMPLATE``; ``_GH_ENV_VAR_MSG`` is printed after it.

    Args:
        response (requests.Response): A GitHub API response.
    """
    headers = response.headers
    header_values = [
        headers.get(_RATE_REMAINING_HEADER),
        headers.get(_RATE_LIMIT_HEADER),
        headers.get(_RATE_RESET_HEADER),
    ]
    msg = _RATE_LIMIT_TEMPLATE.format(*header_values)
    for line in (msg, _GH_ENV_VAR_MSG):
        six.print_(line, file=sys.stderr)
项目:loman    作者:janusassetallocation    | 项目源码 | 文件源码
def print_errors(self):
        """
        Print the traceback of each node whose state is ``States.ERROR``.

        Every such node is shown as its name, an underline of ``=`` of the
        same length, a blank line, the stored traceback and a blank line.
        """
        failed = (n for n in self.nodes() if self.s[n] == States.ERROR)
        for n in failed:
            underline = "=" * len(n)
            six.print_("{}".format(n))
            six.print_(underline)
            six.print_()
            six.print_(self.v[n].traceback)
            six.print_()
项目:EasyStorj    作者:lakewik    | 项目源码 | 文件源码
def determine_shard_size(self, file_size, accumulator):
        """Return the shard size to use for a file of ``file_size`` bytes.

        Based on <https://github.com/aleitner/shard-size-calculator/blob/master/src/shard_size.c>

        The accumulator is increased until ``self.shard_size(accumulator)``
        is at least ``file_size``; the result is then the shard size
        ``SHARD_MULTIPLES_BACK`` steps below that accumulator (never below
        step 0), stepped down further while it exceeds ``MAX_SHARD_SIZE``.

        :param file_size: size of the file.
        :param accumulator: step to start the search from.
        :returns: the shard size, or ``0`` when ``file_size`` is not positive
            or no step up to 41 is large enough.
        """
        if file_size <= 0:
            return 0
        self.__logger.debug(accumulator)

        # Determine hops back by accumulator
        hops = max(accumulator - self.SHARD_MULTIPLES_BACK, 0)

        byte_multiple = self.shard_size(accumulator)

        # float() mirrors the double cast of the C reference and keeps the
        # ratio from being truncated by Python 2 integer division.
        check = float(file_size) / byte_multiple
        if 0 < check <= 1:
            while hops > 0 and self.shard_size(hops) > self.MAX_SHARD_SIZE:
                hops = max(hops - 1, 0)
            return self.shard_size(hops)

        # Maximum of 2 ^ 41 * 8 * 1024 * 1024
        if accumulator > 41:
            return 0

        # The file does not fit yet: try the next step. This call had been
        # commented out as ``++accumulator``, which does not increment in
        # Python, leaving the function to return None here.
        return self.determine_shard_size(file_size, accumulator + 1)
项目:lookml-gen    作者:symphonyrm    | 项目源码 | 文件源码
def test_dimension_group():
    """Generate LookML for a view holding one dimension group and compare it
    with ``expected_output/dimension_group_test.lkml``."""
    testname = 'dimension_group_test'
    v = view.View(testname)
    group = field.DimensionGroup('dimension1', sql='${TABLE}.dim1')
    v.add_field(group)

    buf = six.StringIO()
    v.generate_lookml(buf, format_options=test_format_options)
    lookml = buf.getvalue()
    six.print_(lookml)

    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
项目:lookml-gen    作者:symphonyrm    | 项目源码 | 文件源码
def test_dimension_group_no_timeframes():
    """Generate LookML with ``omit_time_frames_if_not_set=True`` and compare
    it with ``expected_output/dimension_group_no_timeframes_test.lkml``."""
    testname = 'dimension_group_no_timeframes_test'
    v = view.View(testname)
    group = field.DimensionGroup('dimension1', sql='${TABLE}.dim1')
    v.add_field(group)

    buf = six.StringIO()
    fo_omit_timeframes = base_generator.GeneratorFormatOptions(
        warning_header_comment=None,
        omit_time_frames_if_not_set=True)
    v.generate_lookml(buf, format_options=fo_omit_timeframes)
    lookml = buf.getvalue()
    six.print_(lookml)

    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
项目:lookml-gen    作者:symphonyrm    | 项目源码 | 文件源码
def test_newlines():
    """Generate LookML for four dimension/measure pairs and compare it with
    ``expected_output/newlines_test.lkml``."""
    testname = 'newlines_test'
    v = view.View(testname)
    for letter in ('a', 'b', 'c', 'd'):
        v.add_field(field.Dimension(letter, type='number'))
        reference = '${{{0}}}'.format(letter)
        v.add_field(field.Measure('sum_' + letter, type='sum', sql=reference))

    buf = six.StringIO()
    v.generate_lookml(buf, format_options=test_format_options)
    lookml = buf.getvalue()
    six.print_(lookml)

    expected_path = os.path.join(os.path.dirname(__file__),
                                 'expected_output/%s.lkml' % testname)
    with open(expected_path, 'rt') as expected:
        assert lookml == expected.read()
项目:tailchaser    作者:thanos    | 项目源码 | 文件源码
def cmp_file(src_file, dst_file):
    """Assert that both files, opened with ``Tailer.file_opener`` in ``'rb'``
    mode, have identical contents."""
    six.print_('testing: ', src_file, dst_file)
    src_bytes = Tailer.file_opener(src_file, 'rb').read()
    dst_bytes = Tailer.file_opener(dst_file, 'rb').read()
    assert (src_bytes == dst_bytes)
项目:tailchaser    作者:thanos    | 项目源码 | 文件源码
def emit(self, count):
        """Print ``count`` using ``six.print_``."""
        six.print_(count)
项目:tailchaser    作者:thanos    | 项目源码 | 文件源码
def doRollover(self):
        """Sleep for ``ROLL_DELAY``, count and print the rollover, then
        return the result of the parent class's ``doRollover``."""
        time.sleep(self.ROLL_DELAY)
        self.rolls = self.rolls + 1
        six.print_('rolls', self.rolls)
        parent = super(RotatingWithDelayFileHandler, self)
        return parent.doRollover()
项目:tailchaser    作者:thanos    | 项目源码 | 文件源码
def test_filter():
    """Run the tailer with ``--filter-re`` over a generated log file.

    The file holds 1000 lines alternating between ``even: N`` and ``odd: N``;
    the filter keeps the ``odd:`` lines, so the consumer must receive 500
    records. The count is kept in the module-level ``filter_count``.
    """
    work_dir = tempfile.mkdtemp(prefix='tail-test_filter-tail_from_dir')
    six.print_('generating log files', work_dir)
    test_file = os.path.join(work_dir, 'test.log')
    with open(test_file, 'wb') as file_to_tail:
        six.print_('file to tail with filter', file_to_tail)
        for i in range(1000):
            # The labels used to be swapped (even numbers were tagged "odd").
            label = "even" if i % 2 == 0 else "odd"
            line = "%s: %d\n" % (label, i)
            # The file is opened in binary mode, so bytes must be written;
            # writing a str fails on Python 3.
            file_to_tail.write(line.encode('ascii'))

    def consumer_gen():
        # Count every record the tailer sends; the record itself is unused.
        global filter_count
        while True:
            yield ()
            filter_count += 1

    consumer = consumer_gen()
    # Prime the generator so that it is ready to receive records.
    consumer.send(None)
    vargs = [__name__, '--only-backfill', '--clear-checkpoint', '--filter-re=odd:\\s*\\d+']
    vargs.append(test_file)
    main(vargs, consumer)
    assert (500 == filter_count)
项目:tailchaser    作者:thanos    | 项目源码 | 文件源码
def test_backfill(log_handler=RotatingWithDelayFileHandler, consumer=None, tail_to_dir=None, vargs=None):
    """Backfill generated log files through the tailer and compare the copies.

    :param log_handler: handler class whose ``generate`` writes the log files.
    :param consumer: primed generator receiving records; by default one that
        appends ``record[2]`` to the file ``record[1][0]`` in ``tail_to_dir``.
    :param tail_to_dir: output directory; a temporary one when not given.
    :param vargs: argument list for ``main``; the source pattern is added to
        a copy of it, so the caller's list is left untouched.
    """
    tail_from_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_from_dir')
    six.print_('generating log files', tail_from_dir)
    log_handler.generate(os.path.join(tail_from_dir, 'test.log'), BACKFILL_EMITS)
    if not tail_to_dir:
        tail_to_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_to_dir')

    if not consumer:
        def consumer_gen():
            while True:
                record = yield ()
                # Close the file after every record so the data is on disk
                # before the files are compared.
                with open(os.path.join(tail_to_dir, record[1][0]), 'ab') as sink:
                    sink.write(record[2])

        consumer = consumer_gen()
        # Prime the generator so that it is ready to receive records.
        consumer.send(None)
    six.print_('start tailer', tail_to_dir)
    source_pattern = os.path.join(tail_from_dir, '*')
    if not vargs:
        vargs = [__name__, '--only-backfill', '--clear-checkpoint']
    vargs = list(vargs) + [source_pattern]
    main(vargs, consumer)
    cmp_files(source_pattern, tail_to_dir, lambda x: Tailer.make_sig(x))
    six.print_('all done', tail_to_dir)
项目:jira_reporting_scripts    作者:andrew-hamlin-sp    | 项目源码 | 文件源码
def eprint(msg, *args, **kwargs):
        '''Print ``msg`` and any further positional arguments to stderr.

        Keyword arguments are forwarded to ``print_``. Passing ``file`` is
        not supported because the output stream is fixed to ``sys.stderr``.
        '''
        # The extra positional arguments used to be accepted but never printed.
        print_(msg, *args, file=sys.stderr, **kwargs)
项目:Particle-Picking-Cryo-EM    作者:hqythu    | 项目源码 | 文件源码
def main():
    """Train a linear SVM on PCA-reduced data and report validation accuracy.

    A PCA model is loaded from ``pca.pickle`` when possible, otherwise fitted
    on the training data. The PCA and the classifier are written to
    ``pca.pickle`` and ``svm.pickle`` at the end.
    """
    six.print_('loading data')
    train_x, train_y, val_x, val_y = load_data()
    # Flatten every sample into a vector of 64 * 64 values.
    train_x = train_x.reshape(-1, 64 * 64)
    val_x = val_x.reshape(-1, 64 * 64)
    six.print_('load data complete')

    six.print_('start PCA')
    try:
        # NOTE(review): pickle.load can execute arbitrary code; this is only
        # safe while pca.pickle is a file produced by this script.
        with open('pca.pickle', 'rb') as cached:
            pca = pickle.load(cached)
    except Exception:
        # No usable cached model: fit a new one. A bare ``except`` would also
        # have swallowed KeyboardInterrupt and SystemExit.
        pca = decomposition.PCA(n_components=8*8)
        pca.fit(train_x[:])
    train_x = pca.transform(train_x)
    six.print_('PCA complete')

    clf = SVC(C=0.0001, kernel='linear', verbose=True, max_iter=100)
    six.print_('start training')
    clf.fit(train_x, train_y)
    six.print_('training complete')

    val_x = pca.transform(val_x)
    acc = sum(val_y == clf.predict(val_x)) / float(len(val_y))
    six.print_(acc)

    # Close the output files explicitly so that the pickles are fully written.
    with open('pca.pickle', 'wb') as pca_file:
        pickle.dump(pca, pca_file)
    with open('svm.pickle', 'wb') as svm_file:
        pickle.dump(clf, svm_file)
项目:Particle-Picking-Cryo-EM    作者:hqythu    | 项目源码 | 文件源码
def main():
    """Parse the command line, train the selected model and save its weights.

    The model comes from the ``build()`` function of the module named by
    ``--model``. It is compiled with an SGD optimizer and the weights are
    written to ``<output>.hdf5`` after fitting.
    """
    parser = argparse.ArgumentParser(description='Train a neural network')
    # Flag name -> keyword arguments of add_argument, in registration order.
    options = (
        ('--model', {'type': str}),
        ('--lr', {'type': float, 'default': 0.001}),
        ('--decay', {'type': float, 'default': 1e-4}),
        ('--momentum', {'type': float, 'default': 0.9}),
        ('--batch', {'type': int, 'default': 128}),
        ('--epoch', {'type': int, 'default': 100}),
        ('--output', {'type': str, 'default': 'weight'}),
    )
    for flag, spec in options:
        parser.add_argument(flag, **spec)
    args = parser.parse_args()

    network_module = importlib.import_module(args.model)
    model = network_module.build()

    six.print_('loading data')
    train_x, train_y, val_x, val_y = load_data()
    six.print_('load data complete')

    optimizer = SGD(lr=args.lr,
                    decay=args.decay,
                    momentum=args.momentum,
                    nesterov=True)
    model.compile(loss='binary_crossentropy',
                  optimizer=optimizer,
                  metrics=['accuracy'])
    six.print_('build model complete')

    six.print_('start training')
    model.fit(train_x, train_y,
              batch_size=args.batch,
              nb_epoch=args.epoch,
              verbose=2,
              shuffle=True,
              validation_data=(val_x, val_y))
    weights_path = args.output + '.hdf5'
    model.save_weights(weights_path)
项目:botoflow    作者:boto    | 项目源码 | 文件源码
def join(self):
        """Wait until the process queue has been joined.

        A ``KeyboardInterrupt`` during the wait prints a notice to stderr and
        the queue is joined once more. ``_running`` is cleared afterwards.
        """
        queue = self._process_queue
        try:
            queue.join()
        except KeyboardInterrupt:
            six.print_("\nTerminating, please wait...", file=sys.stderr)
            queue.join()
        self._running = False
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_print_():
    """Exercise ``six.print_``: default stdout, ``file``, ``end``, ``sep``,
    text arguments, ``file=None``, a ``None`` value and ``flush``."""
    # Default destination: whatever sys.stdout currently is.
    original_stdout = sys.stdout
    captured = sys.stdout = six.moves.StringIO()
    try:
        six.print_("Hello,", "person!")
    finally:
        sys.stdout = original_stdout
    assert captured.getvalue() == "Hello, person!\n"

    # Explicit file, then custom ``end`` and custom ``sep``.
    keyword_cases = (
        ({}, "Hello, person!\n"),
        ({'end': ""}, "Hello, person!"),
        ({'sep': "X"}, "Hello,Xperson!\n"),
    )
    for extra, expected in keyword_cases:
        buf = six.StringIO()
        six.print_("Hello,", "person!", file=buf, **extra)
        assert buf.getvalue() == expected

    # Text arguments produce text output.
    buf = six.StringIO()
    six.print_(six.u("Hello,"), six.u("person!"), file=buf)
    text = buf.getvalue()
    assert isinstance(text, six.text_type)
    assert text == six.u("Hello, person!\n")

    six.print_("Hello", file=None) # This works.

    buf = six.StringIO()
    six.print_(None, file=buf)
    assert buf.getvalue() == "None\n"

    class FlushableStringIO(six.StringIO):
        # Records whether flush() has been called.
        def __init__(self):
            six.StringIO.__init__(self)
            self.flushed = False

        def flush(self):
            self.flushed = True

    tracker = FlushableStringIO()
    six.print_("Hello", file=tracker)
    assert not tracker.flushed
    six.print_("Hello", file=tracker, flush=True)
    assert tracker.flushed
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_print_encoding(monkeypatch):
    """Check that ``six.print_`` honours the ``encoding`` and ``errors``
    attributes of the target stream."""
    # Fool the type checking in print_.
    monkeypatch.setattr(six, "file", six.BytesIO, raising=False)
    letter = six.u("\u053c")

    utf8_stream = six.BytesIO()
    utf8_stream.encoding = "utf-8"
    utf8_stream.errors = None
    six.print_(letter, end="", file=utf8_stream)
    assert utf8_stream.getvalue() == six.b("\xd4\xbc")

    ascii_stream = six.BytesIO()
    ascii_stream.encoding = "ascii"
    ascii_stream.errors = "strict"
    py.test.raises(UnicodeEncodeError, six.print_, letter, file=ascii_stream)
    ascii_stream.errors = "backslashreplace"
    six.print_(letter, end="", file=ascii_stream)
    assert ascii_stream.getvalue() == six.b("\\u053c")
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_print_exceptions():
    """``six.print_`` must raise ``TypeError`` for an unknown keyword and for
    non-string ``end`` and ``sep`` values."""
    for bad_kwargs in ({'x': 3}, {'end': 3}, {'sep': 42}):
        py.test.raises(TypeError, six.print_, **bad_kwargs)
项目:humor    作者:micyril    | 项目源码 | 文件源码
def read_ids(filename):
    """Return the whitespace-separated integers stored in ``filename``.

    When the file cannot be read, a message is printed and the process exits
    with status 1.
    """
    try:
        with open(filename) as f:
            tokens = f.read().split()
            return [int(token) for token in tokens]
    except IOError:
        six.print_("Can't read the file", filename)
        exit(1)
项目:humor    作者:micyril    | 项目源码 | 文件源码
def retrieve_statuses(api, ids_portion):
    """Look up the statuses for ``ids_portion`` through ``api``.

    On a rate limit error a notice is printed and the lookup is repeated,
    sleeping 30 seconds after each further rate limit error. Any other
    ``TweepError`` raised by the first attempt prints a message and exits
    with status 1.
    """
    lookup = api.statuses_lookup
    try:
        return lookup(ids_portion)
    except tweepy.error.RateLimitError:
        six.print_("\nRate limit has been achieved. Waiting...")
        while True:
            try:
                return lookup(ids_portion)
            except tweepy.error.RateLimitError:
                time.sleep(30)
    except tweepy.error.TweepError as failure:
        six.print_("Failed to look up:", str(failure))
        exit(1)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_print_():
    """Exercise ``six.print_``: default stdout, ``file``, ``end``, ``sep``,
    text arguments, ``file=None``, a ``None`` value and ``flush``."""
    # Default destination: whatever sys.stdout currently is.
    original_stdout = sys.stdout
    captured = sys.stdout = six.moves.StringIO()
    try:
        six.print_("Hello,", "person!")
    finally:
        sys.stdout = original_stdout
    assert captured.getvalue() == "Hello, person!\n"

    # Explicit file, then custom ``end`` and custom ``sep``.
    keyword_cases = (
        ({}, "Hello, person!\n"),
        ({'end': ""}, "Hello, person!"),
        ({'sep': "X"}, "Hello,Xperson!\n"),
    )
    for extra, expected in keyword_cases:
        buf = six.StringIO()
        six.print_("Hello,", "person!", file=buf, **extra)
        assert buf.getvalue() == expected

    # Text arguments produce text output.
    buf = six.StringIO()
    six.print_(six.u("Hello,"), six.u("person!"), file=buf)
    text = buf.getvalue()
    assert isinstance(text, six.text_type)
    assert text == six.u("Hello, person!\n")

    six.print_("Hello", file=None) # This works.

    buf = six.StringIO()
    six.print_(None, file=buf)
    assert buf.getvalue() == "None\n"

    class FlushableStringIO(six.StringIO):
        # Records whether flush() has been called.
        def __init__(self):
            six.StringIO.__init__(self)
            self.flushed = False

        def flush(self):
            self.flushed = True

    tracker = FlushableStringIO()
    six.print_("Hello", file=tracker)
    assert not tracker.flushed
    six.print_("Hello", file=tracker, flush=True)
    assert tracker.flushed
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_print_encoding(monkeypatch):
    """Check that ``six.print_`` honours the ``encoding`` and ``errors``
    attributes of the target stream."""
    # Fool the type checking in print_.
    monkeypatch.setattr(six, "file", six.BytesIO, raising=False)
    letter = six.u("\u053c")

    utf8_stream = six.BytesIO()
    utf8_stream.encoding = "utf-8"
    utf8_stream.errors = None
    six.print_(letter, end="", file=utf8_stream)
    assert utf8_stream.getvalue() == six.b("\xd4\xbc")

    ascii_stream = six.BytesIO()
    ascii_stream.encoding = "ascii"
    ascii_stream.errors = "strict"
    py.test.raises(UnicodeEncodeError, six.print_, letter, file=ascii_stream)
    ascii_stream.errors = "backslashreplace"
    six.print_(letter, end="", file=ascii_stream)
    assert ascii_stream.getvalue() == six.b("\\u053c")
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_print_exceptions():
    """``six.print_`` must raise ``TypeError`` for an unknown keyword and for
    non-string ``end`` and ``sep`` values."""
    for bad_kwargs in ({'x': 3}, {'end': 3}, {'sep': 42}):
        py.test.raises(TypeError, six.print_, **bad_kwargs)
项目:pytablewriter    作者:thombashi    | 项目源码 | 文件源码
def simple_write_callback(iter_count, iteration_length):
    """Print the progress as ``<iter_count>/<iteration_length>``.

    Both values are formatted with ``:d`` and therefore must be integers.
    """
    import six

    progress = "{:d}/{:d}".format(iter_count, iteration_length)
    six.print_(progress)
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def examples(directory):
    """
    Generate example strategies to target folder
    """
    # The bundled examples live in an "examples" folder next to this file.
    source_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "examples")

    try:
        shutil.copytree(source_dir, os.path.join(directory, "examples"))
    except OSError as e:
        # Only an already existing target is reported and tolerated. Every
        # other failure (missing source, permissions, ...) used to be
        # swallowed silently and is now propagated to the caller.
        if e.errno != errno.EEXIST:
            raise
        six.print_("Folder examples is exists.")
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def version(**kwargs):
    """
    Output Version Info
    """
    # ``kwargs`` is accepted but not used in this function body.
    # The import happens at call time, inside the function.
    from rqalpha import version_info
    six.print_("Current Version: ", version_info)
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def generate_config(directory):
    """
    Generate default config file
    """
    # The default config.yml sits in the same folder as this file.
    package_dir = os.path.dirname(os.path.realpath(__file__))
    default_config = os.path.join(package_dir, "config.yml")
    destination = os.path.join(directory, 'config.yml')
    target_config_path = os.path.abspath(destination)
    shutil.copy(default_config, target_config_path)
    six.print_("Config file has been generated in", target_config_path)


# For Mod Cli
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def output_profile_result(env):
    """Print the line profiler statistics and publish them as an event.

    The statistics are rendered into an in-memory buffer, stripped of
    trailing whitespace, printed, and published on ``env.event_bus`` as an
    ``EVENT.ON_LINE_PROFILER_RESULT`` event.
    """
    stdout_trap = six.StringIO()
    env.profile_deco.print_stats(stdout_trap)
    profile_output = stdout_trap.getvalue().rstrip()
    six.print_(profile_output)
    result_event = Event(EVENT.ON_LINE_PROFILER_RESULT, result=profile_output)
    env.event_bus.publish_event(result_event)
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def get_bars(self, order_book_id, fields=None):
        """Return the stored bars of ``order_book_id``.

        :param order_book_id: key into ``self._index``, which gives the row
            range of the instrument.
        :param fields: field names to return; defaults to every table column
            after the first one.
        :returns: ``None`` (after printing a message) when the id is unknown;
            the converted column alone when exactly one field is requested;
            otherwise a structured array holding ``datetime`` (the ``date``
            column multiplied by 1000000) and the requested fields.
        """
        try:
            s, e = self._index[order_book_id]
        except KeyError:
            six.print_(_(u"No data for {}").format(order_book_id))
            return

        cols = self._table.cols
        if fields is None:
            # the first is date
            fields = self._table.names[1:]

        if len(fields) == 1:
            only = fields[0]
            return self._converter.convert(only, cols[only][s:e])

        # remove datetime if exist in fields
        self._remove_(fields, 'datetime')

        field_types = [(f, self._converter.field_type(f, cols[f].dtype))
                       for f in fields]
        dtype = np.dtype([('datetime', np.uint64)] + field_types)
        result = np.empty(shape=(e - s, ), dtype=dtype)
        for f in fields:
            result[f] = self._converter.convert(f, cols[f][s:e])
        result['datetime'] = cols['date'][s:e]
        result['datetime'] *= 1000000

        return result