我们从Python开源项目中，提取了以下40个代码示例，用于说明如何使用yaml.CSafeLoader()。
def Deserializer(stream_or_string, **options):
    """
    Yield objects deserialized from a stream or string of YAML data.

    ``stream_or_string`` may be bytes (decoded as UTF-8), a text string
    (wrapped in a ``StringIO``), or any other object, which is handed to
    ``yaml.load`` unchanged. ``options`` are forwarded to
    ``PythonDeserializer``.

    Any ``Exception`` raised while loading or deserializing is re-raised as
    ``DeserializationError`` carrying the original traceback.
    """
    source = stream_or_string
    if isinstance(source, bytes):
        source = source.decode('utf-8')
    if isinstance(source, six.string_types):
        source = StringIO(source)

    try:
        parsed = yaml.load(source, Loader=SafeLoader)
        for obj in PythonDeserializer(parsed, **options):
            yield obj
    except GeneratorExit:
        # The consumer closed the generator; propagate untouched.
        raise
    except Exception as e:
        # Map to deserializer error
        six.reraise(DeserializationError, DeserializationError(e), sys.exc_info()[2])
def importyaml(connection, metadata, sourcePath):
    """Insert one ``eveGraphics`` row per entry of ``fsd/graphicIDs.yaml``.

    The YAML file is read from under ``sourcePath`` with ``SafeLoader``.
    All inserts run on ``connection`` inside a transaction that is
    committed at the end. Optional fields that are absent from an entry
    are stored as empty strings. Progress messages are printed.
    """
    eveGraphics = Table('eveGraphics', metadata)
    print("Importing Graphics")
    print("opening Yaml")
    yaml_path = os.path.join(sourcePath, 'fsd', 'graphicIDs.yaml')
    with open(yaml_path, 'r') as yamlstream:
        print("importing")
        trans = connection.begin()
        graphics = load(yamlstream, Loader=SafeLoader)
        print("Yaml Processed into memory")
        for graphic_id in graphics:
            details = graphics[graphic_id]
            connection.execute(
                eveGraphics.insert(),
                graphicID=graphic_id,
                sofFactionName=details.get('sofFactionName', ''),
                graphicFile=details.get('graphicFile', ''),
                sofHullName=details.get('sofHullName', ''),
                sofRaceName=details.get('sofRaceName', ''),
                description=details.get('description', ''),
            )
    trans.commit()
def importyaml(connection, metadata, sourcePath):
    """Import every ``bsd/*.yaml`` file under ``sourcePath`` into a table.

    The table name is the file's base name up to its first dot. Each file
    is loaded with ``SafeLoader`` and every loaded row is inserted through
    ``connection``; one transaction is begun and committed per file.
    Progress messages are printed.
    """
    print("Importing BSD Tables")
    yaml_paths = glob.glob(os.path.join(sourcePath, 'bsd', '*.yaml'))
    for yaml_path in yaml_paths:
        tablename = os.path.basename(yaml_path).split('.')[0]
        print(tablename)
        tablevar = Table(tablename, metadata)
        print("Importing {}".format(yaml_path))
        print("Opening Yaml")
        trans = connection.begin()
        with open(yaml_path, 'r') as yamlstream:
            rows = load(yamlstream, Loader=SafeLoader)
            print("Yaml Processed into memory")
            for row in rows:
                insert_stmt = tablevar.insert().values(row)
                connection.execute(insert_stmt)
        trans.commit()
def parse_yaml(self, data, path_hint=None):
    ''' Turn a YAML (or JSON) string into a data structure.

    Text whose first non-blank character is "{" or "[" is parsed as JSON;
    anything else is handed to ``yaml.load``. A JSON parse error is
    reported through ``self.module.fail_json``, prefixed with
    ``path_hint`` when one is given; if that call returns, the result
    is ``None``.
    '''
    text = data.lstrip()
    if not text.startswith(("{", "[")):
        # Not JSON-looking, so treat it as a YAML document.
        return yaml.load(data, Loader=Loader)

    # A leading { or [ means we can infer this is a JSON document.
    try:
        return json.loads(data)
    except ValueError as ve:
        if path_hint:
            message = path_hint + ": " + str(ve)
        else:
            message = str(ve)
        self.module.fail_json(msg=message)
    return None
def from_yaml(cls, data):
    """Parse ``data`` as YAML with ``SafeLoader`` and pass the result to ``cls.from_dict``."""
    parsed = load(data, Loader=SafeLoader)
    return cls.from_dict(parsed)
