我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 collections.OrderedDict()。
def _get_openstack_release_string(self):
    """Return the OpenStack release codename for this deployment.

    When a cloud-archive origin is configured (``self.openstack``, e.g.
    ``cloud:trusty-kilo/updates``), the codename is parsed out of it;
    otherwise the default release shipped with ``self.series`` is used.
    """
    default_release = OrderedDict([
        ('precise', 'essex'),
        ('quantal', 'folsom'),
        ('raring', 'grizzly'),
        ('saucy', 'havana'),
        ('trusty', 'icehouse'),
        ('utopic', 'juno'),
        ('vivid', 'kilo'),
        ('wily', 'liberty'),
        ('xenial', 'mitaka'),
    ])
    if not self.openstack:
        return default_release[self.series]
    origin = self.openstack.split(':')[1]
    return origin.split('%s-' % self.series)[1].split('/')[0]
def read_text(filename, rel_hash):
    """Read a plain-text file into EDU nodes, one node per line.

    :param filename: path of the UTF-8 text file to read
    :param rel_hash: dict mapping relation name -> relation kind; if fewer
        than two entries are supplied, default rst/multinuc relations are
        added in place
    :return: dict mapping string node id -> NODE
    """
    nodes = {}
    # Add some default relations if none have been supplied
    # (at least 1 rst and 1 multinuc)
    if len(rel_hash) < 2:
        rel_hash["elaboration_r"] = "rst"
        rel_hash["joint_m"] = "multinuc"
    rels = collections.OrderedDict(sorted(rel_hash.items()))
    # The alphabetically first relation is the default for every EDU.
    # next(iter(...)) works on Python 2 and 3 alike; rels.keys()[0] would
    # raise TypeError on Python 3, where keys() is a non-indexable view.
    default_rel = next(iter(rels.keys()))
    default_kind = next(iter(rels.values()))
    id_counter = 0
    # 'with' ensures the file handle is closed (the original leaked it)
    with codecs.open(filename, "r", "utf-8") as f:
        for line in f:
            id_counter += 1
            nodes[str(id_counter)] = NODE(str(id_counter), id_counter, id_counter,
                                          "0", 0, "edu", line.strip(),
                                          default_rel, default_kind)
    return nodes
def create_pie_chart(input_dict, input_colors, suffix, special_item_key=None):
    """Build MediaWiki ``{{#invoke:Chart|pie chart}}`` markup from counts.

    :param input_dict: mapping of slice label -> numeric value
    :param input_colors: mapping of slice label -> color string
    :param suffix: appended (prefixed with '_') as the chart units suffix
    :param special_item_key: optional label always rendered as the last
        slice regardless of its value
    :return: the chart markup as a unicode string
    """
    if special_item_key is not None:
        special_item = dict()
        special_item[special_item_key] = 0
    output_text = u'{{#invoke:Chart|pie chart\n' \
                  u'| radius = 180\n' \
                  u'| slices = \n'
    input_dict = dict(input_dict)
    # slices sorted by value, largest first
    sorted_dict = OrderedDict(sorted(input_dict.items(), key=itemgetter(1), reverse=True))
    # .items() instead of Python-2-only .iteritems(); works on both versions
    for key, value in sorted_dict.items():
        if special_item_key is None or key != special_item_key:
            output_text += u' ( %d: %s : %s)\n' % (value, key, input_colors[key])
        else:
            # remember the special slice; it is emitted after the loop
            special_item[special_item_key] = value
    if special_item_key is not None:
        output_text += u' ( %d: %s : %s)\n' % (special_item[special_item_key],
                                               special_item_key,
                                               input_colors[special_item_key])
    output_text += u'| units suffix = _%s\n' \
                   u'| percent = true\n' \
                   u'}}\n' % (suffix)
    return output_text
def define_services(config):
    """Build the configured services for the current app.

    Arguments:
        config (:py:class:`list`): service configuration entries, each a
            dict carrying at least a ``'name'`` key.

    Returns:
        :py:class:`collections.OrderedDict`: configured service instances
        keyed by a freshly generated uuid4 hex id.

    Entries naming an unknown service are logged and skipped.
    """
    configured = OrderedDict()
    for settings in config:
        service_name = settings['name']
        if service_name not in SERVICES:
            logger.warning('unknown service %r', service_name)
            continue
        configured[uuid4().hex] = SERVICES[service_name].from_config(**settings)
    return configured
def artists(query=None):
    """List/search artists.

    :param query: optional search string; when omitted, all artists are
        listed
    :return: list of artist names, deduplicated, in mpc output order
    """
    found = OrderedDict()  # ordered-set behaviour: dedupe, keep order
    if query:
        out = mpc('search', ['artist', query], ('-f', '%artist%')).strip()
    else:
        out = mpc('list', ['artist']).strip()
    if not out:
        return []
    results = out.split('\n')
    log.debug('results=%r', results)
    for name in results:
        found[name] = True
    # list(...) so a real list is returned on Python 3 as well;
    # dict.keys() there is a view, not the list this function documents
    return list(found.keys())
