我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用yaml.load()。
def openGeoPackage(self, filename=None):
    """Apply the bundled style to a GeoPackage and refresh the legend.

    When *filename* is falsy the user is asked to pick a file. The method
    returns silently if no existing file ends up selected.
    """
    if not filename:
        filename = QFileDialog.getOpenFileName(
            parent=None,
            caption=self.tr(u'Select GeoPackage file'),
            filter=self.tr(u'GeoPackage File') + u' (*.gpkg *.geopackage)')
    if not filename or not os.path.isfile(filename):
        return
    filename = os.path.abspath(filename)
    styleFile = os.path.join(self.path, 'res', 'wp_style.yaml')
    with open(styleFile, 'r') as f:
        # safe_load replaces bare yaml.load(): the latter can construct
        # arbitrary objects and requires a Loader on PyYAML >= 6.
        style = yaml.safe_load(f)
    applyStyle(filename, style)
    for layer in self.iface.legendInterface().layers():
        self.iface.legendInterface().refreshLayerSymbology(layer)
    self.createPie()
def retrieve(cont, filename):
    """Return the value stored under key *cont* in a YAML file, UTF-8 encoded.

    The value is expected to be text, since ``.encode('utf-8')`` is called
    on it.
    """
    # open() in a with-block replaces the Python-2-only file() builtin and
    # guarantees the handle is closed.
    with open(filename, 'r') as stream:
        # safe_load replaces bare yaml.load(), which requires a Loader on
        # PyYAML >= 6 and can construct arbitrary objects.
        data = yaml.safe_load(stream)
    return data[cont].encode('utf-8')
def internal_data(filename, io, entry, cont, cont_in=None, cont_in2=None):
    """Read or write a value nested up to three levels deep in a YAML file.

    :param filename: path of the YAML file.
    :param io: ``'out'`` to return the value, ``'in'`` to store *entry* and
        rewrite the file. Any other value does nothing.
    :param entry: value to store when ``io == 'in'``.
    :param cont: first-level key.
    :param cont_in: optional second-level key.
    :param cont_in2: optional third-level key (only used with *cont_in*).
    :return: the looked-up value for ``'out'``, otherwise ``None``.
    """
    with open(filename, 'r') as stream:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        prof = yaml.safe_load(stream)

    # Build the key path; cont_in2 is ignored unless cont_in is given.
    keys = [cont]
    if cont_in is not None:
        keys.append(cont_in)
        if cont_in2 is not None:
            keys.append(cont_in2)

    if io == 'out':
        val = prof
        for key in keys:
            val = val[key]
        return val

    if io == 'in':
        parent = prof
        for key in keys[:-1]:
            parent = parent[key]
        parent[keys[-1]] = entry
        with open(filename, 'w') as yaml_file:
            yaml_file.write(yaml.dump(prof, default_flow_style=False))
def __init__(self):
    """Build ``self.config`` by merging every readable settings file.

    Files are read in order (project-local first, then the two ``/etc``
    locations), so later files override earlier keys. A file that cannot be
    read or parsed is logged as an error and skipped.
    """
    project_path = os.path.dirname(__file__)
    settings_files = [
        os.path.join(project_path, 'settings.yaml'),
        '/etc/git-exension-settings.yaml',
        '/etc/nailgun/git-exension-settings.yaml',
    ]
    self.config = {}
    for sf in settings_files:
        try:
            logger.debug("Trying to read config file %s", sf)
            with open(sf) as custom_config:
                # safe_load replaces bare yaml.load() (Loader required on
                # PyYAML >= 6). An empty file parses to None, so it is
                # treated as "no settings".
                self.config.update(yaml.safe_load(custom_config.read()) or {})
        except Exception as e:
            logger.error("Error while reading config file %s: %s", sf, str(e))
def configure(self):
    """Load the JSON config, fetch agent services and read the role source.

    A missing or invalid config file yields an empty config. Failure to
    fetch the agent services prints the traceback and exits with status
    135. An unsupported ``role_source`` exits with status 140.
    """
    # load config values
    try:
        with open(self.configfile) as configfile_contents:
            self.config = json.load(configfile_contents)
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        self.config = {}

    try:
        self.agent_services = self.api_session.get(
            self.api_endpoint + '/agent/services?stale').json()
    except Exception:
        print_exc()
        exit(135)

    self.managed_service = self.agent_services[self.service]
    if self.managed_service['Tags'] is None:
        self.managed_service['Tags'] = []

    if self.role_source == "facter":
        self.get_facter_state(self.DEFAULT_FACTERFILE)
    else:
        print("!! unsupported PG role source !!")
        exit(140)
def __load_config(self):
    """Load the configuration file.

    Loads the parsed content of ``USERS_CONF_PATH`` into the config
    attribute. A missing or empty file results in an empty dict.
    """
    if not os.path.exists(self.USERS_CONF_PATH):
        self.__config = {}
        return
    with open(self.USERS_CONF_PATH, "r") as config_file:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        config = yaml.safe_load(config_file)
    self.__config = config or {}
def load(self):
    """Parse every YAML file in ``self.files`` and store upper-cased settings.

    :raises ImproperlyConfigured: if any file contains invalid YAML.
    """
    # Prepare + load directory.
    super().load()

    # Load the files and parse Yaml.
    parsed_settings = dict()
    try:
        for file_name in self.files:
            file_path = os.path.join(self.directory, file_name)
            with open(file_path, 'r') as file_handle:
                # safe_load replaces bare yaml.load(). An empty file parses
                # to None, which would crash dict.update(), so it is skipped.
                parsed_settings.update(yaml.safe_load(file_handle) or {})
    except yaml.YAMLError as e:
        # MarkedYAMLError is a subclass of YAMLError; one clause covers both.
        raise ImproperlyConfigured(
            'Your settings file(s) contain invalid YAML syntax! Please fix and restart!, {}'.format(str(e))
        ) from e

    # Loop and set in local settings (+ uppercase keys).
    for key, value in parsed_settings.items():
        self.settings[key.upper()] = value