def importyaml(connection, metadata, sourcePath):
    """Insert one ``eveIcons`` row per entry of ``fsd/iconIDs.yaml``.

    The YAML file is read from under ``sourcePath`` with ``SafeLoader``.
    All inserts run on ``connection`` inside a transaction that is
    committed at the end. A missing ``iconFile`` or ``description`` is
    stored as an empty string. Progress messages are printed.
    """
    eveIcons = Table('eveIcons', metadata)
    print("Importing Icons")
    print("Opening Yaml")
    yaml_path = os.path.join(sourcePath, 'fsd', 'iconIDs.yaml')
    with open(yaml_path, 'r') as yamlstream:
        trans = connection.begin()
        icons = load(yamlstream, Loader=SafeLoader)
        print("Yaml Processed into memory")
        for icon_id in icons:
            details = icons[icon_id]
            connection.execute(
                eveIcons.insert(),
                iconID=icon_id,
                iconFile=details.get('iconFile', ''),
                description=details.get('description', ''),
            )
    trans.commit()
def importyaml(connection, metadata, sourcePath):
    """Import ``fsd/categoryIDs.yaml`` into ``invCategories`` and ``trnTranslations``.

    For every category one ``invCategories`` row is inserted, using the
    ``en`` entry of its ``name`` mapping (empty string when absent) as
    ``categoryName``. When the category has a ``name`` mapping, one
    ``trnTranslations`` row is inserted per language in it. A failing
    translation insert is reported on stdout and skipped. Everything runs
    on ``connection`` inside a transaction committed at the end.
    """
    print("Importing Categories")
    invCategories = Table('invCategories', metadata)
    trnTranslations = Table('trnTranslations', metadata)
    print("opening Yaml")
    trans = connection.begin()
    with open(os.path.join(sourcePath, 'fsd', 'categoryIDs.yaml'), 'r') as yamlstream:
        print("importing")
        categoryids = load(yamlstream, Loader=SafeLoader)
        print("Yaml Processed into memory")
        for categoryid in categoryids:
            category = categoryids[categoryid]
            # NOTE(review): .decode('utf-8') assumes the loaded names are
            # byte strings (Python 2 str) - confirm before porting.
            connection.execute(
                invCategories.insert(),
                categoryID=categoryid,
                categoryName=category.get('name', {}).get('en', '').decode('utf-8'),
                iconID=category.get('iconID'),
                published=category.get('published', 0),
            )
            if 'name' in category:
                names = category['name']
                for lang in names:
                    try:
                        connection.execute(
                            trnTranslations.insert(),
                            tcID=6,
                            keyID=categoryid,
                            languageID=lang,
                            text=names[lang].decode('utf-8'),
                        )
                    except Exception:
                        # Best effort: report the bad translation and keep
                        # going, but let KeyboardInterrupt/SystemExit
                        # propagate instead of being swallowed.
                        print('{} {} has a category problem'.format(categoryid, lang))
    trans.commit()
def importyaml(connection, metadata, sourcePath):
    """Import ``fsd/certificates.yaml`` into ``certCerts`` and ``certSkills``.

    One ``certCerts`` row is inserted per certificate. For every skill in
    the certificate's ``skillTypes`` mapping, and for every level name
    under that skill, one ``certSkills`` row is inserted; the level name
    is translated to an integer through a fixed map (basic=0 ... elite=4).
    Everything runs on ``connection`` inside a transaction committed at
    the end. Progress messages are printed.
    """
    certCerts = Table('certCerts', metadata)
    certSkills = Table('certSkills', metadata,)
    skillmap = {
        "basic": 0,
        "standard": 1,
        "improved": 2,
        "advanced": 3,
        "elite": 4,
    }
    print("Importing Certificates")
    print("opening Yaml")
    yaml_path = os.path.join(sourcePath, 'fsd', 'certificates.yaml')
    with open(yaml_path, 'r') as yamlstream:
        trans = connection.begin()
        certificates = load(yamlstream, Loader=SafeLoader)
        print("Yaml Processed into memory")
        for cert_id in certificates:
            cert = certificates[cert_id]
            connection.execute(
                certCerts.insert(),
                certID=cert_id,
                name=cert.get('name', ''),
                description=cert.get('description', ''),
                groupID=cert.get('groupID'),
            )
            skill_types = cert['skillTypes']
            for skill_id in skill_types:
                levels = skill_types[skill_id]
                for level_name in levels:
                    connection.execute(
                        certSkills.insert(),
                        certID=cert_id,
                        skillID=skill_id,
                        certLevelInt=skillmap[level_name],
                        certLevelText=level_name,
                        skillLevel=levels[level_name],
                    )
    trans.commit()
