我们从 Python 开源项目中提取了以下 15 个代码示例，用于说明如何使用 six.viewvalues()。
def setup_params(self, data):
    """
    Return a copy of ``self.params`` with the kernel name normalised.

    A full kernel name (matched case-insensitively) is replaced by its
    short code. A value that already is a short code is kept unchanged.

    Parameters
    ----------
    data : object
        Not used by this method.

    Returns
    -------
    params
        Copy of ``self.params`` whose ``'kernel'`` entry is a short code.

    Raises
    ------
    PlotnineError
        If the kernel is neither a known name nor a known short code.
    """
    params = self.params.copy()
    kernel_codes = {
        'biweight': 'biw',
        'cosine': 'cos',
        'cosine2': 'cos2',
        'epanechnikov': 'epa',
        'gaussian': 'gau',
        'triangular': 'tri',
        'triweight': 'triw',
        'uniform': 'uni'}

    try:
        params['kernel'] = kernel_codes[params['kernel'].lower()]
    except KeyError:
        # Not a full name; it may already be a short code, which the
        # membership test below decides.
        pass

    if params['kernel'] in six.viewvalues(kernel_codes):
        return params

    template = ("kernel should be one of {}. "
                "You may use the abbreviations {}")
    raise PlotnineError(template.format(six.viewkeys(kernel_codes),
                                        six.viewvalues(kernel_codes)))
def calculate_cum_reward(policy):
    """Accumulate rewards and action counts over the policy's histories.

    Histories are visited in order of their integer id, from 0 up to
    ``policy.history_storage.n_histories - 1``.

    Parameters
    ----------
    policy: bandit object
        The bandit algorithm to evaluate. It must expose
        ``history_storage`` with ``n_histories`` and ``get_history(i)``.

    Returns
    -------
    cum_reward: dict
        Maps history_id to the cumulative reward up to and including that
        history. Also holds the seed entry ``-1: 0.0``.

    cum_n_actions: dict
        Maps history_id to the cumulative number of rewarded entries up to
        and including that history. Also holds the seed entry ``-1: 0``.
    """
    storage = policy.history_storage
    # The -1 entries seed the running totals and stay in the returned dicts.
    cum_reward = {-1: 0.0}
    cum_n_actions = {-1: 0}

    for history_id in range(storage.n_histories):
        rewards = storage.get_history(history_id).rewards
        previous_id = history_id - 1
        cum_n_actions[history_id] = cum_n_actions[previous_id] + len(rewards)
        cum_reward[history_id] = (cum_reward[previous_id]
                                  + sum(six.viewvalues(rewards)))

    return cum_reward, cum_n_actions
def add_mark_rule(src_ip, environment):
    """Add an environment mark for all traffic coming from an IP.

    The IP is first added to the set of the requested environment, and only
    then checked against the sets of every other environment. When the
    check fails the IP has therefore already been added to the target set.

    :param ``str`` src_ip:
        Source IP to be marked
    :param ``str`` environment:
        Environment to use for the mark
    :raises ``AssertionError``:
        If ``environment`` is not a key of ``SET_BY_ENVIRONMENT``.
    :raises ``Exception``:
        If the IP is already a member of another environment's set.
    """
    assert environment in SET_BY_ENVIRONMENT, \
        "Unknown environment: %r" % environment

    target_set = SET_BY_ENVIRONMENT[environment]
    add_ip_set(target_set, src_ip)

    # Check that the IP is not marked in any other environment
    other_env_sets = {
        env_set
        for env_set in six.viewvalues(SET_BY_ENVIRONMENT)
        if env_set != target_set
    }
    for other_set in other_env_sets:
        if test_ip_set(other_set, src_ip) is True:
            # Interpolate explicitly: exceptions, unlike logging calls, do
            # not apply %-formatting to their extra arguments.
            raise Exception('%r is already in %r' % (src_ip, other_set))
