我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用typing.NamedTuple()。
def test_suggestPayout(): MockedReportWrapper = NamedTuple('MockedReportWrapper', [('getReportBody', Callable), ('getReportWeakness', Callable), ('getVulnDomains', Callable)]) MockedReportWrapperXSS = MockedReportWrapper(getReportBody=lambda: 'XSS', getReportWeakness=lambda: 'XSS', getVulnDomains=lambda: []) assert payout.suggestPayout(MockedReportWrapperXSS) == config.payoutDB['xss']['average'] for vulnType in config.payoutDB: for domain in config.payoutDB[vulnType]: MockedReportWrapperVuln = MockedReportWrapper(getReportBody=lambda: vulnType, getReportWeakness=lambda: vulnType, getVulnDomains=lambda: [domain]) assert payout.suggestPayout(MockedReportWrapperVuln) == config.payoutDB[vulnType][domain] MockedReportWrapperNone = MockedReportWrapper(getReportBody=lambda: '', getReportWeakness=lambda: '', getVulnDomains=lambda: []) assert payout.suggestPayout(MockedReportWrapperNone) is None
def __init__(self, name: str, parent_decoder: Decoder, beam_size: int, length_normalization: float, max_steps: int = None, save_checkpoint: str = None, load_checkpoint: str = None) -> None: check_argument_types() ModelPart.__init__(self, name, save_checkpoint, load_checkpoint) self.parent_decoder = parent_decoder self._beam_size = beam_size self._length_normalization = length_normalization # In the n+1th step, outputs of lenght n will be collected # and the n+1th step of decoder (which is discarded) will be executed if max_steps is None: max_steps = parent_decoder.max_output_len self._max_steps = tf.constant(max_steps + 1) self.max_output_len = max_steps # Feedables self._search_state = None # type: SearchState self._decoder_state = None # type: NamedTuple # Output self.outputs = self._decoding_loop()
def cte_constructor(cls, where=None, limit=None, offset=None, name=None) -> CTE: query = cls.Options.db_table.select() if where is not None: query = query.where(where) if limit is not None: query = query.limit(limit) if offset is not None: query = query.offset(offset) name = name or '{}_cte'.format(inflection.tableize(cls.__name__)) return query.cte(name=name) # With NamedTuple we can't use mixins normally, # so use classmethod() call directly
def test_to_message_from_dto(self): """ Test that we can serialize a DTO to a message. """ fields = [('id', None)] FooEvent = message_factory(NamedTuple('FooEvent', fields)) dto = FooEvent(id=1) message = to_message_from_dto(dto) assert message['class'] == 'FooEvent' assert message['data']['id'] == 1
def setup_method(self): self.data = {'id': 1, 'foo': 'bar', 'baz': None} fields = [(k, None) for k in self.data.keys()] self.namedtuple = NamedTuple('FooEvent', fields) self.message = message_factory(self.namedtuple)(**self.data)
def execute(args: typing.NamedTuple): if args.global_flag: create_directory(constants.GLOBAL_INIT_DIRECTORY_DEST) else: create_directory(constants.LOCAL_INIT_DIRECTORY_DEST)
def execute(args: typing.NamedTuple): spark_client = load_spark_client() if args.tail: utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.app_name) else: app_logs = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.app_name) print(app_logs.log)
def execute(_: typing.NamedTuple): spark_client = load_spark_client() clusters = spark_client.list_clusters() utils.print_clusters(clusters)
def execute(args: typing.NamedTuple): actions = {} actions[ClusterAction.create] = cluster_create.execute actions[ClusterAction.add_user] = cluster_add_user.execute actions[ClusterAction.delete] = cluster_delete.execute actions[ClusterAction.get] = cluster_get.execute actions[ClusterAction.list] = cluster_list.execute actions[ClusterAction.ssh] = cluster_ssh.execute actions[ClusterAction.submit] = cluster_submit.execute actions[ClusterAction.app_logs] = cluster_app_logs.execute func = actions[args.cluster_action] func(args)
def execute(args: typing.NamedTuple): actions = dict( cluster=cluster.execute, init=init.execute ) func = actions[args.action] func(args)
def execute(args: typing.NamedTuple): spark_client = load_spark_client() cluster_id = args.cluster_id cluster = spark_client.get_cluster(cluster_id) utils.print_cluster(spark_client, cluster)
def execute(args: typing.NamedTuple): spark_client = load_spark_client() log.info('-------------------------------------------') log.info('spark cluster id: {}'.format(args.cluster_id)) log.info('username: {}'.format(args.username)) log.info('-------------------------------------------') if args.ssh_key: ssh_key = args.ssh_key else: ssh_key = spark_client.secrets_config.ssh_pub_key ssh_key, password = utils.get_ssh_key_or_prompt(ssh_key, args.username, args.password, spark_client.secrets_config) spark_client.create_user( cluster_id=args.cluster_id, username=args.username, password=password, ssh_key=ssh_key ) if password: log.info('password: %s', '*' * len(password)) elif ssh_key: log.info('ssh public key: %s', ssh_key) log.info('-------------------------------------------')
def parse_common_args(args: NamedTuple): if args.verbose: logger.setup_logging(True) log.debug("Verbose logging enabled") else: logger.setup_logging(False)
def run_software(args: NamedTuple): softwares = {} softwares[aztk.models.Software.spark] = spark.execute func = softwares[args.software] func(args)
def execute(args: typing.NamedTuple): spark_client = load_spark_client() ssh_conf = SshConfig() ssh_conf.merge( cluster_id=args.cluster_id, username=args.username, job_ui_port=args.jobui, job_history_ui_port=args.jobhistoryui, web_ui_port=args.webui, jupyter_port=args.jupyter, name_node_ui_port=args.namenodeui, rstudio_server_port=args.rstudioserver, host=args.host, connect=args.connect ) http_prefix = 'http://localhost:' log.info("-------------------------------------------") log.info("spark cluster id: %s", ssh_conf.cluster_id) log.info("open webui: %s%s", http_prefix, ssh_conf.web_ui_port) log.info("open jobui: %s%s", http_prefix, ssh_conf.job_ui_port) log.info("open jobhistoryui: %s%s", http_prefix, ssh_conf.job_history_ui_port) log.info("open jupyter: %s%s", http_prefix, ssh_conf.jupyter_port) log.info("open namenodeui: %s%s", http_prefix, ssh_conf.name_node_ui_port) log.info("open rstudio server: %s%s", http_prefix, ssh_conf.rstudio_server_port) log.info("ssh username: %s", ssh_conf.username) log.info("connect: %s", ssh_conf.connect) log.info("-------------------------------------------") # get ssh command try: ssh_cmd = utils.ssh_in_master( client=spark_client, cluster_id=ssh_conf.cluster_id, webui=ssh_conf.web_ui_port, jobui=ssh_conf.job_ui_port, jobhistoryui=ssh_conf.job_history_ui_port, namenodeui=ssh_conf.name_node_ui_port, jupyter=ssh_conf.jupyter_port, rstudioserver=ssh_conf.rstudio_server_port, username=ssh_conf.username, host=ssh_conf.host, connect=ssh_conf.connect) if not ssh_conf.connect: log.info("") log.info("Use the following command to connect to your spark head node:") log.info("\t%s", ssh_cmd) except batch_error.BatchErrorException as e: if e.error.code == "PoolNotFound": raise aztk.error.AztkError("The cluster you are trying to connect to does not exist.") else: raise