我们从 Python 开源项目中,提取了以下 49 个代码示例,用于说明如何使用 collections.namedtuple()。
def connection_from_pool_key(self, pool_key):
    """
    Return the :class:`ConnectionPool` stored under ``pool_key``, creating
    and caching a new one when nothing usable is stored yet.

    ``pool_key`` should be a namedtuple that only contains immutable
    objects. At a minimum it must have the ``scheme``, ``host``, and
    ``port`` fields, because those are what a new pool is built from.
    """
    # Lookup and insertion both happen while holding the pools lock.
    with self.pools.lock:
        cached = self.pools.get(pool_key)
        if not cached:
            # No open pool matches this key: build one of the desired type
            # and remember it for subsequent requests.
            cached = self._new_pool(
                pool_key.scheme, pool_key.host, pool_key.port)
            self.pools[pool_key] = cached
    return cached
def driver_version(self):
    """
    collections.namedtuple: The major, minor and update portions of the
        installed version of NI-DAQmx.

    - major_version (int): Major portion of the installed version of
      NI-DAQmx, such as 7 for version 7.0.
    - minor_version (int): Minor portion of the installed version of
      NI-DAQmx, such as 0 for version 7.0.
    - update_version (int): Update portion of the installed version of
      NI-DAQmx, such as 1 for version 9.0.1.
    """
    version_fields = ['major_version', 'minor_version', 'update_version']
    DriverVersion = collections.namedtuple('DriverVersion', version_fields)

    return DriverVersion(
        major_version=self._major_version,
        minor_version=self._minor_version,
        update_version=self._update_version,
    )
def generate_leaders(*, leader_factory=None):
    """Yield one leaderboard entry per user row, highest score first.

    Every entry is produced by ``leader_factory`` (``Leader`` when no
    factory is supplied) from four keyword arguments: the 1-based ranking,
    the score, a ``User`` built from the row's serial, and the registration
    looked up for that serial. With the default factory the entry is
    expected to be a namedtuple.
    """
    make_leader = Leader if leader_factory is None else leader_factory

    cursor = db.get_cursor()
    cursor.execute(""" SELECT "serial", "score" FROM "user" ORDER BY "score" DESC """)

    ranking = 0
    for serial, score in cursor:
        ranking += 1
        yield make_leader(
            ranking=ranking,
            score=score,
            user=User(serial=serial),
            registration=registrations.get_registration(serial=serial),
        )
def connection_from_pool_key(self, pool_key, request_context=None):
    """
    Get a :class:`ConnectionPool` based on the provided pool key.

    ``pool_key`` should be a namedtuple that only contains immutable
    objects. At a minimum it must have the ``scheme``, ``host``, and
    ``port`` fields.

    :param pool_key: hashable key under which the pool is cached.
    :param request_context: optional mapping with at least ``'scheme'``,
        ``'host'`` and ``'port'``; it is forwarded unchanged to
        ``_new_pool``. When omitted, the scheme, host and port are taken
        from ``pool_key`` instead of failing on ``None``.
    :returns: the cached pool for ``pool_key``, or a newly created one.
    """
    with self.pools.lock:
        # If the scheme, host, or port doesn't match existing open
        # connections, open a new ConnectionPool.
        pool = self.pools.get(pool_key)
        if pool:
            return pool

        # Make a fresh ConnectionPool of the desired type. The parameter
        # defaults to None, so fall back to the fields pool_key is
        # documented to carry rather than subscripting None.
        if request_context is None:
            scheme = pool_key.scheme
            host = pool_key.host
            port = pool_key.port
        else:
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']

        pool = self._new_pool(scheme, host, port,
                              request_context=request_context)
        self.pools[pool_key] = pool
    return pool