def this_is_okay():
    """Call function-style iteration helpers on an empty dict.

    The helpers are used bare and through ``six``, ``future.utils`` and
    ``builtins``.

    NOTE(review): judging by the name this is presumably a linter fixture
    listing accepted spellings rather than code meant to be run - confirm.
    """
    d = {}
    # Bare helper name; it is not defined inside this function.
    iterkeys(d)
    # Helpers accessed through ``six``.
    six.iterkeys(d)
    six.itervalues(d)
    six.iteritems(d)
    six.iterlists(d)
    six.viewkeys(d)
    six.viewvalues(d)
    six.viewlists(d)
    # Another bare helper name.
    itervalues(d)
    # Helpers accessed through ``future.utils``.
    future.utils.iterkeys(d)
    future.utils.itervalues(d)
    future.utils.iteritems(d)
    future.utils.iterlists(d)
    future.utils.viewkeys(d)
    future.utils.viewvalues(d)
    future.utils.viewlists(d)
    # Function-style ``next`` rather than a ``.next()`` method call.
    six.next(d)
    builtins.next(d)
def setup_params(self, data):
    """
    Return a validated copy of ``self.params`` with defaults filled in.

    The ``'scale'`` entry must be one of ``'area'``, ``'count'`` or
    ``'width'``. A full kernel name (matched case-insensitively) is
    replaced by its short code, and a value that already is a short code
    is kept unchanged. Any key of ``stat_density.DEFAULT_PARAMS`` that is
    absent from the parameters is copied over with its default value.

    Parameters
    ----------
    data : object
        Not used by this method.

    Returns
    -------
    params
        The validated and completed copy of ``self.params``.

    Raises
    ------
    PlotnineError
        If ``'scale'`` is not a valid choice, or if the kernel is neither
        a known name nor a known short code.
    """
    params = self.params.copy()

    valid_scale = ('area', 'count', 'width')
    if params['scale'] not in valid_scale:
        msg = "Parameter scale should be one of {}"
        raise PlotnineError(msg.format(valid_scale))

    lookup = {
        'biweight': 'biw',
        'cosine': 'cos',
        'cosine2': 'cos2',
        'epanechnikov': 'epa',
        'gaussian': 'gau',
        'triangular': 'tri',
        'triweight': 'triw',
        'uniform': 'uni'}

    # A KeyError here means the value is not a full name; it may already be
    # a short code, which the membership test below decides.
    with suppress(KeyError):
        params['kernel'] = lookup[params['kernel'].lower()]

    if params['kernel'] not in six.viewvalues(lookup):
        msg = ("kernel should be one of {}. "
               "You may use the abbreviations {}")
        # viewvalues() needs the mapping; calling it bare raised TypeError
        # and hid the intended PlotnineError.
        raise PlotnineError(msg.format(six.viewkeys(lookup),
                                       six.viewvalues(lookup)))

    # Fill in the density defaults the caller did not specify.
    missing_params = (six.viewkeys(stat_density.DEFAULT_PARAMS) -
                      six.viewkeys(params))
    for key in missing_params:
        params[key] = stat_density.DEFAULT_PARAMS[key]

    return params
def poll(self):
    """Pass every value of ``self.host_to_contact`` to ``handle_host``."""
    known_hosts = six.viewvalues(self.host_to_contact)
    for entry in known_hosts:
        self.handle_host(entry)
def shutdown(self):
    """Call ``kill_subprocess_process`` for every known host.

    Each value of ``self.host_to_contact`` supplies its ``'proc'`` entry
    (``None`` when absent) and a label built from its ``'name'`` entry.
    """
    known_hosts = six.viewvalues(self.host_to_contact)
    for entry in known_hosts:
        process = entry.get('proc')
        label = "DOCKER {} ".format(entry['name'])
        kill_subprocess_process(process, label)
def required_estimates_fields(columns):
    """
    Return the resource columns that must be present to serve `columns`.

    The result is ``metadata_columns`` extended with every value of the
    `columns` mapping.
    """
    # The values of `columns` are presumably the field names that the
    # loadable columns map to; those are needed besides the metadata.
    mapped_fields = viewvalues(columns)
    return metadata_columns.union(mapped_fields)