def yaml_load(filename):
    """Return the parsed YAML content of ``filename``.

    ``filename`` must provide an ``open`` method; it is opened in text
    mode ("rt") and the stream is parsed with ``YAMLLoader``.
    """
    with filename.open("rt") as handle:
        document = yaml.load(handle, Loader=YAMLLoader)
    return document
def run(self, lines):
    """ Parse Meta-Data and store in Markdown.Meta.

    Leading meta-data lines are consumed from ``lines`` (which is mutated
    in place) until a blank line, the end of a YAML header, or the first
    line that is not meta-data. The collected data is stored on
    ``self.markdown.Meta`` and the remaining lines are returned.
    """
    meta = {}
    key = None
    yaml_block = []
    have_yaml = False
    if lines and YAML_BEGIN_RE.match(lines[0]):
        have_yaml = True
        lines.pop(0)
        if self.config['yaml'] and not yaml:  # pragma: no cover
            log.warning('Document with YAML header, but PyYAML unavailable')

    # Header lines are gathered for PyYAML only when a YAML header was
    # seen, the 'yaml' option is on and the yaml module is available.
    collect_yaml = bool(have_yaml and self.config['yaml'] and yaml)

    while lines:
        line = lines.pop(0)
        if line.strip() == '' or (have_yaml and YAML_END_RE.match(line)):
            break  # blank line or end of YAML header - done
        if collect_yaml:
            yaml_block.append(line)
            continue
        first = META_RE.match(line)
        if first:
            key = first.group('key').lower().strip()
            value = first.group('value').strip()
            meta.setdefault(key, []).append(value)
            continue
        more = META_MORE_RE.match(line)
        if more and key:
            # Add another line to existing key
            meta[key].append(more.group('value').strip())
            continue
        # Not meta-data: put the line back and stop.
        lines.insert(0, line)
        break  # no meta data - done

    if yaml_block:
        meta = yaml.load('\n'.join(yaml_block), SafeLoader)
    self.markdown.Meta = meta
    return lines
def load(stream):
    """Parse YAML from ``stream``, building every mapping as an ``OrderedDict``.

    A ``yaml.CSafeLoader`` subclass is created on each call and a
    constructor for the default mapping tag is registered on it.
    """
    def build_ordered_mapping(loader, node):
        # construct_pairs yields (key, value) tuples for the mapping node.
        pairs = loader.construct_pairs(node)
        return OrderedDict(pairs)

    class Loader(yaml.CSafeLoader):
        # Presumably subclassed so the registration below stays local to
        # this loader - confirm against PyYAML's add_constructor semantics.
        pass

    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    Loader.add_constructor(mapping_tag, build_ordered_mapping)
    return yaml.load(stream, Loader=Loader)
def importyaml(connection, metadata, sourcePath):
    """Import the skin YAML files into the four skin tables.

    Reads ``skins.yaml``, ``skinLicenses.yaml`` and ``skinMaterials.yaml``
    from ``sourcePath/fsd`` (in that order) with ``SafeLoader``:

    * each skin gives one ``skins`` row plus one ``skinShip`` row per
      entry of its ``types``;
    * each license gives one ``skinLicense`` row;
    * each material gives one ``skinMaterials`` row.

    Everything runs on ``connection`` inside a single transaction that is
    committed at the end. Progress messages are printed.
    """
    skinLicense = Table('skinLicense', metadata)
    skinMaterials = Table('skinMaterials', metadata)
    skins_table = Table('skins', metadata)
    skinShip = Table('skinShip', metadata)

    def read_fsd(filename):
        # Load one file from the fsd directory and announce completion.
        with open(os.path.join(sourcePath, 'fsd', filename), 'r') as yamlstream:
            parsed = load(yamlstream, Loader=SafeLoader)
            print("Yaml Processed into memory")
        return parsed

    trans = connection.begin()
    print("Importing Skins")

    print("opening Yaml1")
    skins = read_fsd('skins.yaml')
    for skin_id in skins:
        skin = skins[skin_id]
        connection.execute(
            skins_table.insert(),
            skinID=skin_id,
            internalName=skin.get('internalName', ''),
            skinMaterialID=skin.get('skinMaterialID', ''),
        )
        for ship in skin['types']:
            connection.execute(skinShip.insert(), skinID=skin_id, typeID=ship)

    print("opening Yaml2")
    skinlicenses = read_fsd('skinLicenses.yaml')
    for license_id in skinlicenses:
        license_info = skinlicenses[license_id]
        connection.execute(
            skinLicense.insert(),
            licenseTypeID=license_id,
            duration=license_info['duration'],
            skinID=license_info['skinID'],
        )

    print("opening Yaml3")
    skinmaterials = read_fsd('skinMaterials.yaml')
    for material_id in skinmaterials:
        material = skinmaterials[material_id]
        connection.execute(
            skinMaterials.insert(),
            skinMaterialID=material_id,
            displayNameID=material['displayNameID'],
            materialSetID=material['materialSetID'],
        )

    trans.commit()