def albums(query=None):
    """List/search albums.

    :param query: optional search string; when omitted, all albums are
        listed
    :return: list of album names, deduplicated, in mpc output order
    """
    found = OrderedDict()  # ordered-set behaviour: dedupe, keep order
    if query:
        out = mpc('search', ['album', query], ('-f', '%album%')).strip()
    else:
        out = mpc('list', ['album']).strip()
    if not out:
        return []
    results = out.split('\n')
    log.debug('results=%r', results)
    for name in results:
        found[name] = True
    # list(...) so a real list is returned on Python 3 as well;
    # dict.keys() there is a view, not the list this function documents
    return list(found.keys())
def commit_history(cli):
    """Parse output of "show configuration history commit reverse detail".

    Each matching line contributes two key/value pairs to the current
    record; a line whose second key is 'Comment' closes the record.

    :param cli: raw CLI output text
    :return: list of OrderedDict records, one per commit entry
    """
    pattern = re.compile(' ([A-Z][a-z]+(?: ID)?): (.*?) +([A-Z][a-z]+): (.*)')
    entries = []
    current = OrderedDict()
    for line in cli.splitlines():
        match = pattern.search(line)
        if match is None:
            continue
        current[match.group(1)] = match.group(2)
        current[match.group(3)] = match.group(4)
        if match.group(3) == 'Comment':
            # 'Comment' terminates a commit record
            entries.append(current)
            current = OrderedDict()
    return entries
def _reply(self, json_reply):
    """Handle a JSON-RPC reply that came in over the transport.

    Valid replies fire the callback of the matching pending deferred;
    malformed replies are logged and, when exactly one request is pending,
    that request's errback is fired with a JsonRpcException.
    """
    if not json_reply.startswith('{'):
        self.sdata.log('Received non-JSON data: "{}"'.format(json_reply))
        return
    reply = json.loads(json_reply, object_pairs_hook=OrderedDict)
    if reply['jsonrpc'] != '2.0' or 'id' not in reply or reply['id'] is None:
        self.sdata.log('Received bad JSON-RPC reply: {}'.format(json_reply))
        if len(self.pending_reply_map) == 1:
            # lucky! can guess the single pending reply to kill.
            # next(iter(...)) instead of .keys()[0]: dict views are not
            # indexable on Python 3
            this_id = next(iter(self.pending_reply_map))
            d = self.pending_reply_map.pop(this_id)
            d.errback(JsonRpcException('Bad reply: {}'.format(json_reply)))
        # bug fix: previously execution fell through here and crashed on
        # int(None); a bad reply is fully handled at this point
        return
    this_id = int(reply['id'])
    if 'method' in reply and this_id in self.pending_reply_map:
        self.sdata.log('Got echo of request for {}, ignoring'.format(this_id))
    else:
        d = self.pending_reply_map.pop(this_id)
        d.callback(reply)
def get_prototxt_string(inp_dict, ntabs=0, inp_key=None):
    """Recursively serialize a nested OrderedDict/list structure to
    prototxt-style text.

    :param inp_dict: an OrderedDict (message), list (repeated field), or
        scalar leaf value
    :param ntabs: current indentation depth
    :param inp_key: field name of the current value in its parent
    :return: prototxt text fragment
    """
    output_string = ''
    # exact-type dispatch (``type(...) is``) is deliberate: only plain
    # lists and OrderedDicts are treated as containers
    if type(inp_dict) is list:
        # repeated field: emit the key once per element
        for elem in inp_dict:
            output_string += ' ' * ntabs + str(inp_key) + get_prototxt_string(elem, ntabs) + '\n'
    elif type(inp_dict) is OrderedDict:
        output_string += ' {\n'
        # .items() instead of Python-2-only .iteritems()
        for key, value in inp_dict.items():
            if type(value) is list:
                output_string += ' ' * ntabs + get_prototxt_string(value, ntabs + 1, key) + '\n'
            else:
                output_string += ' ' * ntabs + str(key) + ' ' + get_prototxt_string(value, ntabs + 1, key) + '\n'
        output_string += ' ' * ntabs + '}\n'
    else:
        # scalar leaf: formatting is delegated to get_prototxt_format
        output_string += ' : ' + get_prototxt_format(inp_key, inp_dict)
    return output_string
def read_solver(solver_file):
    """Parse a solver prototxt file into an OrderedDict.

    Repeated keys are collected into a list, preserving file order.

    :param solver_file: path to the solver definition file
    :return: OrderedDict mapping key -> formatted value (or list of values)
    """
    output_dict = OrderedDict()
    with open(solver_file, 'r') as inp_file:
        for line in inp_file:
            lv = line.strip()
            # skip blank lines and comment lines
            if not lv or lv.startswith('#'):
                continue
            # split on the FIRST ':' only, so values that themselves
            # contain ':' (e.g. paths, URLs) are kept intact; the bare
            # split(':') crashed with "too many values to unpack"
            k, v = [x.strip() for x in lv.split(':', 1)]
            v = get_formatted_input(v)
            if k not in output_dict:
                output_dict[k] = v
            elif type(output_dict[k]) is list:
                output_dict[k].append(v)
            else:
                # second occurrence: promote the value to a list
                output_dict[k] = [output_dict[k], v]
    return output_dict
