我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用yaml.ScalarNode()。
def represent_odict(dump, tag, mapping, flow_style=None): """ Like BaseRepresenter.represent_mapping, but does not issue the sort(). """ value = [] node = yaml.MappingNode(tag, value, flow_style=flow_style) if dump.alias_key is not None: dump.represented_objects[dump.alias_key] = node best_style = True if hasattr(mapping, 'items'): mapping = mapping.items() for item_key, item_value in mapping: node_key = dump.represent_data(item_key) node_value = dump.represent_data(item_value) if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style): best_style = False if not (isinstance(node_value, yaml.ScalarNode) and not node_value.style): best_style = False value.append((node_key, node_value)) if flow_style is None: if dump.default_flow_style is not None: node.flow_style = dump.default_flow_style else: node.flow_style = best_style return node
def represent_mapping(self, tag, mapping, flow_style=None): value = [] node = yaml.MappingNode(tag, value, flow_style=flow_style) if self.alias_key is not None: self.represented_objects[self.alias_key] = node best_style = False if hasattr(mapping, 'items'): mapping = list(mapping.items()) for item_key, item_value in mapping: node_key = self.represent_data(item_key) node_value = self.represent_data(item_value) if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style): best_style = False if not (isinstance(node_value, yaml.ScalarNode) and not node_value.style): best_style = False value.append((node_key, node_value)) if flow_style is None: if self.default_flow_style is not None: node.flow_style = self.default_flow_style else: node.flow_style = best_style return node
def multi_constructor(loader, tag_suffix, node): """ Deal with !Ref style function format """ if tag_suffix not in UNCONVERTED_SUFFIXES: tag_suffix = "{}{}".format(FN_PREFIX, tag_suffix) constructor = None if tag_suffix == "Fn::GetAtt": constructor = construct_getatt elif isinstance(node, yaml.ScalarNode): constructor = loader.construct_scalar elif isinstance(node, yaml.SequenceNode): constructor = loader.construct_sequence elif isinstance(node, yaml.MappingNode): constructor = loader.construct_mapping else: raise "Bad tag: !{}".format(tag_suffix) return ODict(( (tag_suffix, constructor(node)), ))
def represent_odict(dump, tag, mapping, flow_style=None): """Like BaseRepresenter.represent_mapping, but does not issue the sort(). """ value = [] node = yaml.MappingNode(tag, value, flow_style=flow_style) if dump.alias_key is not None: dump.represented_objects[dump.alias_key] = node best_style = True if hasattr(mapping, 'items'): mapping = mapping.items() for item_key, item_value in mapping: node_key = dump.represent_data(item_key) node_value = dump.represent_data(item_value) if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style): best_style = False if not (isinstance(node_value, yaml.ScalarNode) and not node_value.style): best_style = False value.append((node_key, node_value)) if flow_style is None: if dump.default_flow_style is not None: node.flow_style = dump.default_flow_style else: node.flow_style = best_style return node
def represent_mapping(self, tag, mapping, flow_style=None): value = [] node = MappingNode(tag, value, flow_style=flow_style) if self.alias_key is not None: self.represented_objects[self.alias_key] = node best_style = True if hasattr(mapping, 'items'): mapping = list(mapping.items()) if not isinstance(mapping, OrderedDict): mapping.sort() for item_key, item_value in mapping: node_key = self.represent_data(item_key) node_value = self.represent_data(item_value) if not (isinstance(node_key, ScalarNode) and not node_key.style): best_style = False if not (isinstance(node_value, ScalarNode) and not node_value.style): best_style = False value.append((node_key, node_value)) if flow_style is None: if self.default_flow_style is not None: node.flow_style = self.default_flow_style else: node.flow_style = best_style return node
def unicode_representer(_, data): has_wide_lines = False for line in data.splitlines(): if len(line) > 80: has_wide_lines = True break if has_wide_lines: return yaml.ScalarNode( u'tag:yaml.org,2002:str', data, style='>') if "\n" in data: return yaml.ScalarNode( u'tag:yaml.org,2002:str', data, style='|') return yaml.ScalarNode( u'tag:yaml.org,2002:str', data, style='')
def format_node(cls, mapping, metric): if mapping.tag in [ 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag, Number.yaml_tag, StripExtraDash.yaml_tag]: return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric)) elif mapping.tag == 'tag:yaml.org,2002:map': values = [] for key, value in mapping.value: values.append((yaml.ScalarNode(key.tag, key.value), cls.format_node(value, metric))) return yaml.MappingNode(mapping.tag, values) elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]: values = [] for seq in mapping.value: map_values = list() for key, value in seq.value: if key.value == 'SELECT': map_values.append((yaml.ScalarNode(key.tag, key.value), cls.format_node(value, metric))) else: map_values.append((yaml.ScalarNode(key.tag, key.value), value)) values.append(yaml.MappingNode(seq.tag, map_values)) return yaml.SequenceNode(mapping.tag, values) elif mapping.tag in [MapValue.yaml_tag]: values = [] for key, value in mapping.value: if key.value == 'VALUE': values.append((yaml.ScalarNode(key.tag, key.value), cls.format_node(value, metric))) else: values.append((yaml.ScalarNode(key.tag, key.value), value)) return yaml.MappingNode(mapping.tag, values) return mapping
def from_yaml(cls, loader, node): logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader)) default, select, value_desc = None, list(), None # find value description for elem in node.value: for key, value in elem.value: if key.value == 'VALUE': assert value_desc is None, "VALUE key already set" value_desc = value if key.value == 'SELECT': select.append(loader.construct_mapping(value)) if key.value == 'DEFAULT': assert default is None, "DEFAULT key already set" default = loader.construct_object(value) # if VALUE key isn't given, use default VALUE key # format: `VALUE: !Number '{vl.value}'` if value_desc is None: value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}') # select collectd metric based on SELECT condition metrics = loader.collector.items(select) assert len(metrics) < 2, \ 'Wrong SELECT condition {}, selected {} metrics'.format( select, len(metrics)) if len(metrics) > 0: item = cls.format_node(value_desc, {'vl': metrics[0], 'system': loader.system}) return loader.construct_object(item) # nothing has been found by SELECT condition, set to DEFAULT value. assert default is not None, "No metrics selected by SELECT condition" \ " {} and DEFAULT key isn't set".format(select) return default
def unicode_representer(dumper, uni): node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=str(uni)) return node
def str_node_representer(dumper, str_node): node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=str(str_node)) return node
def generic_object(loader, suffix, node): if isinstance(node, yaml.ScalarNode): constructor = loader.__class__.construct_scalar elif isinstance(node, yaml.SequenceNode): constructor = loader.__class__.construct_sequence elif isinstance(node, yaml.MappingNode): constructor = loader.__class__.construct_mapping else: raise ValueError(node) # TODO(tailhook) wrap into some object? return constructor(loader, node)
def include(self, node): if isinstance(node, yaml.ScalarNode): return self.extractFile(self.construct_scalar(node)) elif isinstance(node, yaml.SequenceNode): result = [] for filename in self.construct_sequence(node): result += self.extractFile(filename) return result else: raise yaml.constructor.ConstructorError
def from_yaml(cls, loader, node): key = node.tag[1:] if node.tag not in ('!Ref', '!Condition'): key = 'Fn::' + key if isinstance(node, ScalarNode): val = loader.construct_scalar(node) elif isinstance(node, SequenceNode): val = loader.construct_sequence(node) elif isinstance(node, MappingNode): val = loader.construct_mapping(node) else: raise Exception("Unable to handle node: %r"%node) return {str(key): str(val)}
def from_yaml(cls, loader, node): logging.debug('{}:process(loader={}, node={})'.format(cls.__name__, loader, node)) # e.g.: # SequenceNode(tag=u'!ArrayItem', value=[ # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'), # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'), # , ...) # ]), ... # ), (key, value), ... ]) # , ... ]) assert isinstance(node, yaml.SequenceNode), \ "{} tag isn't YAML array".format(cls.__name__) select, index_keys, items, item_desc = list(), list(), list(), None for elem in node.value: for key, value in elem.value: if key.value == 'ITEM-DESC': assert item_desc is None, "ITEM-DESC key already set" item_desc = value if key.value == 'INDEX-KEY': assert len(index_keys) == 0, "INDEX-KEY key already set" index_keys = loader.construct_sequence(value) if key.value == 'SELECT': select.append(loader.construct_mapping(value)) # validate item description assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" assert len(select) > 0 or len(index_keys) > 0, \ "Mandatory key (INDEX-KEY or SELECT) isn't set" metrics = loader.collector.items(select) # select metrics based on INDEX-KEY provided if len(index_keys) > 0: metric_set = set() for metric in metrics: value = CollectdValue() for key in index_keys: setattr(value, key, getattr(metric, key)) metric_set.add(value) metrics = list(metric_set) # build items based on SELECT and/or INDEX-KEY criteria for metric in metrics: item = cls.format_node(item_desc, {'vl': metric, 'system': loader.system, 'config': loader.config}) items.append(loader.construct_mapping(item)) return items