The following 50 code examples, extracted from open-source Python projects, illustrate how to use six.print_().
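For reference, six.print_() is six's portable stand-in for Python 3's built-in print() and accepts the same keyword arguments (sep, end, file, flush). Here is a minimal sketch that uses only behavior exercised by the six test suite further down this page:

import six

six.print_("Hello,", "world")                  # "Hello, world\n" to sys.stdout
six.print_("a", "b", sep="-", end="")          # custom separator, no trailing newline

out = six.StringIO()
six.print_("buffered", file=out, flush=True)   # write to any file-like object
assert out.getvalue() == "buffered\n"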
def test_timezone_in_plugin(capsys):
    class ActivateImpl(MockedImpl):
        TIMEZONE = 'Asia/Tokyo'

        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()

    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()

    out, err = capsys.readouterr()
    assert out != '2016-01-01'
def test_timezone_in_config(capsys):
    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    setattr(plugin, 'bot_config', MockConfig())

    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()

    out, err = capsys.readouterr()
    assert out != '2016-01-01'
def echo(args, file=None):
    '''
    Echo a list of arguments (as given to ``subprocess.call``) to the given
    stream. This defaults to ``stdout``, but can be changed to any
    stream-like object such as a file handle.

    :param args: A string or list of strings
    :param file: A file-like object to stream output to. Defaults to
        ``sys.stdout``
    '''
    if file is None:
        file = sys.stdout
    if isinstance(args, six.string_types + (six.text_type,)):
        args = [args]
    six.print_(*args, file=file, flush=True)
def sign(cls, args):
    """Sign."""
    key = args.alg.kty.load(args.key.read())
    args.key.close()
    if args.protect is None:
        args.protect = []
    if args.compact:
        args.protect.append('alg')

    sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
                   protect=set(args.protect))

    if args.compact:
        six.print_(sig.to_compact().decode('utf-8'))
    else:  # JSON
        six.print_(sig.json_dumps_pretty())
def verify(cls, args):
    """Verify."""
    if args.compact:
        sig = JWS.from_compact(sys.stdin.read().encode())
    else:  # JSON
        try:
            sig = JWS.json_loads(sys.stdin.read())
        except errors.Error as error:
            six.print_(error)
            return -1

    if args.key is not None:
        assert args.kty is not None
        key = args.kty.load(args.key.read()).public_key()
        args.key.close()
    else:
        key = None

    sys.stdout.write(sig.payload)
    return not sig.verify(key=key)
def _ingest_pairs(self, pairs, oid2nid, frame_size, limit, single_sided):
    oid2nid_v = np.vectorize(oid2nid.get)
    # whole pairs set does not fit in memory, so split it in frames
    # with `frame_size` number of pairs
    for start in range(0, limit, frame_size):
        stop = frame_size + start
        t1 = process_time()
        six.print_('Fetching pairs {0}:{1} of {2} ... '.format(start, stop, limit),
                   end='', flush=True)
        raw_frame = pairs.read(start=start, stop=stop)
        t2 = process_time()
        six.print_('{0}s, Parsing ... '.format(int(t2 - t1)), flush=True)
        frame = self._translate_frame(raw_frame, oid2nid_v, single_sided)
        t3 = process_time()
        six.print_('{0}s, Writing ... '.format(int(t3 - t2)), flush=True)
        # alternate direction, to make use of cached chunks of prev frame
        self._ingest_pairs_frame(frame)
        del frame
        t4 = process_time()
        six.print_('{0}s, Done with {1}:{2} in {3}s'.format(int(t4 - t3), start,
                                                            stop, int(t4 - t1)),
                   flush=True)
def to_pairs(self, pairs):
    """Copies labels and scores from self to pairs matrix.

    Args:
        pairs (SimilarityMatrix):
    """
    six.print_('copy labels', flush=True)
    self.build_label_cache()
    pairs.labels.update(self.cache_l2i)

    six.print_('copy matrix to pairs', flush=True)
    limit = self.scores.shape[0]
    bar = ProgressBar()
    for query_id in bar(six.moves.range(0, limit)):
        subjects = self.scores[query_id, ...]
        filled_subjects_ids = subjects.nonzero()[0]
        filled_subjects = [(query_id, i, subjects[i])
                           for i in filled_subjects_ids if query_id < i]
        if filled_subjects:
            pairs.pairs.table.append(filled_subjects)
def test_pdt_view():
    testname = 'pdt_view'
    pdt = view.DerivedTable(sql="SELECT id, count(*) c FROM table GROUP BY id",
                            sql_trigger_value='DATE()',
                            indexes=['id'])
    v = view.View(testname)
    v.derived_table = pdt
    v.add_field(field.Dimension('id', type='number', primary_key=True))
    v.add_field(field.Dimension('c', type='number'))
    v.add_field(field.Measure('sum_c', sql='${TABLE}.c', type='sum'))
    f = six.StringIO()
    v.generate_lookml(f, format_options=test_format_options)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname), 'rt') as expected:
        assert lookml == expected.read()
def single_poll(self, next_page_token=None):
    poll_time = time.time()
    try:
        kwargs = {'domain': self.domain,
                  'taskList': {'name': self.task_list},
                  'identity': self.identity}
        if next_page_token is not None:
            kwargs['nextPageToken'] = next_page_token
        # old botocore throws TypeError when unable to establish SWF connection
        return self.worker.client.poll_for_decision_task(**kwargs)
    except KeyboardInterrupt:
        # sleep before actually exiting as the connection is not yet closed
        # on the other end
        sleep_time = 60 - (time.time() - poll_time)
        six.print_("Exiting in {0}...".format(sleep_time), file=sys.stderr)
        time.sleep(sleep_time)
        raise
def cleanup(self, _warn=False):
    if self.name and not self._closed:
        try:
            self._rmtree(self.name)
        except (TypeError, AttributeError) as ex:
            # Issue #10188: Emit a warning on stderr
            # if the directory could not be cleaned
            # up due to missing globals
            if "None" not in str(ex):
                raise
            six.print_("ERROR: {!r} while cleaning up {!r}".format(ex, self,),
                       file=_sys.stderr)
            return
        self._closed = True
        if _warn:
            # This should be a ResourceWarning, but it is not available in
            # Python 2.x.
            self._warn("Implicitly cleaning up {!r}".format(self), Warning)
def print_model_suffixes(model):
    # six.print_ all suffix values for all model components in a nice table
    six.print_("\t", end='')
    for name, suffix in active_import_suffix_generator(model):
        six.print_("%10s" % (name), end='')
    six.print_("")
    for i in model.s:
        six.print_(model.x[i].name + "\t", end='')
        for name, suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (suffix.get(model.x[i])), end='')
        six.print_("")
    for i in model.s:
        six.print_(model.con[i].name + "\t", end='')
        for name, suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (suffix.get(model.con[i])), end='')
        six.print_("")
    six.print_(model.obj.name + "\t", end='')
    for name, suffix in active_import_suffix_generator(model):
        six.print_("%10s" % (suffix.get(model.obj)), end='')
    print("")
    print("")
def on_recv_rsp(self, rsp_str):
    ret_code, ret_data = super(DataCache, self).on_recv_rsp(rsp_str)
    if ret_code == RET_ERROR or isinstance(ret_data, str):
        six.print_(_(u"push kline data error:{ret_data}").format(ret_data=ret_data))
    else:
        if ret_data.empty:
            self._cache['cur_kline'] = {}
        else:
            bar_data = ret_data.iloc[-1:].copy()
            # drop columns the bar cache does not need
            del bar_data['code'], bar_data['k_type']
            # normalize time_key strings into integer datetimes
            for i in range(len(bar_data['time_key'])):
                bar_data.loc[i, 'time_key'] = int(
                    bar_data['time_key'][i].replace('-', '').replace(' ', '').replace(':', ''))
            # rename columns to the internal bar schema
            bar_data.rename(columns={'time_key': 'datetime',
                                     'turnover': 'total_turnover'}, inplace=True)
            # cast volume to float64
            bar_data['volume'] = bar_data['volume'].astype('float64')
            self._cache['cur_kline'][ret_data['code'][0]] = bar_data
            return ret_code, self._cache['cur_kline'][ret_data['code'][0]]
def print_datetime(plugin, polled_time):
    six.print_(polled_time.strftime('%Y-%m-%d'), end='')
def print_datetime_with_str(plugin, polled_time, prefix):
    six.print_(prefix + polled_time.strftime('%Y-%m-%d'), end='')
def test_activate_instance_method(capsys):
    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()

    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()

    out, err = capsys.readouterr()
    assert out == '2016-01-01'
def show_plot_methods(self):
    """Print the plotmethods of this instance"""
    print_func = PlotterInterface._print_func
    if print_func is None:
        print_func = six.print_
    s = "\n".join(
        "%s\n %s" % t for t in six.iteritems(self._plot_methods))
    return print_func(s)
def __init__(self, description="", logger=None, logging_level=logging.INFO,
             verbose_start=True, verbose_end=True, end_in_new_line=True,
             prefix="..."):
    if logger is not None:
        self.log = partial(logger.log, logging_level)
    else:
        self.log = six.print_
    self.description = prefix + description
    self.verbose_start = verbose_start
    self.verbose_end = verbose_end
    self.end_in_new_line = end_in_new_line
    self.start_time = None
    self.end_time = None
    self.elapsed_time = None
def _rate_limit_info(response):
    """Print response rate limit information to stderr.

    Args:
        response (requests.Response): A GitHub API response.
    """
    remaining = response.headers.get(_RATE_REMAINING_HEADER)
    rate_limit = response.headers.get(_RATE_LIMIT_HEADER)
    rate_reset = response.headers.get(_RATE_RESET_HEADER)
    msg = _RATE_LIMIT_TEMPLATE.format(remaining, rate_limit, rate_reset)
    six.print_(msg, file=sys.stderr)
    six.print_(_GH_ENV_VAR_MSG, file=sys.stderr)
def print_errors(self):
    """
    Print tracebacks for every node with state "ERROR" in a Computation
    """
    for n in self.nodes():
        if self.s[n] == States.ERROR:
            six.print_("{}".format(n))
            six.print_("=" * len(n))
            six.print_()
            six.print_(self.v[n].traceback)
            six.print_()
def determine_shard_size(self, file_size, accumulator):
    # Based on <https://github.com/aleitner/shard-size-calculator/blob/master/src/shard_size.c>
    hops = 0

    if (file_size <= 0):
        return 0

    # if accumulator != True:
    #     accumulator = 0

    self.__logger.debug(accumulator)

    # Determine hops back by accumulator
    if ((accumulator - self.SHARD_MULTIPLES_BACK) < 0):
        hops = 0
    else:
        hops = accumulator - self.SHARD_MULTIPLES_BACK

    # accumulator = 10
    byte_multiple = self.shard_size(accumulator)
    check = file_size / byte_multiple
    # print_(check)
    if (check > 0 and check <= 1):
        while (hops > 0 and self.shard_size(hops) > self.MAX_SHARD_SIZE):
            if hops - 1 <= 0:
                hops = 0
            else:
                hops = hops - 1
        return self.shard_size(hops)

    # Maximum of 2 ^ 41 * 8 * 1024 * 1024
    if (accumulator > 41):
        return 0

    # return self.determine_shard_size(file_size, ++accumulator)
def test_dimension_group():
    testname = 'dimension_group_test'
    v = view.View(testname)
    v.add_field(field.DimensionGroup('dimension1', sql='${TABLE}.dim1'))
    f = six.StringIO()
    v.generate_lookml(f, format_options=test_format_options)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname), 'rt') as expected:
        assert lookml == expected.read()
def test_dimension_group_no_timeframes():
    testname = 'dimension_group_no_timeframes_test'
    v = view.View(testname)
    v.add_field(field.DimensionGroup('dimension1', sql='${TABLE}.dim1'))
    f = six.StringIO()
    fo_omit_timeframes = base_generator.\
        GeneratorFormatOptions(warning_header_comment=None,
                               omit_time_frames_if_not_set=True)
    v.generate_lookml(f, format_options=fo_omit_timeframes)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname), 'rt') as expected:
        assert lookml == expected.read()
def test_newlines():
    testname = 'newlines_test'
    v = view.View(testname)
    for l in ['a', 'b', 'c', 'd']:
        v.add_field(field.Dimension(l, type='number'))
        v.add_field(field.Measure('sum_' + l, type='sum', sql='${{{0}}}'.format(l)))
    f = six.StringIO()
    v.generate_lookml(f, format_options=test_format_options)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname), 'rt') as expected:
        assert lookml == expected.read()
def cmp_file(src_file, dst_file):
    six.print_('testing: ', src_file, dst_file)
    assert (Tailer.file_opener(src_file, 'rb').read() ==
            Tailer.file_opener(dst_file, 'rb').read())
def emit(self, count):
    six.print_(count)
def doRollover(self):
    time.sleep(self.ROLL_DELAY)
    self.rolls += 1
    six.print_('rolls', self.rolls)
    return super(RotatingWithDelayFileHandler, self).doRollover()
def test_filter():
    work_dir = tempfile.mkdtemp(prefix='tail-test_filter-tail_from_dir')
    six.print_('generating log files', work_dir)
    test_file = os.path.join(work_dir, 'test.log')
    with open(test_file, 'wb') as file_to_tail:
        file_name = file_to_tail.name
        six.print_('file to tail with filter', file_to_tail)
        for i in range(1000):
            if i % 2 == 0:
                line = "odd: %d\n" % i
            else:
                line = "even: %d\n" % i
            file_to_tail.write(line)

    def consumer_gen():
        global filter_count
        while True:
            record = yield ()
            filter_count += 1

    consumer = consumer_gen()
    consumer.send(None)
    vargs = [__name__, '--only-backfill', '--clear-checkpoint',
             '--filter-re=odd:\\s*\\d+']
    vargs.append(test_file)
    main(vargs, consumer)
    assert (500 == filter_count)
def test_backfill(log_handler=RotatingWithDelayFileHandler, consumer=None,
                  tail_to_dir=None, vargs=None):
    tail_from_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_from_dir')
    six.print_('generating log files', tail_from_dir)
    log_handler.generate(os.path.join(tail_from_dir, 'test.log'), BACKFILL_EMITS)
    if not tail_to_dir:
        tail_to_dir = tempfile.mkdtemp(prefix='tail-test_backfill-tail_to_dir')
    if not consumer:
        def consumer_gen():
            while True:
                record = yield ()
                open(os.path.join(tail_to_dir, record[1][0]), 'ab').write(record[2])

        consumer = consumer_gen()
        consumer.send(None)
    six.print_('start tailer', tail_to_dir)
    source_pattern = os.path.join(tail_from_dir, '*')
    if not vargs:
        vargs = [__name__, '--only-backfill', '--clear-checkpoint']
    vargs.append(source_pattern)
    main(vargs, consumer)
    cmp_files(source_pattern, tail_to_dir, lambda x: Tailer.make_sig(x))
    six.print_('all done', tail_to_dir)
    # for src_file_path in glob.glob(source_pattern):
    #     dst_file_path = os.path.join(tail_to_dir, Tailer.make_sig(src_file_path))
    #     six.print_("testing:", src_file_path, dst_file_path)
    #     assert (Tailer.file_opener(src_file_path).read() ==
    #             Tailer.file_opener(dst_file_path).read())
def eprint(msg, *args, **kwargs):
    '''print args to stderr'''
    # print_(text_type(msg).encode(), file=sys.stderr, **kwargs)
    print_(msg, file=sys.stderr, **kwargs)
def main():
    six.print_('loading data')
    train_x, train_y, val_x, val_y = load_data()
    train_x = train_x.reshape(-1, 64 * 64)
    val_x = val_x.reshape(-1, 64 * 64)
    six.print_('load data complete')

    six.print_('start PCA')
    try:
        pca = pickle.load(open('pca.pickle', 'rb'))
    except:
        pca = decomposition.PCA(n_components=8 * 8)
        pca.fit(train_x[:])
    train_x = pca.transform(train_x)
    six.print_('PCA complete')

    clf = SVC(C=0.0001, kernel='linear', verbose=True, max_iter=100)
    six.print_('start training')
    clf.fit(train_x, train_y)
    six.print_('training complete')

    val_x = pca.transform(val_x)
    acc = sum(val_y == clf.predict(val_x)) / float(len(val_y))
    print(acc)

    pickle.dump(pca, open('pca.pickle', 'wb'))
    pickle.dump(clf, open('svm.pickle', 'wb'))
def main():
    parser = argparse.ArgumentParser(description='Train a neural network')
    parser.add_argument('--model', type=str)
    parser.add_argument('--lr', type=float, default=0.001)
    parser.add_argument('--decay', type=float, default=1e-4)
    parser.add_argument('--momentum', type=float, default=0.9)
    parser.add_argument('--batch', type=int, default=128)
    parser.add_argument('--epoch', type=int, default=100)
    parser.add_argument('--output', type=str, default='weight')
    args = parser.parse_args()

    model = importlib.import_module(args.model).build()

    six.print_('loading data')
    (train_x, train_y, val_x, val_y) = load_data()
    six.print_('load data complete')

    sgd = SGD(lr=args.lr, decay=args.decay, momentum=args.momentum, nesterov=True)
    model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
    six.print_('build model complete')

    six.print_('start training')
    model.fit(train_x, train_y, batch_size=args.batch, nb_epoch=args.epoch,
              verbose=2, shuffle=True, validation_data=(val_x, val_y))

    model.save_weights(args.output + '.hdf5')
def join(self):
    """Will wait till all the processes are terminated
    """
    try:
        self._process_queue.join()
    except KeyboardInterrupt:
        six.print_("\nTerminating, please wait...", file=sys.stderr)
        self._process_queue.join()
    self._running = False
def test_print_():
    save = sys.stdout
    out = sys.stdout = six.moves.StringIO()
    try:
        six.print_("Hello,", "person!")
    finally:
        sys.stdout = save
    assert out.getvalue() == "Hello, person!\n"
    out = six.StringIO()
    six.print_("Hello,", "person!", file=out)
    assert out.getvalue() == "Hello, person!\n"
    out = six.StringIO()
    six.print_("Hello,", "person!", file=out, end="")
    assert out.getvalue() == "Hello, person!"
    out = six.StringIO()
    six.print_("Hello,", "person!", file=out, sep="X")
    assert out.getvalue() == "Hello,Xperson!\n"
    out = six.StringIO()
    six.print_(six.u("Hello,"), six.u("person!"), file=out)
    result = out.getvalue()
    assert isinstance(result, six.text_type)
    assert result == six.u("Hello, person!\n")
    six.print_("Hello", file=None)  # This works.
    out = six.StringIO()
    six.print_(None, file=out)
    assert out.getvalue() == "None\n"

    class FlushableStringIO(six.StringIO):
        def __init__(self):
            six.StringIO.__init__(self)
            self.flushed = False

        def flush(self):
            self.flushed = True

    out = FlushableStringIO()
    six.print_("Hello", file=out)
    assert not out.flushed
    six.print_("Hello", file=out, flush=True)
    assert out.flushed
def test_print_encoding(monkeypatch):
    # Fool the type checking in print_.
    monkeypatch.setattr(six, "file", six.BytesIO, raising=False)
    out = six.BytesIO()
    out.encoding = "utf-8"
    out.errors = None
    six.print_(six.u("\u053c"), end="", file=out)
    assert out.getvalue() == six.b("\xd4\xbc")
    out = six.BytesIO()
    out.encoding = "ascii"
    out.errors = "strict"
    py.test.raises(UnicodeEncodeError, six.print_, six.u("\u053c"), file=out)
    out.errors = "backslashreplace"
    six.print_(six.u("\u053c"), end="", file=out)
    assert out.getvalue() == six.b("\\u053c")
def test_print_exceptions():
    py.test.raises(TypeError, six.print_, x=3)
    py.test.raises(TypeError, six.print_, end=3)
    py.test.raises(TypeError, six.print_, sep=42)
def read_ids(filename):
    try:
        with open(filename) as f:
            return [int(id) for id in f.read().split()]
    except IOError as e:
        six.print_("Can't read the file", filename)
        exit(1)
def retrieve_statuses(api, ids_portion):
    try:
        return api.statuses_lookup(ids_portion)
    except tweepy.error.RateLimitError:
        six.print_("\nRate limit has been reached. Waiting...")
        while True:
            try:
                return api.statuses_lookup(ids_portion)
            except tweepy.error.RateLimitError:
                time.sleep(30)
    except tweepy.error.TweepError as e:
        six.print_("Failed to look up:", str(e))
        exit(1)
def simple_write_callback(iter_count, iteration_length):
    import six
    six.print_("{:d}/{:d}".format(iter_count, iteration_length))
def examples(directory):
    """
    Generate example strategies to target folder
    """
    source_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "examples")
    try:
        shutil.copytree(source_dir, os.path.join(directory, "examples"))
    except OSError as e:
        if e.errno == errno.EEXIST:
            six.print_("Folder examples already exists.")
def version(**kwargs):
    """
    Output Version Info
    """
    from rqalpha import version_info
    six.print_("Current Version: ", version_info)
def generate_config(directory):
    """
    Generate default config file
    """
    default_config = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                  "config.yml")
    target_config_path = os.path.abspath(os.path.join(directory, 'config.yml'))
    shutil.copy(default_config, target_config_path)
    six.print_("Config file has been generated in", target_config_path)


# For Mod Cli
def output_profile_result(env):
    stdout_trap = six.StringIO()
    env.profile_deco.print_stats(stdout_trap)
    profile_output = stdout_trap.getvalue()
    profile_output = profile_output.rstrip()
    six.print_(profile_output)
    env.event_bus.publish_event(Event(EVENT.ON_LINE_PROFILER_RESULT,
                                      result=profile_output))
def get_bars(self, order_book_id, fields=None):
    try:
        s, e = self._index[order_book_id]
    except KeyError:
        six.print_(_(u"No data for {}").format(order_book_id))
        return

    if fields is None:
        # the first is date
        fields = self._table.names[1:]

    if len(fields) == 1:
        return self._converter.convert(fields[0], self._table.cols[fields[0]][s:e])

    # remove datetime if it exists in fields
    self._remove_(fields, 'datetime')

    dtype = np.dtype([('datetime', np.uint64)] +
                     [(f, self._converter.field_type(f, self._table.cols[f].dtype))
                      for f in fields])
    result = np.empty(shape=(e - s, ), dtype=dtype)
    for f in fields:
        result[f] = self._converter.convert(f, self._table.cols[f][s:e])
    result['datetime'] = self._table.cols['date'][s:e]
    result['datetime'] *= 1000000
    return result