def sprite_to_dict(self, sprite, bone=None):
    """Serialize one Blender sprite object into an ordered dict for export.

    :param sprite: name of a Blender object (looked up in bpy.data.objects)
    :param bone: optional parent object/bone used for the relative position
    :return: OrderedDict describing the sprite, with nested "children"
        entries for each MESH child of the object
    """
    dict_sprites = OrderedDict()
    dict_sprites["name"] = sprite
    dict_sprites["type"] = "SPRITE"
    # node path is derived from the Blender object, not the name string
    dict_sprites["node_path"] = str(self.get_node_path(bpy.data.objects[sprite], []))  # ,suffix=sprite))
    dict_sprites["resource_path"] = self.get_sprite_path(sprite)
    dict_sprites["pivot_offset"] = self.get_sprite_offset(sprite)
    # position is relative to the given bone/parent (may be None)
    dict_sprites["position"] = self.get_relative_mesh_pos(bone, bpy.data.objects[sprite])
    dict_sprites["rotation"] = self.get_sprite_rotation(sprite)
    dict_sprites["scale"] = self.get_sprite_scale(sprite)
    dict_sprites["opacity"] = self.get_sprite_opacity(sprite)
    dict_sprites["z"] = self.get_z_value(sprite)
    dict_sprites["tiles_x"] = self.get_sprite_tilesize(sprite)[0]
    dict_sprites["tiles_y"] = self.get_sprite_tilesize(sprite)[1]
    dict_sprites["frame_index"] = self.get_sprite_frame_index(sprite)
    dict_sprites["children"] = []
    # recurse into mesh children so the object hierarchy is preserved
    for child in bpy.data.objects[sprite].children:
        if child.type == "MESH":
            dict_sprites["children"].append(self.sprite_to_dict(child.name, bpy.data.objects[sprite]))
    return dict_sprites
def keyframe_to_dict(self, track, property, value, channels, key):
    """Record *value* as a keyframe on one animation channel, skipping
    redundant keys.

    channels[key] is a pair: index [0] maps time index -> {"value": ...}
    and index [1] holds bookkeeping, including "time_idx_hist" (time index
    of the last stored key for this channel).  A key is only written when
    the value actually changed; when it did, the previous value is also
    re-emitted at self.time_idx_last so playback steps instead of sliding
    from the older key.

    NOTE(review): relies on self.time_idx, self.time_idx_last, self.f and
    self.restpose being maintained by the calling export loop -- confirm
    against the caller.
    """
    time_idx_hist = channels[key][1]["time_idx_hist"]
    # only handle the track that matches this property name
    if os.path.basename(track) == property:
        if time_idx_hist in channels[key][0]:
            # channel already has a key at the historic index: add a new
            # key only when the value changed
            if channels[key][0][time_idx_hist]["value"] != value:
                dict_value_entry = OrderedDict()
                dict_value_entry["value"] = value
                channels[key][0][self.time_idx] = dict_value_entry
                # duplicate the previous value just before the change so
                # interpolation stays stepped
                if time_idx_hist != self.time_idx_last:
                    channels[key][0][self.time_idx_last] = channels[key][0][time_idx_hist]
                channels[key][1]["time_idx_hist"] = self.time_idx
        # first frame (or rest-pose export): always store an initial key
        elif self.f == 0 or (self.f and self.restpose):
            dict_value_entry = OrderedDict()
            dict_value_entry["value"] = value
            channels[key][0][self.time_idx] = dict_value_entry
            channels[key][1]["time_idx_hist"] = self.time_idx
def __init__(self, size):
    """Bounded FIFO cache built from closures.

    Exposes get/set/clear as bound methods on *self*; when the cache grows
    past *size* entries, the oldest insertion is evicted.  Misses return
    the unique ``self.not_in_cache`` sentinel, so ``None`` is a cacheable
    value.
    """
    self.not_in_cache = miss = object()
    entries = _OrderedDict()

    def lookup(self, key):
        # sentinel (not None) signals a miss
        return entries.get(key, miss)

    def store(self, key, value):
        entries[key] = value
        if len(entries) > size:
            # popitem(False) drops the oldest (first-inserted) entry
            entries.popitem(False)

    def wipe(self):
        entries.clear()

    self.get = types.MethodType(lookup, self)
    self.set = types.MethodType(store, self)
    self.clear = types.MethodType(wipe, self)
def __init__(*args, **kwds):
    '''Initialize an ordered dictionary.  The signature is the same as
    regular dictionaries, but keyword arguments are not recommended
    because their insertion order is arbitrary.
    '''
    # self is pulled out of *args by hand so that a keyword argument
    # literally named 'self' can still pass through **kwds
    if not args:
        raise TypeError("descriptor '__init__' of 'OrderedDict' object "
                        "needs an argument")
    self, *args = args
    if len(args) > 1:
        raise TypeError('expected at most 1 arguments, got %d' % len(args))
    try:
        self.__root
    except AttributeError:
        # first initialization: build the sentinel root of the circular
        # doubly linked list; the hard reference keeps the weak proxy alive
        self.__hardroot = _Link()
        self.__root = root = _proxy(self.__hardroot)
        root.prev = root.next = root
        self.__map = {}
    # delegate the actual population to the shared update logic
    self.__update(*args, **kwds)