def importyaml(connection, metadata, sourcePath):
    """Import ``fsd/blueprints.yaml`` into the industry tables.

    For every blueprint one ``industryBlueprints`` row is inserted, then
    for each of its activities:

    * one ``industryActivity`` row (activity name mapped to a numeric ID
      through a fixed table);
    * one ``industryActivityMaterials`` row per entry of ``materials``;
    * one ``industryActivityProducts`` row per entry of ``products``, plus
      an ``industryActivityProbabilities`` row when the product carries a
      ``probability``;
    * one ``industryActivitySkills`` row per entry of ``skills``; a
      failure while inserting skills is reported on stdout and skipped.

    Everything runs on ``connection`` inside a single transaction that is
    committed at the end. Progress messages are printed.
    """
    activityIDs = {
        "copying": 5,
        "manufacturing": 1,
        "research_material": 4,
        "research_time": 3,
        "invention": 8,
        "reaction": 11,
    }
    industryBlueprints = Table('industryBlueprints', metadata)
    industryActivity = Table('industryActivity', metadata)
    industryActivityMaterials = Table('industryActivityMaterials', metadata)
    industryActivityProducts = Table('industryActivityProducts', metadata)
    industryActivitySkills = Table('industryActivitySkills', metadata)
    industryActivityProbabilities = Table('industryActivityProbabilities', metadata)
    print("importing Blueprints")
    print("opening Yaml")
    trans = connection.begin()
    with open(os.path.join(sourcePath, 'fsd', 'blueprints.yaml'), 'r') as yamlstream:
        blueprints = load(yamlstream, Loader=SafeLoader)
        print("Yaml Processed into memory")
        for blueprint in blueprints:
            connection.execute(
                industryBlueprints.insert(),
                typeID=blueprint,
                maxProductionLimit=blueprints[blueprint]["maxProductionLimit"],
            )
            activities = blueprints[blueprint]['activities']
            for activity in activities:
                details = activities[activity]
                activity_id = activityIDs[activity]
                connection.execute(
                    industryActivity.insert(),
                    typeID=blueprint,
                    activityID=activity_id,
                    time=details['time'],
                )
                if 'materials' in details:
                    for material in details['materials']:
                        connection.execute(
                            industryActivityMaterials.insert(),
                            typeID=blueprint,
                            activityID=activity_id,
                            materialTypeID=material['typeID'],
                            quantity=material['quantity'],
                        )
                if 'products' in details:
                    for product in details['products']:
                        connection.execute(
                            industryActivityProducts.insert(),
                            typeID=blueprint,
                            activityID=activity_id,
                            productTypeID=product['typeID'],
                            quantity=product['quantity'],
                        )
                        if 'probability' in product:
                            connection.execute(
                                industryActivityProbabilities.insert(),
                                typeID=blueprint,
                                activityID=activity_id,
                                productTypeID=product['typeID'],
                                probability=product['probability'],
                            )
                try:
                    if 'skills' in details:
                        for skill in details['skills']:
                            connection.execute(
                                industryActivitySkills.insert(),
                                typeID=blueprint,
                                activityID=activity_id,
                                skillID=skill['typeID'],
                                level=skill['level'],
                            )
                except Exception:
                    # Best effort: report the bad skill entry and carry on,
                    # but let KeyboardInterrupt/SystemExit propagate
                    # instead of being swallowed.
                    print('{} has a bad skill'.format(blueprint))
    trans.commit()