def _exp3_probs(self):
    """Compute the Exp3 selection probability of every stored action.

    Each probability is the action's share of the total weight scaled by
    ``1 - gamma``, plus ``gamma`` spread evenly over all stored actions.

    Returns
    -------
    dict
        Maps each id yielded by ``self._action_storage.iterids()`` to its
        probability.
    """
    weights = self._model_storage.get_model()['w']
    total_weight = sum(six.viewvalues(weights))
    n_actions = self._action_storage.count()
    return {
        action_id: ((1 - self.gamma) * weights[action_id] / total_weight
                    + self.gamma / n_actions)
        for action_id in self._action_storage.iterids()
    }
def _exp4p_score(self, context):
    """Score every action with the Exp4.P weighting and save the result.

    Advisor weights are read from the model storage. If none are stored
    yet, every advisor in `context` starts with weight 1. The computed
    probabilities and the weights are then saved back to the storage.

    Parameters
    ----------
    context : dict
        Indexed as ``context[advisor_id][action_id]``.

    Returns
    -------
    estimated_reward : dict
        Maps action_id to its normalised probability.

    uncertainty : dict
        Maps action_id to 0.

    score : dict
        Maps action_id to the same value as in `estimated_reward`.
    """
    advisor_ids = list(six.viewkeys(context))
    weights = self._modelstorage.get_model()['w']
    if not weights:
        for advisor_id in advisor_ids:
            weights[advisor_id] = 1
    total_weight = sum(six.viewvalues(weights))

    def floored_probability(action_id):
        # Weighted advisor consensus, mixed with the p_min floor.
        votes = [weights[advisor_id] * context[advisor_id][action_id]
                 for advisor_id in advisor_ids]
        consensus = np.sum(votes) / total_weight
        return ((1 - self.n_actions * self.p_min) * consensus
                + self.p_min)

    action_probs = np.asarray(
        [floored_probability(action_id) for action_id in self.action_ids])
    action_probs /= action_probs.sum()

    estimated_reward = dict(zip(self.action_ids, action_probs))
    uncertainty = dict.fromkeys(estimated_reward, 0)
    score = dict(estimated_reward)

    self._modelstorage.save_model(
        {'action_probs': estimated_reward, 'w': weights})

    return estimated_reward, uncertainty, score
def __iter__(self):
    """Return an iterator over the values of ``self._actions``."""
    stored_actions = six.viewvalues(self._actions)
    return iter(stored_actions)
def canonicalize(self, include_nodes=True, sorted=False):
    """Build a canonical :class:`etc.Node` object from this mock node.

    The result is a ``Directory`` when ``self.dir`` is true and a ``Value``
    otherwise, filled from the attributes named in that class's
    ``__slots__``.

    :param include_nodes: for a directory, canonicalize the child nodes
                          too; when false the result has an empty node
                          list.
    :param sorted: for a directory with child nodes included, order the
                   direct children by their ``key``.
    """
    if self.dir:
        node_class = Directory
    else:
        node_class = Value
    fields = {name: getattr(self, name) for name in node_class.__slots__}

    if not self.dir:
        return node_class(**fields)

    if not include_nodes:
        fields['nodes'] = []
        return node_class(**fields)

    # Children are canonicalized with the default arguments.
    children = [child.canonicalize()
                for child in six.viewvalues(fields['nodes'])]
    if sorted:
        children.sort(key=lambda child: child.key)
    fields['nodes'] = children
    return node_class(**fields)