def _AnyMessageToJsonObject(self, message):
    """Converts Any message according to Proto3 JSON Specification."""
    if not message.ListFields():
        return {}
    # Must print @type first, use OrderedDict instead of {}
    js = OrderedDict()
    type_url = message.type_url
    js['@type'] = type_url
    # unpack the payload into a concrete message of the advertised type
    sub_message = _CreateMessageFromTypeUrl(type_url)
    sub_message.ParseFromString(message.value)
    message_descriptor = sub_message.DESCRIPTOR
    full_name = message_descriptor.full_name
    # wrapper types (e.g. google.protobuf.Int32Value) collapse to a scalar
    if _IsWrapperMessage(message_descriptor):
        js['value'] = self._WrapperMessageToJsonObject(sub_message)
        return js
    # other well-known types have dedicated serializer methods registered
    if full_name in _WKTJSONMETHODS:
        js['value'] = methodcaller(_WKTJSONMETHODS[full_name][0],
                                   sub_message)(self)
        return js
    # regular message: its fields are emitted next to '@type'
    return self._RegularMessageToJsonObject(sub_message, js)
def __init__(self, config):
    """Read web-server and per-device settings from *config*.

    :param config: a ConfigParser-style object with 'web' and 'devices'
        sections; each 'devices' value is a JSON blob whose key order is
        preserved.
    """
    self.service = None
    self.webServer = None
    self.config = config
    self.httpsPort = int(self.config.get('web', 'httpsPort'))
    self.httpPort = int(self.config.get('web', 'httpPort'))
    self.adminPasswordHash = self.config.get('web', 'adminPasswordHash')
    self.apiSecret = self.config.get('web', 'apiSecret')
    self.uploadDir = self.config.get('web', 'uploadDir')
    self.dbFile = self.config.get('web', 'dbFile')
    self.httpsCertFile = self.config.get('web', 'httpsCertFile')
    self.httpsKeyFile = self.config.get('web', 'httpsKeyFile')
    self.httpsChainFile = self.config.get('web', 'httpsChainFile')
    self.localVideoPort = int(self.config.get('web', 'localVideoPort'))
    # (dropped an unused 'dir' local that shadowed the builtin)
    self.database = database.Database(self.dbFile)
    self.deviceConfig = dict()
    # .items() instead of Python-2-only .iteritems()
    for deviceId, jsonConf in dict(self.config.items('devices')).items():
        self.deviceConfig[deviceId] = json.loads(jsonConf,
                                                 object_pairs_hook=OrderedDict)
    self.trends = dict()
    self.lock = threading.Lock()
def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
    """
    Adds or Updates a key/value to the given .env

    If the .env path given doesn't exist, fails instead of risking creating
    an orphan .env somewhere in the filesystem
    """
    key_to_set = str(key_to_set)
    # surrounding quotes are stripped; quoting is re-applied on write
    value_to_set = str(value_to_set).strip("'").strip('"')
    if not os.path.exists(dotenv_path):
        warnings.warn("can't write to %s - it doesn't exist." % dotenv_path)
        return None, key_to_set, value_to_set
    env_values = OrderedDict(parse_dotenv(dotenv_path))
    env_values[key_to_set] = value_to_set
    success = flatten_and_write(dotenv_path, env_values, quote_mode)
    return success, key_to_set, value_to_set
def encode_struct_tree(self, validator, value):
    """Serialize a struct that is a leaf of an enumerated-subtypes union.

    The concrete Python type of *value* selects the tag; the encoded
    struct is emitted either wrapped under the tag (old style) or with a
    leading '.tag' key (new style).
    """
    assert type(value) in validator.definition._pytype_to_tag_and_subtype_, \
        '%r is not a serializable subtype of %r.' % (type(value), validator.definition)
    tags, subtype = validator.definition._pytype_to_tag_and_subtype_[type(value)]
    # only single-level tagging is supported here
    assert len(tags) == 1, tags
    assert not isinstance(subtype, bv.StructTree), \
        'Cannot serialize type %r because it enumerates subtypes.' % subtype.definition
    if self.old_style:
        # old style: {tag: {fields...}}
        d = {
            tags[0]: self.encode_struct(subtype, value),
        }
    else:
        # new style: OrderedDict guarantees '.tag' is emitted first
        d = collections.OrderedDict()
        d['.tag'] = tags[0]
        d.update(self.encode_struct(subtype, value))
    return d
def __init__(self, text, code=None):
    """Wrap an HTTP error response body.

    Tries to parse *text* as JSON (preserving key order); on failure,
    falls back to XML when the body looks like an XML document.

    :param text: raw response body
    :param code: optional HTTP status code (only set when provided)
    """
    if code is not None:
        self.code = code
    self.text = text
    self.xml = None
    # TODO(dcramer): pull in XML support from Jira
    if text:
        try:
            self.json = json.loads(text, object_pairs_hook=OrderedDict)
        except (JSONDecodeError, ValueError):
            if self.text[:5] == "<?xml":
                # perhaps it's XML?
                self.xml = BeautifulStoneSoup(self.text)
            # must be an awful code.
            self.json = None
    else:
        self.json = None
    # exception message is truncated so huge bodies don't flood logs
    super(ApiError, self).__init__(text[:128])