def test_without_extra_args(self):
    """The pipeline keeps the params from its definition when none are overridden."""
    # safe_load replaces bare yaml.load(): the literal is plain data, and
    # yaml.load() without a Loader fails on PyYAML >= 6.
    pipeline_def = yaml.safe_load("""
      class: ParallelTextInputPipeline
      params:
        source_files: ["file1"]
        target_files: ["file2"]
        num_epochs: 1
        shuffle: True
    """)
    pipeline = input_pipeline.make_input_pipeline_from_def(
        pipeline_def, tf.contrib.learn.ModeKeys.TRAIN)
    self.assertIsInstance(pipeline, input_pipeline.ParallelTextInputPipeline)
    #pylint: disable=W0212
    self.assertEqual(pipeline.params["source_files"], ["file1"])
    self.assertEqual(pipeline.params["target_files"], ["file2"])
    self.assertEqual(pipeline.params["num_epochs"], 1)
    self.assertEqual(pipeline.params["shuffle"], True)
def test_with_extra_args(self):
    """Keyword overrides passed to the factory win over the definition's params."""
    # safe_load replaces bare yaml.load(): the literal is plain data, and
    # yaml.load() without a Loader fails on PyYAML >= 6.
    pipeline_def = yaml.safe_load("""
      class: ParallelTextInputPipeline
      params:
        source_files: ["file1"]
        target_files: ["file2"]
        num_epochs: 1
        shuffle: True
    """)
    pipeline = input_pipeline.make_input_pipeline_from_def(
        def_dict=pipeline_def,
        mode=tf.contrib.learn.ModeKeys.TRAIN,
        num_epochs=5,
        shuffle=False)
    self.assertIsInstance(pipeline, input_pipeline.ParallelTextInputPipeline)
    #pylint: disable=W0212
    self.assertEqual(pipeline.params["source_files"], ["file1"])
    self.assertEqual(pipeline.params["target_files"], ["file2"])
    self.assertEqual(pipeline.params["num_epochs"], 5)
    self.assertEqual(pipeline.params["shuffle"], False)
def __init__(self):
    """Populate default settings and load the user's ``config.yml``.

    The config directory and an empty config file are created on first use.
    ``self.user_settings`` is always a dict afterwards.
    """
    self.config_dir = os.path.expanduser("~/.config/ytbrowser/")
    # NOTE(review): self.defaults is not created here; presumably it is a
    # class-level dict shared by all instances. Confirm.
    self.defaults['format'] = "mkv"
    self.defaults['quality'] = "bestvideo"
    self.defaults['preferredcodec'] = "mp3"
    self.defaults['preferredquality'] = 192
    # NOTE(review): hard-coded API key committed to source; it should come
    # from configuration or the environment.
    self.defaults['developerKey'] = "AIzaSyDFuK00HWV0fd1VMb17R8GghRVf_iQx9uk"
    self.defaults['apiServiceName'] = "youtube"
    self.defaults['apiVersion'] = "v3"

    config_file = self.config_dir + "config.yml"
    if not os.path.exists(self.config_dir):
        os.makedirs(self.config_dir)
    if not os.path.exists(config_file):
        # Create an empty file so the read below always succeeds.
        open(config_file, "a").close()
    with open(config_file, 'r') as ymlfile:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        self.user_settings = yaml.safe_load(ymlfile)
    if self.user_settings is None:
        self.user_settings = {}
def setup(self, bottom, top):
    """Setup the RoIDataLayer."""
    # Parse the layer parameter string, which must be valid YAML. The result
    # is not used further; the call only validates the string. safe_load
    # replaces bare yaml.load() (Loader required on PyYAML >= 6).
    yaml.safe_load(self.param_str_)

    self._batch_size = config.BATCH_SIZE
    # Check divisibility before deriving the triplet count.
    assert self._batch_size % 3 == 0
    # Floor division keeps the count an int on Python 3 as well.
    self._triplet = self._batch_size // 3

    self._name_to_top_map = {
        'data': 0,
        'labels': 1}

    self.data_container = sampledata()
    self._index = 0

    # data blob: holds a batch of N images, each with 3 channels
    # The height and width (224 x 224) are dummy values
    top[0].reshape(self._batch_size, 3, 224, 224)
    top[1].reshape(self._batch_size)
def __getattr__(self, item):
    """Resolve *item* from ``self._info``, refreshing the object once if needed.

    :raises AttributeError: if *item* is still unknown after a refresh.
    """
    # NOTE(review): if ``_info`` itself is not set yet, the lookups below
    # re-enter __getattr__ and recurse. Confirm it is always initialised.
    try:
        # The result is deliberately discarded, as in the original, which
        # bound it to an unused local.
        super(NewBaseModel, self).__getattribute__(item)
    except AttributeError:
        pass  # fall through to the rest of the logic...

    # try looking up via self._info, if we already have it.
    if item in self._info:
        return self._info[item]

    # if we're still here, let's load the object if we haven't done so already.
    if not self._full_init:
        self._refresh()

    # try one more time.
    if item in self._info:
        return self._info[item]
    raise AttributeError("'{0}' object has no attribute '{1}'".format(self.__class__.__name__, item))