def everything_else_is_wrong(): d = None # note: bugbear is no type checker d.iterkeys() d.itervalues() d.iteritems() d.iterlists() # Djangoism d.viewkeys() d.viewvalues() d.viewitems() d.viewlists() # Djangoism d.next() d.keys().next()
def poll(self):
    """Service the SSH subprocess of every host in ``self.hosts_to_ssh``.

    For each host:

    * if a subprocess exists, refresh its return code and log every line
      currently readable from its stderr;
    * if the subprocess has exited, log its return code (info for 0,
      error otherwise) and forget it;
    * if there is no subprocess, start ``host['sshcmd']`` through the
      shell, unless the last successful start was less than 30 seconds
      ago, and switch the new process's stderr to non-blocking mode.

    A failed start is logged as a warning. It does not update
    ``last_connection``, so it is retried on the next call.
    """
    for host in six.viewvalues(self.hosts_to_ssh):
        ssh = host['ssh']
        proc = ssh['proc']

        if proc:
            proc.poll()
            # Drain stderr; the loop ends on an empty read or on IOError,
            # presumably raised by the non-blocking pipe when no data is
            # pending.
            while True:
                try:
                    line = proc.stderr.readline()
                except IOError:
                    break
                if not line:
                    break
                logging.info("SSH {}: {}".format(host['name'], line))

        if proc is not None and proc.returncode is None:
            # Still running: nothing to restart.
            continue

        if proc is not None:
            log = logging.info if proc.returncode == 0 else logging.error
            log("SSH:RC %s=%s", host['sshcmd'], proc.returncode)
            ssh['proc'] = None

        # Only try to start once every 30 seconds. total_seconds() is used
        # because timedelta.seconds wraps around every 24 hours.
        last_connection = ssh.get('last_connection')
        if (last_connection and
                (datetime.datetime.now()
                 - last_connection).total_seconds() < 30):
            continue

        logging.info("SSH:START %s", host['sshcmd'])
        try:
            # NOTE(review): shell=True runs the command through the shell;
            # host['sshcmd'] must come from a trusted source.
            new_proc = subprocess.Popen(
                host['sshcmd'],
                shell=True,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                close_fds=ON_POSIX)
        except Exception as ex:
            logging.warning("SSH:START:EXC: %s", ex)
            # No process was created, so there is no stderr to configure.
            continue

        ssh['proc'] = new_proc
        ssh['last_connection'] = datetime.datetime.now()

        # Make stderr non-blocking so the drain loop above never stalls.
        fl = fcntl.fcntl(new_proc.stderr, fcntl.F_GETFL)
        fcntl.fcntl(new_proc.stderr, fcntl.F_SETFL, fl | os.O_NONBLOCK)
def identify_packages_from_files(self, files):
    """Group the given files by the package that provides them.

    Every file is looked up in the result of
    ``self._get_packagefields_for_files``. Files without package fields,
    and files whose package cannot be created, are reported as unknown.

    Parameters
    ----------
    files : iterable
        Container (e.g. list or set) of file paths. It is traversed more
        than once.

    Returns
    -------
    (found_packages, unknown_files)
        - found_packages is a list of the package objects created by
          ``self._create_package``, one per distinct set of package
          fields. The matching files are appended to each package's
          ``files``.
        - unknown_files is a list of files that were not found in a
          package
    """
    packages_by_fields = {}
    unmatched = set()
    n_packaged = 0

    # TODO: probably that _get_packagefields should create packagespecs
    # internally and just return them. But we should make them hashable
    fields_by_file = self._get_packagefields_for_files(files)

    for path in files:
        # TODO: pkgname should become pkgid
        # where for packages from distributions would be name,
        # for VCS -- their path
        fields = fields_by_file.get(path)
        if fields is None:
            # Either absent from the lookup or explicitly mapped to None.
            unmatched.add(path)
            continue

        key = tuple(fields.items())
        if key in packages_by_fields:
            packages_by_fields[key].files.append(path)
            n_packaged += 1
            continue

        pkg = self._create_package(**fields)
        if not pkg:
            unmatched.add(path)
            continue

        packages_by_fields[key] = pkg
        # we store only non-directories within 'files'
        if not self._session.isdir(path):
            pkg.files.append(path)
        n_packaged += 1

    lgr.info("%s: %d packages with %d files, and %d other files",
             self.__class__.__name__,
             len(packages_by_fields),
             n_packaged,
             len(unmatched))

    return list(viewvalues(packages_by_fields)), list(unmatched)