def omw_welcome(name=None):
    """Render the OMW welcome page with sources sorted for display.

    Sources are ordered by (language name, project id, newest version
    first).  Python's sort is stable, so sorting three times in reverse
    priority order produces that compound ordering.
    """
    lang_id, lang_code = fetch_langs()
    src_meta = fetch_src_meta()
    ### sort by language, project version (Newest first)
    src_sort = od()
    keys = list(src_meta.keys())
    # tertiary key: newest version first
    keys.sort(key=lambda x: src_meta[x]['version'], reverse=True)
    # secondary key: project id
    keys.sort(key=lambda x: src_meta[x]['id'])
    # primary key: language display name
    keys.sort(key=lambda x: lang_id[lang_code['code'][src_meta[x]['language']]][1])
    for k in keys:
        src_sort[k] = src_meta[k]
    return render_template('omw_welcome.html',
                           src_meta=src_sort,
                           lang_id=lang_id,
                           lang_code=lang_code,
                           licenses=licenses)
def __init__(self, instance):
    """
    Initiate registry with pre-loaded apps.

    :param instance: Instance of the controller.
    :type instance: pyplanet.core.instance.Instance
    """
    self.instance = instance
    # loaded and unloaded apps, keyed in registration order
    self.apps = OrderedDict()
    self.unloaded_apps = OrderedDict()
    # Set ready states.
    self.apps_ready = self.ready = False
    # Set a lock for threading.
    self._lock = threading.Lock()
    # Listen to events
    self.instance.signals.listen(
        'contrib.mode:script_mode_changed', self._on_mode_change
    )
def restart_map():
    """Determine the correct resource map to be passed to
    charmhelpers.core.restart_on_change() based on the services configured.

    :returns dict: A dictionary mapping config file to lists of services
        that should be restarted when the file changes.  Files with no
        services are omitted.
    """
    pairs = []
    for config_file, ctxt in CONFIG_FILES.items():
        services = [svc for svc in ctxt['services']]
        if services:
            pairs.append((config_file, services))
    return OrderedDict(pairs)
def _check_listening_on_services_ports(services, test=False):
    """Check that the unit is actually listening (has the port open) on the
    ports that the service specifies are open.  If *test* is True the
    function instead returns the services whose ports ARE open.

    @param services: OrderedDict(service: [port, ...], ...)
    @param test: default=False; if False report closed ports, otherwise open.
    @returns OrderedDict(service: [port, ...], ...), [boolean]
    """
    test = bool(test)  # normalize to a strict True/False
    all_ports = list(itertools.chain(*services.values()))
    open_states = [port_has_listener('0.0.0.0', port) for port in all_ports]
    # ports whose observed state matches what we are looking for
    matched = [port for port, is_open in zip(all_ports, open_states)
               if is_open == test]
    result = OrderedDict()
    for service, ports in services.items():
        overlap = set(ports).intersection(matched)
        if overlap:
            result[service] = overlap
    return result, open_states
def ordered(orderme):
    """Converts the provided dictionary into a collections.OrderedDict.

    The items in the returned OrderedDict are inserted in the natural sort
    order of the keys.  Nested dictionaries are also sorted, so the
    ordering is fully predictable.

    :param orderme: the dict to order
    :return: collections.OrderedDict
    :raises ValueError: if `orderme` isn't a dict instance.
    """
    if not isinstance(orderme, dict):
        raise ValueError('argument must be a dict type')
    result = OrderedDict()
    # plain .items() replaces six.iteritems: identical behaviour on
    # Python 2 and 3, and drops the third-party 'six' dependency
    for k, v in sorted(orderme.items(), key=lambda x: x[0]):
        result[k] = ordered(v) if isinstance(v, dict) else v
    return result
def _get_openstack_release_string(self):
    """Return the OpenStack release codename for this deployment.

    When a cloud-archive origin is configured (``self.openstack``, e.g.
    ``cloud:bionic-queens/proposed``), the codename is parsed out of it;
    otherwise the default release shipped with ``self.series`` is used.
    """
    default_release = OrderedDict([
        ('trusty', 'icehouse'),
        ('xenial', 'mitaka'),
        ('yakkety', 'newton'),
        ('zesty', 'ocata'),
        ('artful', 'pike'),
        ('bionic', 'queens'),
    ])
    if not self.openstack:
        return default_release[self.series]
    origin = self.openstack.split(':')[1]
    return origin.split('%s-' % self.series)[1].split('/')[0]
def iterative_consensus(sequences, program='muscle-medium', threshold=0.6,
                        subsample_size=200, maximum_subsample_size=1600):
    """
    Compute a consensus sequence of the given sequences, but do not use all
    sequences if there are many: First, try to compute the consensus from a
    small subsample.  If there are 'N' bases, double the subsample size and
    repeat until there are no more 'N' bases, all available sequences have
    been used, or maximum_subsample_size is exceeded.
    """
    while True:
        subset = downsampled(sequences, subsample_size)
        alignment = multialign(OrderedDict(enumerate(subset)), program=program)
        cons = consensus(alignment, threshold=threshold).strip('N')
        if 'N' not in cons:
            break  # no ambiguous bases left -- good enough
        if len(sequences) <= subsample_size:
            break  # every available sequence was already used
        subsample_size *= 2
        if subsample_size > maximum_subsample_size:
            break
    return cons