def __str__(self):
    """Render a header plus one line per attribute in ``self._info``.

    Attributes listed in ``self._dirty_attributes`` are marked ``(+)`` when
    their stored value is ``None`` and ``(*)`` otherwise. Values longer than
    50 characters are truncated to 47 plus an ellipsis.
    """
    out = ["{0:s} object, bound to {1:s}.".format(self.__class__.__name__, self._cb.session.server)]
    if self._last_refresh_time:
        out.append(" Last refreshed at {0:s}".format(time.ctime(self._last_refresh_time)))
    if not self._full_init:
        out.append(" Partially initialized. Use .refresh() to load all attributes")
    out.extend(["-"*79, ""])

    for name in sorted(self._info):
        if name not in self._dirty_attributes:
            marker = " "
        elif self._dirty_attributes[name] is None:
            marker = "(+)"
        else:
            marker = "(*)"
        text = str(self._info[name])
        if len(text) > 50:
            text = text[:47] + u"..."
        out.append(u"{0:s} {1:>20s}: {2:s}".format(marker, name, text))

    return "\n".join(out)
def __init__(self, ansible_config_path, aws_key_name=None, interval=60,
             qname='sm_annotate', debug=False):
    """Load the ansible config and set up the logger and the EC2 resource.

    :param ansible_config_path: path of the YAML ansible configuration.
    :param aws_key_name: key name; defaults to the config's ``aws_key_name``.
    :param interval: interval value, capped at 1200.
    :param qname: queue name stored on the instance.
    :param debug: debug flag stored on the instance.
    """
    with open(ansible_config_path) as fp:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        self.ansible_config = yaml.safe_load(fp)

    self.interval = min(interval, 1200)
    self.aws_key_name = aws_key_name or self.ansible_config['aws_key_name']
    instances = self.ansible_config['cluster_configuration']['instances']
    self.master_hostgroup = instances['master']['hostgroup']
    self.slave_hostgroup = instances['slave']['hostgroup']
    self.stage = self.ansible_config['stage']
    self.qname = qname
    self.debug = debug

    self._setup_logger()
    self.ec2 = boto3.resource('ec2', self.ansible_config['aws_region'])
def resolve_dotenv_file(path, stage=None):
    '''
    Locate a dotenv file in *path* and load its environment variables.

    With a stage (e.g. ``production``) the stage-specific file
    ``.env.production`` is tried first and plain ``.env`` is the fallback.
    Without a stage only ``.env`` is considered. Nothing happens when no
    candidate exists.
    '''
    suffix = '.{}'.format(stage) if stage else ''
    candidates = (
        os.path.join(path, '.env' + suffix),
        os.path.join(path, '.env'),
    )
    for env_file in candidates:
        if fs.exists(env_file):
            info('Resolving env file: {}'.format(cyan(env_file)))
            dotenv.load_dotenv(env_file)
            break
def load(filename=DEFAULT_CONFIG_FILE, stage=None):
    '''
    Load the configuration and return it.

    The dotenv file next to *filename* is resolved first, so environment
    variables referenced in the YAML can be expanded before parsing. The
    parsed config is merged with the defaults and stored in the module-level
    config. A KeyError or IOError halts with an error message.
    '''
    try:
        file_contents = fs.read(filename)
        resolve_dotenv_file(os.path.dirname(filename), stage)

        # Expand the environment variables used in the yaml config.
        loaded_config = os.path.expandvars(file_contents)

        # Parse the yaml configuration and merge it with the defaults before
        # it's used everywhere. safe_load replaces bare yaml.load() (Loader
        # required on PyYAML >= 6).
        loaded_config = yaml.safe_load(loaded_config)
        merged_config = merge_config(loaded_config)

        _config.update(merged_config)
        return get()
    except KeyError:
        halt('Invalid configuration file "{}"'.format(filename))
    except IOError:
        halt('Error loading config file "%s"' % filename)
def main():
    """Parse the command line into ``ARGS`` and run every file in a process pool."""
    global ARGS

    cli = argparse.ArgumentParser(description='Run a PyWebRunner YAML/JSON script.')

    # Options that take a value.
    valued = (
        (('-b', '--browser'), 'Which browser to load. Defaults to Chrome.'),
        (('--base-url',), 'Base URL to use with goto command.'),
        (('-t', '--timeout'), 'Global wait timeout (in seconds). Defaults to 30.'),
        (('-p', '--processes'), 'Number of processes (browsers) to use. Defaults to 1'),
        (('-do', '--default-offset'), 'New default offset for scroll_to_element. (Default is 0)'),
    )
    for flags, help_text in valued:
        cli.add_argument(*flags, help=help_text)

    # Boolean switches.
    switches = (
        (('--errors',), 'errors', 'Show errors.'),
        (('--focus',), 'focus', 'Focus the browser on launch.'),
        (('-v', '--verbose'), 'verbose', 'Verbose output of commands being executed.'),
    )
    for flags, dest, help_text in switches:
        cli.add_argument(*flags, dest=dest, action='store_true', help=help_text)

    cli.add_argument('files', nargs='*')
    ARGS = cli.parse_args()

    worker_count = int(ARGS.processes or 1)
    workers = Pool(worker_count)
    workers.map(run_test, ARGS.files)
    workers.close()
    workers.join()