def iter_units_for_relation_name(relation_name):
    """Yield every (relation id, unit) pair of a relation.

    Walks each relation id returned by ``relation_ids(relation_name)`` and,
    for each of them, every unit returned by ``related_units``; one named
    tuple with ``rid`` and ``unit`` fields is produced per unit.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', ['rid', 'unit'])
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield RelatedUnit(rid=relation_id, unit=unit_name)
def parse_argspec(callable_):
    """
    Takes a callable and returns a tuple with the list of Argument objects,
    the name of *args, and the name of **kwargs.
    If *args or **kwargs is not present, it will be None.
    This returns a namedtuple called Argspec that has three fields named:
    args, starargs, and kwargs.

    Only positional-or-keyword parameters are reported in ``args``;
    keyword-only parameters, when the interpreter supports them, are not
    included.
    """
    # inspect.getargspec was removed in Python 3.11. Prefer getfullargspec
    # where it exists; its first four fields (args, varargs, varkw,
    # defaults) line up with the legacy four-tuple.
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    args, varargs, keywords, defaults = getspec(callable_)[:4]
    defaults = list(defaults or [])

    if getattr(callable_, '__self__', None) is not None:
        # This is a bound method, drop the self param.
        args = args[1:]

    # Defaults always belong to the trailing parameters, so everything
    # before this index has no default.
    first_default = len(args) - len(defaults)
    return Argspec(
        [
            Argument(
                arg,
                Argument.no_default
                if n < first_default
                else defaults[n - first_default],
            )
            for n, arg in enumerate(args)
        ],
        varargs,
        keywords,
    )
def test_checks_should_trigger(self):
    """Each of five registered events has its rule consulted exactly once
    by a single ``handle_data`` call."""

    class CountingRule(Always):
        # Class-level tally shared by every instance of the rule.
        count = 0

        def should_trigger(self, dt, env):
            CountingRule.count += 1
            return True

    for _ in range(5):
        self.em.add_event(
            Event(CountingRule(), lambda context, data: None)
        )

    # handle_data only needs an object exposing `trading_environment`.
    FakeAlgo = namedtuple('FakeAlgo', ['trading_environment'])
    self.em.handle_data(
        FakeAlgo(trading_environment="fake_env"),
        None,
        datetime.datetime.now(),
    )

    self.assertEqual(CountingRule.count, 5)
def choose_aws_role(assertion):
    """Interactively choose an AWS role from a SAML assertion.

    The base64-encoded assertion is decoded and parsed as XML; every
    ``AttributeValue`` under the AWS role attribute is split on ``,`` into
    a ``RoleTuple(principal_arn, role_arn)``. The roles are printed as a
    1-based menu and the user is prompted for a number.

    NOTE(review): the two comma-separated parts are assigned positionally;
    this assumes each value is ``principal_arn,role_arn`` in that order.

    :param assertion: base64-encoded SAML assertion.
    :returns: the selected ``RoleTuple``.
    :raises ValueError: if the answer is not an integer.
    :raises IndexError: if the answer is outside the printed menu.
    """
    aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
    attribute_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'
    attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
    role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])

    root = ET.fromstring(base64.b64decode(assertion))
    roles = [
        role_tuple(*saml2attributevalue.text.split(','))
        for saml2attribute in root.iter(attribute_urn)
        if saml2attribute.get('Name') == aws_attribute_role
        for saml2attributevalue in saml2attribute.iter(attribute_value_urn)
    ]

    for index, role in enumerate(roles, 1):
        role_name = role.role_arn.split('/')[1]
        print("%d: %s" % (index, role_name))

    # input() returns a string on Python 3, so convert before doing
    # arithmetic on it.
    role_choice = int(input('Please select the AWS role: ')) - 1

    # Reject out-of-menu answers explicitly; otherwise "0" would become
    # index -1 and silently select the last role.
    if not 0 <= role_choice < len(roles):
        raise IndexError('AWS role selection out of range')
    return roles[role_choice]
def create_legacy_kmeans_nodes(f, new_group_name, legacy_group_name, namedtuple, clustering_key):
    """
    Soft-link a legacy-structured (CR 1.2) kmeans subgroup (dest) to a
    new-style (CR 1.3) subgroup (src).

    The old-style was a group called 'kmeans' with subgroups named _K.
    The new-style is a group called 'clustering' with subgroups named
    kmeans_K_clusters, etc.

    The legacy top-level group is created before the clustering key is
    inspected, so it exists even when the key is not a kmeans clustering
    and nothing is linked. One soft link is created per entry of
    ``namedtuple._fields`` whose target node exists in ``f``; missing
    targets are reported on stderr and skipped.
    """
    legacy_root = f.create_group(f.root, legacy_group_name)

    cluster_type, cluster_param = parse_clustering_key(clustering_key)
    if cluster_type != CLUSTER_TYPE_KMEANS:
        return

    legacy_subgroup = f.create_group(
        legacy_root,
        format_legacy_clustering_key(cluster_type, cluster_param),
    )

    for field_name in namedtuple._fields:
        target = '/%s/_%s/%s' % (new_group_name, clustering_key, field_name)
        if target not in f:
            sys.stderr.write("Skipped soft-link of legacy dataset to %s; node doesn't exist\n" % target)
            continue
        # NOTE: coerce `target` to 'str' here because pytables chokes on
        # unicode `target`
        f.create_soft_link(legacy_subgroup, field_name, target=str(target))
def refresh_manager(self, fake_cluster, fake_database):
    """Build a ``FullRefreshManager`` with stand-in options.

    The options object is an ad-hoc namedtuple; the manager's global state
    is initialised before the manager is returned.
    """
    Options = namedtuple(
        'Options',
        [
            'cluster',
            'database',
            'config_path',
            'dry_run',
            'verbose',
            'per_source_throughput_cap',
            'total_throughput_cap',
        ],
    )

    manager = FullRefreshManager()
    manager.options = Options(
        cluster=fake_cluster,
        database=fake_database,
        config_path=self.config_path,
        dry_run=True,
        verbose=0,
        per_source_throughput_cap=DEFAULT_CAP,
        total_throughput_cap=1000,
    )
    manager._init_global_state()
    return manager
def make_params(**kwargs):
    """Create a Params tuple for BenchmarkCNN from kwargs.

    Every field starts at the ``default_value`` recorded for it in
    _DEFAULT_PARAMS; the supplied keyword arguments are then applied on top.

    Args:
      **kwargs: kwarg values will override the default values.
    Returns:
      Params namedtuple for constructing BenchmarkCNN.
    """
    # Collect the (name: default_value) pairs first.
    defaults = {}
    for name in _DEFAULT_PARAMS:
        defaults[name] = _DEFAULT_PARAMS[name].default_value

    base_params = Params(**defaults)
    return base_params._replace(**kwargs)
def lookup(tag):
    """
    :param tag: Integer tag number
    :returns: Taginfo namedtuple, taken from TAGS_V2 when the tag is listed
        there; otherwise a TagInfo holding just the tag value and its name
        from TAGS. If the tag is not recognized, "unknown" is returned for
        the name
    """
    fallback_name = TAGS.get(tag, 'unknown')
    fallback_info = TagInfo(tag, fallback_name)
    return TAGS_V2.get(tag, fallback_info)


##
# Map tag numbers to tag info.
#
#  id: (Name, Type, Length, enum_values)
#
def collect_moves(self, reader, name):
    """Return the ``Moves`` record for the first row of ``reader`` matching ``name``.

    When the last ``-``-separated piece of ``name`` is all digits, a row
    matches only if its first column equals ``name`` and the reported
    pokemon is the title-cased part before the first ``-``. Otherwise a row
    matches when ``name`` occurs inside its first column and the reported
    pokemon is the title-cased ``name``.

    Column 1 is translated through ``switcher``; columns 2-4 are parsed
    with ``ast.literal_eval`` (column 2 is additionally converted to int).
    Returns None when no row matches.
    """
    Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
    numbered_form = name.split('-')[-1].isdigit()

    for row in reader:
        if numbered_form:
            matched = name == row[0]
        else:
            matched = name in row[0]
        if not matched:
            continue

        pokemon = name.split('-')[0].title() if numbered_form else name.title()
        generation = switcher[row[1]]
        color = int(ast.literal_eval(row[2]))
        moves = ast.literal_eval(row[3])
        versions = ast.literal_eval(row[4])
        return Moves(pokemon, generation, color, moves, versions)
def test_non_frozen_udts(self):
    """
    Test to ensure that non frozen udt's work with C* >3.6.

    Creates a ``user`` type and a table with a non-frozen ``user`` column,
    inserts one row, updates a single field of the UDT in place, and checks
    both the updated value and that the table's CQL does not mention
    ``<frozen>``.

    @since 3.7.0
    @jira_ticket PYTHON-498
    @expected_result Non frozen UDT's are supported

    @test_category data_types, udt
    """
    keyspace = self.keyspace_name
    table = self.function_table_name
    execute = self.session.execute

    execute("USE {0}".format(keyspace))
    execute("CREATE TYPE user (state text, has_corn boolean)")
    execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(table))

    User = namedtuple('user', ('state', 'has_corn'))
    self.cluster.register_user_type(keyspace, "user", User)

    insert_cql = "INSERT INTO {0} (a, b) VALUES (%s, %s)".format(table)
    execute(insert_cql, (0, User("Nebraska", True)))
    execute("UPDATE {0} SET b.has_corn = False where a = 0".format(table))

    rows = execute("SELECT * FROM {0}".format(table))
    self.assertFalse(rows[0].b.has_corn)

    table_meta = self.cluster.metadata.keyspaces[keyspace].tables[table]
    self.assertNotIn("<frozen>", table_meta.as_cql_query())
def test_raise_error_on_nonexisting_udts(self):
    """
    Test for ensuring that an error is raised for operating on a
    nonexisting udt or an invalid keyspace

    Registering a user type against an unknown keyspace, or against a
    keyspace that lacks the type, must raise ``UserTypeDoesNotExist``;
    creating a table that references the missing type must raise
    ``InvalidRequest``.
    """
    c = Cluster(protocol_version=PROTOCOL_VERSION)
    # Shut the cluster down even when an assertion below fails, so a
    # failing test does not leave the cluster open.
    try:
        s = c.connect(self.keyspace_name, wait_for_all_pools=True)
        User = namedtuple('user', ('age', 'name'))

        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("some_bad_keyspace", "user", User)

        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("system", "user", User)

        with self.assertRaises(InvalidRequest):
            s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")
    finally:
        c.shutdown()
def lookup(tag):
    """
    :param tag: Integer tag number
    :returns: Taginfo namedtuple, taken from TAGS_V2 when the tag is listed
        there; otherwise a TagInfo holding just the tag value and its name
        from TAGS. If the tag is not recognized, "unknown" is returned for
        the name
    """
    default_name = TAGS.get(tag, 'unknown')
    default_info = TagInfo(tag, default_name)
    return TAGS_V2.get(tag, default_info)


##
# Map tag numbers to tag info.
#
#  id: (Name, Type, Length, enum_values)
#
# The length here differs from the length in the tiff spec.  For
# numbers, the tiff spec is for the number of fields returned. We
# agree here.  For string-like types, the tiff spec uses the length of
# field in bytes.  In Pillow, we are using the number of expected
# fields, in general 1 for string-like types.
def stats2(sarray, names=None):
    """Calculate means and (co)variances for structured array data.

    :param sarray: structured array; each selected field is one variable.
    :param names: field names to include; defaults to every field of
        ``sarray`` (``sarray.dtype.names``).
    :returns: a ``Stats2`` namedtuple with, in this order, one ``ave_<name>``
        per field, one ``var_<name>`` per field, and one
        ``cov_<n1>_<n2>`` for every ordered pair of distinct fields.
    """
    if names is None:
        names = sarray.dtype.names
    names = list(names)
    nvar = len(names)

    data = tuple(sarray[name] for name in names)
    # np.cov collapses to a 0-d array for a single variable; force a 2-D
    # matrix so the diagonal and the [i, j] lookups below work for any
    # number of fields.
    cov = np.atleast_2d(np.cov(data))
    nondiag_cov = [cov[i, j] for i, j in permutations(range(nvar), 2)]

    names_ave = ['ave_' + name for name in names]
    names_var = ['var_' + name for name in names]
    # permutations() over the names yields pairs in the same order as the
    # index permutations used for nondiag_cov above.
    names_cov = ['cov_' + n1 + "_" + n2 for n1, n2 in permutations(names, 2)]

    out = dict(zip(names_ave, np.mean(data, axis=1)))
    out.update(zip(names_var, cov.diagonal()))
    out.update(zip(names_cov, nondiag_cov))

    NamedStats = namedtuple('Stats2', names_ave + names_var + names_cov)
    return NamedStats(**out)
def abstract_brackets(formula, variables_re=''):
    """Repeatedly replace the part of ``formula`` isolated by
    ``split_formula`` with a fresh placeholder name.

    While ``split_formula`` returns a truthy result, its ``'within'`` part
    is stored under a new placeholder and the formula is rebuilt as
    ``leading + placeholder + trailing``. Each placeholder comes from
    ``no_re_matches`` applied to the combination of ``variables_re`` and
    all placeholders created so far.

    Returns a namedtuple ``(formula, new_variables)`` holding the rewritten
    formula and the placeholder -> original-text mapping.
    """
    placeholders = {}
    parts = split_formula(formula)

    while parts:
        taken_patterns = itertools.chain((variables_re,), placeholders.keys())
        placeholder = no_re_matches(combine_re_expressions(taken_patterns))
        formula = parts['leading'] + placeholder + parts['trailing']
        placeholders[placeholder] = parts['within']
        parts = split_formula(formula)

    if formula in placeholders:
        # The whole formula collapsed into one placeholder (extraneous
        # brackets): start over on what that placeholder stands for.
        return abstract_brackets(placeholders[formula], variables_re)

    Result = namedtuple('abstract_brackets', ('formula', 'new_variables'))
    return Result(formula, placeholders)


# splits formula into 2 parts (leading and trailing) where the operator (from settings.order_of_operations) with the lowest priority is
# if there are no operators in formula returns None
def _read_data(self, stream, verbose = False): """Process frame data rows from the CSV stream.""" # Note that the frame_num indices do not necessarily start from zero, # but the setter functions assume that the array indices do. This # implementation just ignores the original frame numbers, the frames are # renumbered from zero. for row_num, row in enumerate(stream): frame_num = int(row[0]) frame_t = float(row[1]) values = row[2:] # if verbose: print "Processing row_num %d, frame_num %d, time %f." % (row_num, frame_num, frame_t) # add the new frame time to each object storing a trajectory for body in self.rigid_bodies.values(): body._add_frame(frame_t) # process the columns of interest for mapping in self._column_map: # each mapping is a namedtuple with a setter method, column index, and axis name mapping.setter( row_num, mapping.axis, values[mapping.column] ) # ================================================================
def paragraph(self):
    """Locate the cursor within ``self.text``.

    ``self.text`` is walked paragraph by paragraph, counting lines until
    the count equals ``self.buffer_idx_y``. The character index is the
    total length of the lines before the current one in that paragraph
    plus ``self.buffer_idx_x``.

    Returns:
        namedtuple (para_index, line_index, char_index)
    """
    para_no = 0
    line_no = 0
    lines_seen = 0

    for para_lines in self.text:
        for line_no, _line in enumerate(para_lines):
            if lines_seen == self.buffer_idx_y:
                break
            lines_seen += 1
        else:
            # Target line is not in this paragraph; move on to the next.
            para_no += 1
            continue
        break

    preceding_lines = self.text[para_no][:line_no]
    chars_before = sum(len(text_line) for text_line in preceding_lines)

    Position = namedtuple("para", ['para_index', 'line_index', 'char_index'])
    return Position(para_no, line_no, chars_before + self.buffer_idx_x)
def test_exception_invalid_csv(self):
    """Loading text that is not CSV must raise ``ptr.InvalidDataError``.

    The input is a fragment of Python source; the loader's generator is
    drained so that the error has a chance to surface while iterating.
    """
    not_csv_text = """nan = float("nan") inf = float("inf") TEST_TABLE_NAME = "test_table" TEST_DB_NAME = "test_db" NOT_EXIT_FILE_PATH = "/not/existing/file/__path__" NamedTuple = namedtuple("NamedTuple", "attr_a attr_b") NamedTupleEx = namedtuple("NamedTupleEx", "attr_a attr_b attr_c") """

    csv_loader = ptr.CsvTableTextLoader(not_csv_text)
    csv_loader.table_name = "dummy"

    with pytest.raises(ptr.InvalidDataError):
        for _ in csv_loader.load():
            pass
def parse_request_start_line(line):
    """Split an HTTP 1.x request line into (method, path, version).

    The line must consist of exactly three space-separated parts and the
    last one must look like ``HTTP/1.<digit>``; otherwise `HTTPInputError`
    is raised.

    The response is a `collections.namedtuple`.

    >>> parse_request_start_line("GET /foo HTTP/1.1")
    RequestStartLine(method='GET', path='/foo', version='HTTP/1.1')
    """
    pieces = line.split(" ")
    try:
        method, path, version = pieces
    except ValueError:
        raise HTTPInputError("Malformed HTTP request line")

    version_match = re.match(r"^HTTP/1\.[0-9]$", version)
    if version_match is None:
        raise HTTPInputError(
            "Malformed HTTP version in HTTP Request-Line: %r" % version)

    return RequestStartLine(method, path, version)
def parse_response_start_line(line):
    """Returns a (version, code, reason) tuple for an HTTP 1.x response line.

    The line is converted with ``native_str`` and must start with
    ``HTTP/1.<digit>``, a space, a run of digits (the status code, returned
    as an int), a space, and a reason phrase that runs up to the first
    carriage return.

    The response is a `collections.namedtuple`.

    >>> parse_response_start_line("HTTP/1.1 200 OK")
    ResponseStartLine(version='HTTP/1.1', code=200, reason='OK')

    :raises HTTPInputError: if the line does not have that shape.
    """
    line = native_str(line)
    # The dot in the version is escaped so that only a literal "." is
    # accepted (an unescaped "." would also match e.g. "HTTP/1x1"), in
    # line with the version check applied to request lines.
    match = re.match(r"(HTTP/1\.[0-9]) ([0-9]+) ([^\r]*)", line)
    if not match:
        raise HTTPInputError("Error parsing response start line")
    return ResponseStartLine(match.group(1), int(match.group(2)),
                             match.group(3))

# _parseparam and _parse_header are copied and modified from python2.7's cgi.py
# The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes.
# It has also been modified to support valueless parameters as seen in
# websocket extension negotiations.
def test_get_network_data(self, time_mock, sleep_mock):
    """``get_network_data`` reports the difference between two successive
    counter readings for the requested interface."""
    time_mock.side_effect = [1, 2]

    Counter = namedtuple(
        'Counter',
        ['bytes_sent', 'bytes_recv', 'packets_sent', 'packets_recv'])
    readings = [
        Counter(bytes_sent=54000, bytes_recv=12000,
                packets_sent=50, packets_recv=100),
        Counter(bytes_sent=108000, bytes_recv=36000,
                packets_sent=75, packets_recv=150),
    ]

    # Each call to net_io_counters hands back the next reading.
    self.network.psutil.net_io_counters = mock.Mock(
        side_effect=[{'eth0': reading} for reading in readings])

    result = self.network.get_network_data(interface='eth0', delay=1)
    kb_ul, kb_dl, p_ul, p_dl = result

    self.assertEqual(kb_ul, 54000)
    self.assertEqual(kb_dl, 24000)
    self.assertEqual(p_ul, 25)
    self.assertEqual(p_dl, 50)
def build_model():
    """Assemble the patient-level model on top of the pretrained network.

    The input holds ``n_candidates_per_patient`` patches per sample; it is
    reshaped so every patch passes through ``load_pretrained_model`` as its
    own single-channel item. The outputs are regrouped per patient and
    aggregated with ``nn_lung.LogMeanExp`` (r=16).

    Returns a ``Model`` namedtuple ``(l_in, l_out, l_target)``.
    """
    patch_size = p_transform['patch_size']

    l_in = nn.layers.InputLayer((None, n_candidates_per_patient) + patch_size)
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1) + patch_size)
    l_target = nn.layers.InputLayer((None,))

    # An earlier head (conv3d / feature reduction / dropout / dense on top
    # of the pretrained network) is disabled; the pretrained output is used
    # directly.
    per_patch = load_pretrained_model(l_in_rshp)

    per_patient = nn.layers.ReshapeLayer(
        per_patch, (-1, n_candidates_per_patient, 1))
    l_out = nn_lung.LogMeanExp(per_patient, r=16, axis=(1, 2), name='LME')

    Model = namedtuple('Model', ['l_in', 'l_out', 'l_target'])
    return Model(l_in=l_in, l_out=l_out, l_target=l_target)
def build_model():
    """Assemble the patient-level model: pretrained network, a 128-unit
    dense layer, a single sigmoid unit per patch, then ``LogMeanExp``
    (r=8) aggregation over each patient's candidates.

    Returns a ``Model`` namedtuple ``(l_in, l_out, l_target)``.
    """
    patch_size = p_transform['patch_size']

    l_in = nn.layers.InputLayer((None, n_candidates_per_patient) + patch_size)
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1) + patch_size)
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    head = dense(penultimate_layer, 128, name='dense_final')
    head = nn.layers.DenseLayer(
        head,
        num_units=1,
        W=nn.init.Orthogonal(),
        nonlinearity=nn.nonlinearities.sigmoid,
        name='dense_p_benign',
    )

    # Regroup the per-patch outputs so each row holds one patient.
    per_patient = nn.layers.ReshapeLayer(
        head, (-1, n_candidates_per_patient, 1), name='reshape2patients')
    l_out = nn_lung.LogMeanExp(per_patient, r=8, axis=(1, 2), name='LME')

    Model = namedtuple('Model', ['l_in', 'l_out', 'l_target'])
    return Model(l_in=l_in, l_out=l_out, l_target=l_target)
def build_model():
    """Assemble the patient-level model: pretrained network, two
    dropout + 256-unit dense stages, a single linear unit per patch, then
    ``nn_lung.AggAllBenignExp`` aggregation over each patient's candidates.

    Returns a ``Model`` namedtuple ``(l_in, l_out, l_target)``.
    """
    patch_size = p_transform['patch_size']

    l_in = nn.layers.InputLayer(
        (None, n_candidates_per_patient, 1) + patch_size)
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1) + patch_size)
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    # NOTE(review): both dropout layers carry the name 'drop_final2';
    # the first was possibly meant to be 'drop_final1' - confirm before
    # renaming.
    head = drop(penultimate_layer, name='drop_final2')
    head = dense(head, 256, name='dense_final1')
    head = drop(head, name='drop_final2')
    head = dense(head, 256, name='dense_final2')
    head = nn.layers.DenseLayer(
        head,
        num_units=1,
        W=nn.init.Orthogonal(),
        nonlinearity=None,
        name='dense_p_benign',
    )

    # Regroup the per-patch outputs so each row holds one patient.
    per_patient = nn.layers.ReshapeLayer(
        head, (-1, n_candidates_per_patient, 1), name='reshape2patients')
    l_out = nn_lung.AggAllBenignExp(
        per_patient, name='aggregate_all_nodules_benign')

    Model = namedtuple('Model', ['l_in', 'l_out', 'l_target'])
    return Model(l_in=l_in, l_out=l_out, l_target=l_target)
def build_model():
    """Assemble the patient-level model: pretrained network, dropout, a
    128-unit dense layer, a single linear unit per patch, then
    ``nn_lung.AggAllBenignExp`` aggregation over each patient's candidates.

    Returns a ``Model`` namedtuple ``(l_in, l_out, l_target)``.
    """
    patch_size = p_transform['patch_size']

    l_in = nn.layers.InputLayer(
        (None, n_candidates_per_patient, 1) + patch_size)
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1) + patch_size)
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    head = drop(penultimate_layer, name='drop_final')
    head = dense(head, 128, name='dense_final')
    head = nn.layers.DenseLayer(
        head,
        num_units=1,
        W=nn.init.Orthogonal(),
        nonlinearity=None,
        name='dense_p_benign',
    )

    # Regroup the per-patch outputs so each row holds one patient.
    per_patient = nn.layers.ReshapeLayer(
        head, (-1, n_candidates_per_patient, 1), name='reshape2patients')
    l_out = nn_lung.AggAllBenignExp(
        per_patient, name='aggregate_all_nodules_benign')

    Model = namedtuple('Model', ['l_in', 'l_out', 'l_target'])
    return Model(l_in=l_in, l_out=l_out, l_target=l_target)
def build_model():
    """Assemble the patient-level model on top of the pretrained network.

    The input holds ``n_candidates_per_patient`` patches per sample; it is
    reshaped so every patch passes through ``load_pretrained_model`` as its
    own single-channel item. The outputs are regrouped per patient and
    aggregated with ``nn_lung.LogMeanExp`` (r=16). The target layer has a
    fixed first dimension of ``batch_size``.

    Returns a ``Model`` namedtuple ``(l_in, l_out, l_target)``.
    """
    patch_size = p_transform['patch_size']

    l_in = nn.layers.InputLayer((None, n_candidates_per_patient) + patch_size)
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1) + patch_size)
    l_target = nn.layers.InputLayer((batch_size,))

    # An earlier head (conv3d / feature reduction / dropout / dense on top
    # of the pretrained network) is disabled; the pretrained output is used
    # directly.
    per_patch = load_pretrained_model(l_in_rshp)

    per_patient = nn.layers.ReshapeLayer(
        per_patch, (-1, n_candidates_per_patient, 1))
    l_out = nn_lung.LogMeanExp(per_patient, r=16, axis=(1, 2), name='LME')

    Model = namedtuple('Model', ['l_in', 'l_out', 'l_target'])
    return Model(l_in=l_in, l_out=l_out, l_target=l_target)
def build_model():
    """Assemble the patient-level model: pretrained network, a single
    sigmoid unit per patch, then ``LogMeanExp`` (r=8) aggregation over
    each patient's candidates.

    Returns a ``Model`` namedtuple ``(l_in, l_out, l_target)``.
    """
    patch_size = p_transform['patch_size']

    l_in = nn.layers.InputLayer((None, n_candidates_per_patient) + patch_size)
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1) + patch_size)
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    per_patch = nn.layers.DenseLayer(
        penultimate_layer,
        num_units=1,
        W=nn.init.Orthogonal(),
        nonlinearity=nn.nonlinearities.sigmoid,
        name='dense_p_benign',
    )

    # Regroup the per-patch outputs so each row holds one patient.
    per_patient = nn.layers.ReshapeLayer(
        per_patch, (-1, n_candidates_per_patient, 1), name='reshape2patients')
    l_out = nn_lung.LogMeanExp(per_patient, r=8, axis=(1, 2), name='LME')

    Model = namedtuple('Model', ['l_in', 'l_out', 'l_target'])
    return Model(l_in=l_in, l_out=l_out, l_target=l_target)