def rollover(self):
    """Roll the period forward: ending values become the new starting
    values and per-period accumulators are reset.

    Payout last-sale prices are refreshed from the position tracker.
    """
    self.starting_value = self.ending_value
    self.starting_exposure = self.ending_exposure
    self.starting_cash = self.ending_cash
    self.period_cash_flow = 0.0
    self.pnl = 0.0
    self.processed_transactions = {}
    self.orders_by_modified = {}
    self.orders_by_id = OrderedDict()
    # list(...) snapshot: the loop below can delete from the dict, and
    # iterating a live .keys() view while mutating raises RuntimeError on
    # Python 3
    payout_assets = list(self._payout_last_sale_prices.keys())
    for asset in payout_assets:
        if asset in self._payout_last_sale_prices:
            self._payout_last_sale_prices[asset] = \
                self.position_tracker.positions[asset].last_sale_price
        else:
            # NOTE(review): unreachable as written (every asset came from
            # the dict's own keys); kept to preserve original behaviour
            del self._payout_last_sale_prices[asset]
def __setstate__(self, state):
    """Restore pickled PerformancePeriod state (schema version >= 3)."""
    OLDEST_SUPPORTED_STATE = 3
    version = state.pop(VERSION_LABEL)
    if version < OLDEST_SUPPORTED_STATE:
        raise BaseException("PerformancePeriod saved state is too old.")
    # rebuild containers as fresh copies of the saved ones
    self.processed_transactions = dict(state.pop('processed_transactions'))
    self.orders_by_id = OrderedDict(state.pop('orders_by_id'))
    self.orders_by_modified = dict(state.pop('orders_by_modified'))
    # multipliers are recomputed lazily, start empty
    self._execution_cash_flow_multipliers = {}
    # everything left in state maps straight onto attributes
    self.__dict__.update(state)
def __setstate__(self, state):
    """Restore pickled PositionTracker state (schema version >= 3)."""
    OLDEST_SUPPORTED_STATE = 3
    version = state.pop(VERSION_LABEL)
    if version < OLDEST_SUPPORTED_STATE:
        raise BaseException("PositionTracker saved state is too old.")
    self.asset_finder = state['asset_finder']
    self.positions = positiondict()
    # note that positions_store is temporary and gets regened from
    # .positions
    self._positions_store = zp.Positions()
    self._unpaid_dividends = state['unpaid_dividends']
    # Arrays for quick calculations of positions value
    self._position_value_multipliers = OrderedDict()
    self._position_exposure_multipliers = OrderedDict()
    # Update positions is called without a finder
    self.update_positions(state['positions'])