def get_device_info():
    """
    Build one configuration dict per test device.

    The first entry of the test-info YAML is merged with every entry of the
    device YAML; device keys win on conflict.

    :return: list of merged dicts
    """
    device_list = []
    ini = U.ConfigIni()
    test_info = ini.get_ini('test_info', 'info')
    test_device = ini.get_ini('test_device', 'device')
    with open(test_info) as f:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        test_dic = yaml.safe_load(f)[0]

    with open(test_device) as f:
        for device in yaml.safe_load(f):
            # dict(a.items() + b.items()) only works on Python 2; copying and
            # updating gives the same merge on both major versions.
            merged = dict(test_dic)
            merged.update(device)
            device_list.append(merged)

    return device_list
def iter_keys_values(self, keys, inds=None, verbose=False):
    """Yield, chunk by chunk, tuples holding one item per requested key.

    :param keys: dataset keys to iterate in lockstep.
    :param inds: currently ignored. The index-filtering path was disabled
        in the original code and has been removed.
    :param verbose: forwarded to ``progressbar``.
    :raises RuntimeError: if a key is not part of the dataset. This is a
        generator, so the check runs on first iteration.
    """
    for key in keys:
        if key not in self.keys_:
            raise RuntimeError('Key %s not found in dataset. keys: %s' % (key, self.keys_))

    total_chunks = len(self.meta_file_.chunks)
    chunk_iter = progressbar(self.meta_file_.chunks, size=total_chunks, verbose=verbose)
    for chunk_idx, _ in enumerate(chunk_iter):
        data = AttrDict.load(self.get_chunk_filename(chunk_idx))
        columns = (data[key] for key in keys)
        for item in izip(*columns):
            yield item