def load_names_data():
    """Download (once) and load the baby-names ZIP into one DataFrame.

    The archive is cached in the system temp directory under ZIP_NAME.
    Each ``yobYYYY`` member becomes a DataFrame with a 'year' column; all
    years are concatenated and indexed by name.

    NOTE(review): requires network access on first call and assumes the
    module constants URL_NAMES and ZIP_NAME are defined -- confirm.
    """
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        # first run: fetch the archive and cache it locally
        r = requests.get(URL_NAMES)
        with open(fp, 'wb') as f:
            f.write(r.content)
    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances, sorted by filename (i.e. by year)
        for zi in sorted(zf.infolist(), key=lambda zi: zi.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])  # member names look like yobYYYY.txt
                df = pd.read_csv(
                    zf.open(zi), header=None,
                    names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df
    df = pd.concat(post.values())
    df.set_index('name', inplace=True, drop=True)
    return df
def build_metrics_summary_csv(filename, sample_properties, sample_data, pipeline):
    """Write a two-row CSV summary (header + values) of pipeline metrics.

    :param filename: output CSV path
    :param sample_properties, sample_data: inputs forwarded to build_tables
    :param pipeline: pipeline name used to look up metric definitions
    """
    metrics, alarms, charts, all_prefixes = get_constants_for_pipeline(pipeline)
    tables, _ = build_tables(sample_properties, metrics, alarms, sample_data,
                             all_prefixes=all_prefixes)
    if not tables:
        sys.stderr.write("No metrics tables were generated, skipping CSV generation.\n")
        return
    # first occurrence of each metric name wins
    csv_metrics = collections.OrderedDict()
    for table in tables:
        if not table:
            continue
        for metric, _, value in table['rows']:
            # cells may be plain values or {'v': value} wrappers;
            # isinstance instead of type(x) == dict also accepts subclasses
            if isinstance(metric, dict):
                metric = metric['v']
            if isinstance(value, dict):
                value = value['v']
            if metric not in csv_metrics:
                csv_metrics[metric] = value
    # NOTE(review): 'wb' is the Python 2 idiom for csv writers; a Python 3
    # port needs open(filename, 'w', newline='') instead
    with open(filename, 'wb') as f:
        writer = csv.writer(f)
        writer.writerow(csv_metrics.keys())
        writer.writerow(csv_metrics.values())
def getAntennaLogs():
    '''
    Retrieve information about antenna changes

    @return OrderedDict mapping station name -> pandas Series of
        antenna-change (step/offset) dates
    '''
    # step (offset) history lives in the cached NGL HDF store
    store_location = data_util.getDataLocation('ngl_gps')
    store = pd.HDFStore(store_location, 'r')
    logs_df = store['ngl_steps']
    store.close()
    metadata = DataFetcher.getStationMetadata()
    logs_dict = OrderedDict()
    for station in metadata.index:
        # unique offset dates recorded for this station
        offset_dates = logs_df[logs_df['Station']==station].index.unique()
        offset_dates = pd.Series(offset_dates)
        logs_dict[station] = offset_dates
    return logs_dict
def getModisData(dataset, variable_name):
    '''
    Loads modis data

    @param dataset: netCDF4 dataset
    @param variable_name: Name of variable to extract from dataset

    @return (modis_data, metadata)
    '''
    variable = dataset[variable_name]
    # keep raw stored values: no automatic masking / scale-factor handling
    variable.set_auto_maskandscale(False)
    raw = variable[:,:]
    attrs = OrderedDict()
    for attribute_name in variable.ncattrs():
        attrs[attribute_name] = variable.getncattr(attribute_name)
    return raw, attrs
def __init__(self, option_strings, prog, parser_class, dest=SUPPRESS,
             help=None, metavar=None):
    """Set up the subparsers action (argparse internals).

    :param prog: prefix used when naming each added subparser's program
    :param parser_class: class instantiated for each added subparser
    """
    self._prog_prefix = prog
    self._parser_class = parser_class
    # OrderedDict keeps subcommands in registration order for help output
    self._name_parser_map = _collections.OrderedDict()
    self._choices_actions = []
    super(_SubParsersAction, self).__init__(
        option_strings=option_strings,
        dest=dest,
        nargs=PARSER,
        # the map itself is passed as choices, so adding a parser later
        # automatically extends the accepted choices
        choices=self._name_parser_map,
        help=help,
        metavar=metavar)
def __init__(self, servo_group, event, belt_speed, frequency, mqtt_client,
             master_shadow, args=(), kwargs={}):
    """Initialize the belt control thread.

    :param servo_group: servo group driving the belt
    :param event: command event used to signal the control loop
    :param belt_speed: speed applied while rolling
    :param frequency: control-loop frequency
    :param mqtt_client: MQTT client for telemetry/commands
    :param master_shadow: AWS IoT shadow; its delta callbacks drive state
    """
    super(BeltControlThread, self).__init__(
        name="belt_control_thread", args=args, kwargs=kwargs
    )
    self.sg = servo_group
    self.rolling = False
    self.cmd_event = event
    self.belt_speed = belt_speed
    self.frequency = frequency
    self.reversed = False
    # simple state-machine bookkeeping
    self.active_state = 'initialized'
    self.last_state = 'initialized'
    # ordered control stages executed by the loop (only 'roll' here)
    self.control_stages = collections.OrderedDict()
    self.control_stages['roll'] = self.roll
    self.mqttc = mqtt_client
    self.master_shadow = master_shadow
    # react to shadow delta updates (remote commands)
    self.master_shadow.shadowRegisterDeltaCallback(self.shadow_mgr)
    log.debug("[bct.__init__] shadowRegisterDeltaCallback()")
def __init__(self, servo_group, event, stage_topic, mqtt_client,
             master_shadow, args=(), kwargs={}):
    """Initialize the arm control thread.

    :param servo_group: servo group driving the arm
    :param event: command event used to signal the control loop
    :param stage_topic: MQTT topic for per-stage progress messages
    :param mqtt_client: MQTT client for telemetry/commands
    :param master_shadow: AWS IoT shadow; its delta callbacks drive state
    """
    super(ArmControlThread, self).__init__(
        name="arm_control_thread", args=args, kwargs=kwargs
    )
    self.sg = servo_group
    log.debug("[act.__init__] servo_group:{0}".format(self.sg))
    self.cmd_event = event
    # simple state-machine bookkeeping
    self.active_state = 'initialized'
    self.last_state = 'initialized'
    # ordered control stages executed sequentially by the loop
    self.control_stages = collections.OrderedDict()
    self.control_stages['home'] = self.home
    self.control_stages['find'] = self.find
    self.control_stages['pick'] = self.pick
    self.control_stages['sort'] = self.sort
    self.stage_topic = stage_topic
    self.mqtt_client = mqtt_client
    self.master_shadow = master_shadow
    self.found_box = None
    # react to shadow delta updates (remote commands)
    self.master_shadow.shadowRegisterDeltaCallback(self.shadow_mgr)
    log.debug("[arm.__init__] shadowRegisterDeltaCallback()")
def main():
    """Run every sampler on the energy experiment, then report and plot.

    HMC is the baseline: it stays in the queue for the runs and the
    metrics table, then is removed so the comparison plots show only the
    approximate methods measured against it.
    """
    experiment = EnergyExp()
    queue = OrderedDict()
    queue['HMC'] = experiment.run_baseline_hmc
    queue['SGLD'] = experiment.run_sgld
    queue['SGHMC'] = experiment.run_sghmc
    queue['pSGLD'] = experiment.run_psgld
    queue['BBB'] = experiment.run_bbb
    # queue["PBP"] = experiment.run_pbp
    queue['Dropout'] = experiment.run_dropout
    experiment.run_queue(queue, cpu=True)
    experiment.report_metrics_table(queue)
    # drop the baseline before plotting comparisons against it
    del queue['HMC']
    max_time = 15
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['KS'], max_time=max_time, title_name='KS distance')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Precision'], max_time=max_time, title_name='Precision')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Recall'], max_time=max_time, title_name='Recall')
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["KL"])
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["F1"], max_time=max_time, title_name="F1 score")
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["IoU"], max_time=max_time)
def main():
    """Run the configured samplers on Boston Housing, then report and plot.

    Most samplers (including the HMC baseline) are currently commented
    out; only BBB and Dropout run.  Plots still compare against any
    previously computed 'HMC' results.
    """
    experiment = BostonHousingExp()
    queue = OrderedDict()
    # queue['HMC'] = experiment.run_baseline_hmc
    # queue['SGLD'] = experiment.run_sgld
    # queue['SGHMC'] = experiment.run_sghmc
    # queue['pSGLD'] = experiment.run_psgld
    queue["BBB"] = experiment.run_bbb
    # queue["PBP"] = experiment.run_pbp
    queue['Dropout'] = experiment.run_dropout
    experiment.run_queue(queue, cpu=True)
    experiment.report_metrics_table(queue)
    # bug fix: 'HMC' is commented out of the queue above, so an
    # unconditional `del queue['HMC']` raised KeyError; pop() tolerates
    # its absence while still removing it when present
    queue.pop('HMC', None)
    max_time = 15
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['KS'], max_time=max_time, title_name='KS distance')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Precision'], max_time=max_time, title_name='Precision')
    experiment.plot_multiple_metrics('HMC', queue.keys(), ['Recall'], max_time=max_time, title_name='Recall')
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["KL"])
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["F1"], max_time=max_time, title_name="F1 score")
    # experiment.plot_multiple_metrics("HMC", queue.keys(), ["IoU"], max_time=max_time)
def __init__(self):
    """Fresh, empty build-configuration container.

    Libraries use an OrderedDict because link order matters; include and
    library search paths are DirectorySets.
    """
    self.executable_path = None
    self.definitions = {}
    self.flags = {}
    self.include_dirs = DirectorySet()
    self.linker_flags = []
    self.library_dirs = DirectorySet()
    self.libraries = collections.OrderedDict()
    self.unique_flags = {}
    # hash of preprocessed sources, used to decide what needs rebuilding
    self.recompile = recompile.PreprocessHash(self)
def get_root_apiview(self):
    """Build and return an API-root view that lists every registered
    endpoint.

    The view class is constructed lazily from the schema's info section so
    its name and docstring reflect the schema title/version.
    """
    # stable listing order: sort handlers by display name
    handlers = sorted(self.handlers.items(), key = lambda x : x[1]['display'])
    def list_handlers(self, request, *args, **kwargs):
        resp = OrderedDict()
        # get all names
        for regex, data in handlers:
            name = data['name']
            alias = data['display']
            # the root view itself is excluded from its own listing
            if alias != APIROOT_NAME:
                try:
                    resp[alias] = reverse(name, args = args, kwargs = kwargs,
                                          request = request,
                                          format = kwargs.get('format', None))
                except NoReverseMatch:
                    # here we've got a path with defined params which are
                    # not specified in request
                    continue
        return Response(resp, status = status.HTTP_200_OK)
    # get available info from schema
    info = self.schema.get('info', None)
    name = info.get('title', APIROOT_NAME).strip(' ').replace(' ', '_')
    vers = info.get('version', 'unknown')
    desc = info.get('description', 'Enumerates all available endpoints for current schema')
    # construct class dynamically so name/doc come from the schema
    apiroot = LazyClass(name, SwaggerViewClass)
    apiroot.set_attr('get', list_handlers)
    apiroot.set_attr('__doc__', 'v.' + vers + '\n\n' + desc)
    return apiroot().as_view()
#: main schema processing function
def test_restart_map(self):
    """restart_map() should map every PLUMgrid config file to the services
    restarted when that file changes."""
    _restart_map = nutils.restart_map()
    expect = OrderedDict([
        (nutils.PG_CONF, ['plumgrid']),
        (nutils.PG_HN_CONF, ['plumgrid']),
        (nutils.PG_HS_CONF, ['plumgrid']),
        (nutils.OPS_CONF, ['plumgrid']),
        (nutils.PG_IFCS_CONF, []),
    ])
    self.assertEqual(expect, _restart_map)
    # largely redundant with the equality check above, but verifies each
    # entry individually for clearer failure messages
    for item in _restart_map:
        self.assertTrue(item in _restart_map)
        self.assertTrue(expect[item] == _restart_map[item])
def _extract_services_list_helper(services):
    """Normalize the accepted *services* shapes into
    OrderedDict(service_name: [ports]).

    The services object can be:
      - None: no services were passed (an empty OrderedDict is returned)
      - a list of strings (service names, no ports)
      - a dictionary (optionally OrderedDict) {name: {'service': ...}}
      - a list of [{'service': name, 'ports': [...]}, ...]

    @param services: see above
    @returns OrderedDict(service: [ports], ...)
    """
    if services is None:
        # empty OrderedDict keeps the documented return type consistent
        # (the original returned a plain {}, which compares equal anyway)
        return OrderedDict()
    if isinstance(services, dict):
        services = services.values()
    # either extract the list of services from the dictionary, or if
    # it is a simple string, use that. i.e. works with mixed lists.
    normalized = OrderedDict()
    for entry in services:
        if isinstance(entry, dict) and 'service' in entry:
            normalized[entry['service']] = entry.get('ports', [])
        elif isinstance(entry, str):
            normalized[entry] = []
    return normalized
def _check_running_services(services):
    """Probe each named service and report whether it is running.

    @param services: OrderedDict of strings: [ports] -- only the keys
        (service names) are consulted here.
    @returns [(service, boolean), ...] : results for checks
             [boolean] : just the result of the service checks,
             in the same order as the services.
    """
    states = [service_running(name) for name in services]
    return list(zip(services, states)), states