def iterchunks(self, key, batch_size=10, verbose=False):
    """Yield lists of items for *key*, each built from *batch_size* chunks.

    :raises RuntimeError: if *key* is not part of the dataset. This is a
        generator, so the check runs on first iteration.
    """
    if key not in self.keys_:
        raise RuntimeError('Key %s not found in dataset. keys: %s' % (key, self.keys_))

    total_chunks = len(self.meta_file_.chunks)
    batch_chunks = grouper(range(total_chunks), batch_size)
    # Floor division keeps the size hint an int on Python 3 as well.
    for chunk_group in progressbar(batch_chunks, size=total_chunks // batch_size, verbose=verbose):
        items = []
        for chunk_idx in chunk_group:
            # grouper will fill chunks with default none values
            if chunk_idx is None:
                continue

            # Load chunk
            data = AttrDict.load(self.get_chunk_filename(chunk_idx))
            items.extend(data[key])
        yield items
def test_config(self):
    """Every configured entity pattern must match each of its own test strings."""
    with open('config.yml') as inp:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        config = yaml.safe_load(inp.read())
    _find_entities = find_entities.Subscriber(test_helper.get_mock_pipeline([]))
    _find_entities.setup(config)
    doc = document.get_document('dummy')
    for entity_type, pattern_conf in config.get(helper.ENTITIES, {}).items():
        # A single test value is accepted as shorthand for a one-item list.
        if not isinstance(pattern_conf['test'], list):
            pattern_conf['test'] = [pattern_conf['test']]
        for test in pattern_conf['test']:
            doc.text = 'dum dum {} dum'.format(test)
            _find_entities.consume(doc, None)
            entities = doc.entities.get_all()
            self.assertEqual(1, len(entities), msg='regex for %s found nothing' % entity_type)
            self.assertEqual(entity_type, entities[0][1]['type'])
            self.assertEqual(test, entities[0][1]['value'])
def import_scitools_yaml_to_neo4j(ctx, yaml_path, neo4j_url='bolt://localhost', user='neo4j', labels=''):
    """
    Load a YAML database dump and import it into Neo4j.

    :param ctx: context object forwarded to ``connect_neo4j``
    :param yaml_path: path of the YAML file to import
    :param neo4j_url: URL of the Neo4j server
    :param user: Neo4j user name
    :param labels: labels specification, converted with ``to_label_list``
    """
    label_list = to_label_list(labels)
    with open(yaml_path, 'r') as input_stream:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        # NOTE(review): assumes the dump holds plain data only. Confirm it
        # uses no Python-specific tags.
        scitools_db = yaml.safe_load(input_stream)
    neo4j_client = connect_neo4j(ctx, neo4j_url, user)
    ScitoolsETL.import_to_neo4j(scitools_db, neo4j_client, labels=label_list)


# noinspection PyUnusedLocal
def split_config(config):
    """Split the 'config' object into a set of fields.

    :param config: Configuration
    :type config: dict or str
    :return: tuple ``(db, host, user_name, password, port, collection)``
    :raises: yaml.error.YAMLError if config is a str that doesn't parse
    """
    if not isinstance(config, dict):
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        config = yaml.safe_load(config)
    db = config.get("db", None)
    host = config.get("host", "0.0.0.0")
    user_name = config.get("user_name", None)
    password = config.get("password", None)
    port = int(config.get("port", 27017))
    coll = config.get("collection", None)
    return db, host, user_name, password, port, coll
def parse_messier_config(self, config_filepath=".messier"):
    """
    Read YAML config file for Messier. Defaults to .messier.

    Supported options include:
      `serverspec_commands`: list of shell commands to run for Serverspec
      `serverspec_base_directory`: directory to cd into prior to running Serverspec

    Returns an empty dict when the file cannot be opened or is empty.
    """
    try:
        config_file = open(config_filepath, 'r')
    except IOError:
        return {}
    # The with-block closes the handle, which the original never did.
    with config_file:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        config = yaml.safe_load(config_file)
    return config or {}

# Elegant solution from https://gist.github.com/LeoHuckvale/8f50f8f2a6235512827b
# Stuffing this method into class because it's harder to reference otherwise
def __init__(self, options=None, config_path=None):
    """Initialise from a YAML config file, then overlay ``key=value`` options.

    :param options: iterable of option strings understood by ``parse_kv``.
        Dotted or nested keys are expanded through ``parse_key``.
    :param config_path: explicit config file, which must exist. When
        omitted, ``~/.caduc/config.yml`` is used if present.
    """
    if config_path is None:
        config_path = os.path.join(os.path.expanduser("~"), ".caduc", "config.yml")
        must_exist = False
    else:
        must_exist = True

    if must_exist or os.path.exists(config_path):
        # The with-block closes the handle; safe_load replaces bare
        # yaml.load(). An empty file parses to None, which would break the
        # ** expansion below.
        with open(config_path, 'r') as config_file:
            config = yaml.safe_load(config_file) or {}
    else:
        config = {}
    super(Config, self).__init__(**config)

    # None stands in for the former mutable default ``[]``.
    for opt in options or ():
        k, v = self.parse_kv(opt)
        node = {}
        child = node
        keys = self.parse_key(k)
        # Build the nested dict for all but the last key segment.
        for key in keys[:-1]:
            child[key] = {}
            child = child[key]
        child[keys[-1]] = v
        self.update(node)
def _get_job_parameters(self, job_spec, job_log_dir, client):
    """Load a job spec and extend it with directory, log and client settings.

    :param job_spec: file name of the spec, relative to ``self.bench_dir``.
    :param job_log_dir: directory used for the generated log options.
    :param client: stored under the ``client`` key of the job.
    :return: the parsed job dict with ``dir``, ``output_options`` and
        ``client`` added.
    :raises YAMLError: if the spec cannot be parsed.
    """
    spec_path = '{}/{}'.format(self.bench_dir, job_spec)
    # The mode is now an argument of open(); the original passed 'r' to
    # str.format() by mistake, where it was silently ignored.
    with open(spec_path, 'r') as yml:
        try:
            # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
            job = yaml.safe_load(yml)
        except YAMLError as error:
            # Report the path that was actually opened.
            log.error('Error parsing job spec in file {}'.format(spec_path))
            log.error(error)
            raise

    output_options = '''
write_bw_log={logdir}/output
write_lat_log={logdir}/output
write_hist_log={logdir}/output
write_iops_log={logdir}/output
'''.format(logdir=job_log_dir)
    job.update({'dir': self.work_dir,
                'output_options': output_options,
                'client': client})
    return job
def _record_filter(args, base_dir):
    """
    Save the filter provided.

    The filter is stored in ``<base_dir>/.filter`` under the key
    ``args['target']``. The ``target`` key and all keys starting with ``__``
    are left out of the stored arguments.
    """
    filter_file = '{}/.filter'.format(base_dir)

    if not isfile(filter_file):
        # do a touch filter_file
        open(filter_file, 'a').close()

    with open(filter_file) as filehandle:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        current_filter = yaml.safe_load(filehandle)
    if current_filter is None:
        current_filter = {}

    pprint.pprint(current_filter)

    # filter a bunch of salt content and the target key before writing.
    # ``!=`` compares by value; the original ``is not 'target'`` compared
    # identity and only worked through string interning.
    rec_args = {k: v for k, v in args.items()
                if k != 'target' and not k.startswith('__')}
    current_filter[args['target']] = rec_args

    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
def load_manage_dict(filename=None):
    """Populate the module-level ``MANAGE_DICT`` once and return it.

    The source is *filename* if given, else the manage file, else the hidden
    manage file. When none exists, the defaults are used with a warning
    banner. Nothing is reloaded once ``MANAGE_DICT`` is non-empty.
    """
    manage_filename = None
    if not MANAGE_DICT:
        if filename:
            manage_filename = filename
        elif os.path.exists(MANAGE_FILE):
            manage_filename = MANAGE_FILE
        elif os.path.exists(HIDDEN_MANAGE_FILE):
            manage_filename = HIDDEN_MANAGE_FILE
        else:
            MANAGE_DICT.update(copy.deepcopy(default_manage_dict))
            MANAGE_DICT['shell']['banner']['message'] = (
                "WARNING: This is not a managed project\n"
                "\tPlease `exit()` and \n"
                "\trun `$ manage init`\n"
                "\tand edit `manage.yml` file with desired options"
            )
            MANAGE_DICT['shell']['auto_import']['display'] = False
        if manage_filename:
            with open(manage_filename) as manage_file:
                # safe_load replaces bare yaml.load(). An empty file parses
                # to None, which dict.update() rejects.
                MANAGE_DICT.update(yaml.safe_load(manage_file) or {})
    return MANAGE_DICT
def load_parameters(stack):
    """Load a stack's ``parameters.yaml`` as a list of key/value dicts.

    :param stack: name of the directory below ``stacks``.
    :return: list of ``{'ParameterKey': ..., 'ParameterValue': ...}`` dicts,
        empty when the file is missing or empty.
    """
    param_path = path.join('stacks', stack, 'parameters.yaml')
    if not path.exists(param_path):
        return []
    with open(param_path, encoding='utf-8') as param_file:
        # safe_load replaces bare yaml.load(). An empty file parses to None,
        # so fall back to an empty mapping.
        params_raw = yaml.safe_load(param_file.read()) or {}
    # build parameter list
    return [
        {'ParameterKey': key, 'ParameterValue': value}
        for key, value in params_raw.items()
    ]
def run(self):
    '''
    Entry point called by twisted: load the config, schedule the periodic
    refresh, then listen for SSL connections and run the reactor.
    '''
    # load initial config; without one there is nothing to serve
    self.refresh_config()
    if self.config is None:
        logging.critical("cannot start due to error in config file")
        return

    # refresh and check status every event_period seconds
    self.refresh_task = task.LoopingCall(self.refresh_loop)
    period = self.config['event_period']
    self.refresh_task.start(period, now=False).addErrback(errorCallback)

    # setup server for receiving blinded counts from the DC nodes and key
    # shares from the SK nodes
    port = self.config['listen_port']
    context_factory = ssl.DefaultOpenSSLContextFactory(
        self.config['key'], self.config['cert'])

    logging.info("Tally Server listening on port {}".format(port))
    reactor.listenSSL(port, self, context_factory)
    reactor.run()
def __init__(self):
    """Load ``config/klaxer.yml`` and build the rule tables from its sections.

    :raises ConfigurationError: if the file is not valid YAML.
    """
    self._classification_rules = {}
    self._exclusion_rules = {}
    self._enrichment_rules = {}
    self._routing_rules = {}
    self._config = None

    try:
        # TODO: Absolute path? Where should this live?
        with open('config/klaxer.yml', 'r') as ymlfile:
            # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
            self._config = yaml.safe_load(ymlfile)
    except yaml.YAMLError as ye:
        raise ConfigurationError('failed to parse config') from ye

    # An empty file parses to None; treat it as "no sections" instead of
    # failing with a TypeError in the loop.
    for section in self._config or ():
        # Subsequent definitions of the same service will overwrite the
        # previous ones.
        self._build_rules(section)
def process_abot_test_result(file_path):
    """ Process ABoT Result

    Reads the JSON result file and returns the flattened steps of every
    test, each with ``result`` copied from ``step_status``. Any failure while
    flattening is logged and re-raised.
    """
    with open(file_path) as test_result:
        data = json.load(test_result)
    res = []
    for tests in data:
        tests = update_data(tests)
        try:
            flatten_steps = tests['elements'][0].pop('flatten_steps')
            for steps in flatten_steps:
                steps['result'] = steps['step_status']
                res.append(steps)
        except Exception:
            # Nothing is posted here; the old message named ElasticSearch
            # although the failure is in processing the result structure.
            logging.error("Could not process ABoT test result data")
            raise
    return res
def post(self):
    """Update the site settings from the submitted form and persist them.

    Only users with ``level == 0`` may proceed. The new values are applied
    to the running settings and merged into the ``global`` section of the
    config file, then the client is redirected to the settings page.
    """
    if self.current_user['level'] != 0:
        self.custom_error()
        # Stop here even if custom_error() returns normally; otherwise an
        # unprivileged user would fall through and rewrite the settings.
        return

    settings = {
        'init_money': int(self.get_body_argument('init_money')),
        'reg_type': self.get_body_argument('reg_type'),
        'cookie_secret': self.get_body_argument('cookie_secret') or self.settings['cookie_secret'],
        'site': {
            'name': self.get_body_argument('sitename'),
            'keyword': self.get_body_argument('keyword'),
            'description': self.get_body_argument('description')
        }
    }
    self.settings.update(settings)

    with open(self.settings['config_file'], 'r') as f:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        custom_settings = yaml.safe_load(f)
    custom_settings['global'].update(settings)
    with open(self.settings['config_file'], 'w') as f:
        yaml.dump(custom_settings, f, default_flow_style=False, default_style='"')

    self.redirect('/ushio/setting')
def parse_config(conf_file):
    """Parse a YAML config file.

    :return: the parsed content, or ``False`` (after logging an error) when
        *conf_file* is not an existing file.
    """
    if not os.path.isfile(conf_file):
        logging.error('Invalid config file: %s.' % conf_file)
        return False
    # The with-block closes the handle, which the original left open.
    with open(conf_file) as handle:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        return yaml.safe_load(handle.read())
def __init__(self, *args):
    """Record the required option names and load the charm's config data.

    ``self['config']`` receives the live charm configuration and
    ``self.config`` the ``options`` section of the charm's ``config.yaml``.
    """
    self.required_options = args
    self['config'] = hookenv.config()
    with open(os.path.join(hookenv.charm_dir(), 'config.yaml')) as fp:
        # safe_load replaces bare yaml.load(). An empty file parses to None,
        # so fall back to an empty mapping before calling .get().
        self.config = (yaml.safe_load(fp) or {}).get('options', {})
def read_context(self, file_name):
    """Load a YAML context file, resolving relative names against the charm dir.

    :raises OSError: if the file parses to an empty value.
    """
    if not os.path.isabs(file_name):
        file_name = os.path.join(hookenv.charm_dir(), file_name)
    with open(file_name, 'r') as file_stream:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        data = yaml.safe_load(file_stream)
    if not data:
        raise OSError("%s is empty" % file_name)
    return data
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
    """Return the parsed matchmaker ring JSON, or ``{}`` if the file is absent."""
    if not os.path.isfile(mm_file):
        return {}
    with open(mm_file, 'r') as handle:
        return json.load(handle)
def _git_yaml_load(projects_yaml):
    """
    Load the specified yaml into a dictionary.

    Returns ``None`` when *projects_yaml* is empty or ``None``.
    """
    if not projects_yaml:
        return None

    # safe_load replaces bare yaml.load(), which requires a Loader on
    # PyYAML >= 6 and can construct arbitrary objects.
    return yaml.safe_load(projects_yaml)
def __init__(self, iface):
    """Store the interface handle and install a translator for the user's locale.

    The translator is only installed when a matching ``.qm`` file exists in
    the plugin's ``i18n`` directory.
    """
    self.iface = iface
    self.path = os.path.dirname(os.path.realpath(__file__))

    # First two characters of the configured locale.
    lang = QSettings().value("locale/userLocale")[0:2]
    qm_file = os.path.join(self.path, 'i18n', '{}.qm'.format(lang))
    if not os.path.exists(qm_file):
        return

    self.translator = QTranslator()
    self.translator.load(qm_file)
    QCoreApplication.installTranslator(self.translator)
def createRotationLayer(self):
    """Create a hidden layer of rotated bounding boxes for the pie layer.

    Runs the oriented-minimum-bounding-box algorithm on the pie layer and
    gives each box a name (taken from the pie features in iteration order)
    and a rounded rotation angle, then rotates the box geometry by that
    angle. Returns the new layer, or ``None`` after reporting a problem on
    the message bar.
    """
    matches = QgsMapLayerRegistry.instance().mapLayersByName(PIE_LAYER)
    if not matches:
        self.iface.messageBar().pushCritical(
            self.tr(u'No layer'),
            self.tr(u'Please add "{}" layer.').format(PIE_LAYER))
        return

    pieLayer = matches[0]
    if not pieLayer.featureCount():
        self.iface.messageBar().pushInfo(
            self.tr(u'No data'),
            self.tr(u'No features in the "{}" layer.').format(PIE_LAYER))
        return

    # Persist pending edits before running the algorithm on the layer.
    if pieLayer.isEditable():
        self.iface.vectorLayerTools().saveEdits(pieLayer)

    algResult = runalg('qgis:orientedminimumboundingbox', pieLayer, True, None)
    boxesLayer = QgsVectorLayer(algResult['OUTPUT'], ROTATION_LAYER, 'ogr')
    if not boxesLayer.isValid():
        self.iface.messageBar().pushCritical(
            self.tr(u'Access error'),
            self.tr(u'Failed to load a temporary processing layer.'))
        return

    self.addFieldToLayer(boxesLayer, NAME_FIELD, QVariant.String)
    angleIdx = boxesLayer.dataProvider().fieldNameIndex('ANGLE')
    nameIdx = boxesLayer.dataProvider().fieldNameIndex(NAME_FIELD)

    pieFeatures = pieLayer.getFeatures()
    for boxFeature in boxesLayer.getFeatures():
        pieName = next(pieFeatures)['name']
        rotation = round(boxFeature['ANGLE'])
        if boxFeature['WIDTH'] > boxFeature['HEIGHT']:
            # Wider than tall: turn by a quarter, towards zero.
            if rotation < 0:
                rotation += 90
            else:
                rotation += -90

        rotated = QgsGeometry(boxFeature.geometry())
        rotated.rotate(rotation, boxFeature.geometry().boundingBox().center())

        boxesLayer.dataProvider().changeAttributeValues(
            {boxFeature.id(): {angleIdx: rotation, nameIdx: pieName}})
        boxesLayer.dataProvider().changeGeometryValues({boxFeature.id(): rotated})

    QgsMapLayerRegistry.instance().addMapLayer(boxesLayer)
    self.iface.legendInterface().setLayerVisible(boxesLayer, False)
    self.iface.legendInterface().setLayerVisible(pieLayer, False)
    return boxesLayer
def _show_metadata(metadata_path, trailing_newline=False):
    """Write the key/value pairs of a YAML metadata file, or ``none`` on any failure."""
    try:
        # The with-block closes the handle; safe_load replaces bare
        # yaml.load() (Loader required on PyYAML >= 6).
        with open(metadata_path, 'r') as handle:
            metadata = yaml.safe_load(handle)
        for key, value in metadata.items():
            swrite(key + ': ')
            swrite(str(value) + '\n')
        if trailing_newline:
            swrite("\n")
    except Exception:
        # Best effort: a missing or unreadable metadata file is reported as
        # "none". Exception (not a bare except) lets Ctrl-C through.
        swrite("none\n")


def show_io(input_dir, output_dir):
    """Write a listing of the current, input and output directories and
    their metadata through ``swrite``."""
    swrite('\n=== DIRECTORIES ===\n\n')
    # Show this directory
    swrite("-- Current directory " + pwd() + ":\n")
    for pattern in ('.', './*', './*/*'):
        write_list(ls(pattern))
    swrite("\n")

    # List input and output directories
    swrite("-- Input directory " + input_dir + ":\n")
    for suffix in ('', '/*', '/*/*', '/*/*/*'):
        write_list(ls(input_dir + suffix))
    swrite("\n")
    swrite("-- Output directory " + output_dir + ":\n")
    for suffix in ('', '/*'):
        write_list(ls(output_dir + suffix))
    swrite("\n")

    # write meta data to sdterr
    swrite('\n=== METADATA ===\n\n')
    swrite("-- Current directory " + pwd() + ":\n")
    _show_metadata('metadata')
    swrite("-- Input directory " + input_dir + ":\n")
    _show_metadata(os.path.join(input_dir, 'metadata'), trailing_newline=True)
def handle(self, *args, **options):
    """Import artists and albums from ``data/albums.yml``.

    Artists and albums that already exist (matched by name and title) are
    reused. New albums get an increasing reference number and the artists
    listed for them.
    """
    reference = 0
    # open file with data
    directory = os.path.dirname(os.path.dirname(__file__))
    data_path = os.path.join(directory, 'data', 'albums.yml')
    with open(data_path, 'r') as data_file:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        data = yaml.safe_load(data_file)

    for album in data['albums']:
        # Create artists
        artists = []
        for artist in album['artists']:
            try:
                stored_artist = Artist.objects.get(name=artist)
                lg.info('Artist found: %s' % stored_artist)
            except ObjectDoesNotExist:
                stored_artist = Artist.objects.create(name=artist)
                lg.info('Artist created: %s' % stored_artist)
            artists.append(stored_artist)

        # Find or create album
        try:
            stored_album = Album.objects.get(title=album['title'])
            lg.info('Album found: %s' % stored_album.title)
        except ObjectDoesNotExist:
            reference += 1
            # A separate name keeps the loop variable (the YAML dict) intact.
            new_album = Album.objects.create(
                title=album['title'],
                reference=reference,
                picture=album['picture']
            )
            # NOTE(review): direct assignment to a many-to-many field was
            # removed in Django 2.0; newer versions need
            # ``new_album.artists.set(artists)``. Confirm the target version.
            new_album.artists = artists
            # Log the album; the original logged the last artist here and
            # failed when the album had no artists.
            lg.info('New album: %s' % new_album.title)
def __init__(self, conf_file):
    """Load the certificate and private-key components plus the global constant.

    :param conf_file: YAML file with ``cert_hex`` and a ``priv_key`` mapping
        of hex strings. The global constant is read from
        ``yaml/global.yaml``.
    """
    # with-blocks close the files even if parsing raises; safe_load replaces
    # bare yaml.load() (Loader required on PyYAML >= 6).
    with open(conf_file, "r") as f:
        conf = yaml.safe_load(f)

    cert_bin = self.__hex2bin(conf["cert_hex"])
    priv = conf["priv_key"]
    P_bin = self.__hex2bin(priv["P_hex"])
    Q_bin = self.__hex2bin(priv["Q_hex"])
    d_mod_p_1_bin = self.__hex2bin(priv["d_mod_p-1_hex"])
    d_mod_q_1_bin = self.__hex2bin(priv["d_mod_q-1_hex"])
    inv_q_mod_p_bin = self.__hex2bin(priv["inv_q_mod_p_hex"])

    self.__cert_bin = cert_bin
    # Key components are stored as big-endian integers.
    self.__priv_key_P = int.from_bytes(P_bin, "big")
    self.__priv_key_Q = int.from_bytes(Q_bin, "big")
    self.__priv_key_d_mod_p_1 = int.from_bytes(d_mod_p_1_bin, "big")
    self.__priv_key_d_mod_q_1 = int.from_bytes(d_mod_q_1_bin, "big")
    self.__priv_key_inv_q_mod_p = int.from_bytes(inv_q_mod_p_bin, "big")

    with open("yaml/global.yaml", "r") as f:
        conf = yaml.safe_load(f)

    # Strip all whitespace from the hex string before decoding it.
    global_constant_hex = re.sub(r'\s', "", conf["global_constant"])
    self.__global_constant_bin = bytes.fromhex(global_constant_hex)
def __init__(self, conf_yaml):
    """Parse the YAML configuration file at *conf_yaml*."""
    # The with-block closes the file even if parsing raises; safe_load
    # replaces bare yaml.load() (Loader required on PyYAML >= 6).
    with open(conf_yaml, "r") as f:
        # NOTE(review): the parsed config is not used in the visible code.
        # The example looks truncated.
        conf = yaml.safe_load(f)
def load(self, filename):
    """Remember *filename* and load its YAML content into ``self.config``."""
    self.filename = filename
    with open(filename, 'r') as stream:
        # safe_load replaces bare yaml.load(), which requires a Loader on
        # PyYAML >= 6 and can construct arbitrary objects.
        self.config = yaml.safe_load(stream)
def reload(self):
    """Load again from the file name stored in ``self.filename``."""
    current = self.filename
    self.load(current)
def load(self, name, force_reload_if_unmanaged=False):
    """Import the plugin module *name*, register it and return it.

    A module that is already in ``sys.modules`` but was not loaded through
    this manager is taken from there (and reloaded when
    *force_reload_if_unmanaged* is true) rather than imported again.
    """
    unmanaged = name in sys.modules and name not in self.plugins
    if not unmanaged:
        module = importlib.import_module(name)
    else:
        # Already imported by someone else: adopt it.
        module = sys.modules[name]
        if force_reload_if_unmanaged:
            importlib.reload(module)
    self.plugins.add(name)
    return module
def load_yaml(self, job_yaml):
    """Build a ``Job`` from a YAML job description.

    Each entry of ``steps`` is a mapping whose first key names the handler
    plugin and whose value is passed to that plugin.
    """
    # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
    obj = yaml.safe_load(job_yaml)
    job_steps = []
    for step in obj['steps']:
        # First key of the mapping, without building a throwaway list.
        handler_name = next(iter(step))
        plugin = engine.plugins.get_job_handler(handler_name)
        job_steps.append(plugin(step[handler_name]))
    job = Job()
    job.steps = job_steps
    return job
def docs():
    """
    Provides the manifest.json file for the 21 endpoint crawler.

    The manifest is read from ``./manifest.yaml`` and returned as a JSON string.
    """
    with open('./manifest.yaml', 'r') as f:
        # safe_load replaces bare yaml.load() (Loader required on PyYAML >= 6).
        manifest = yaml.safe_load(f)
    return json.dumps(manifest)
def load_yaml_config(filepath):
    """Return the parsed content of the YAML file at *filepath*."""
    with open(filepath) as f:
        # safe_load replaces bare yaml.load(), which requires a Loader on
        # PyYAML >= 6 and can construct arbitrary objects.
        return yaml.safe_load(f)