Python six 模块,viewvalues() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用six.viewvalues()

项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def setup_params(self, data):
        params = self.params.copy()
        lookup = {
            'biweight': 'biw',
            'cosine': 'cos',
            'cosine2': 'cos2',
            'epanechnikov': 'epa',
            'gaussian': 'gau',
            'triangular': 'tri',
            'triweight': 'triw',
            'uniform': 'uni'}

        with suppress(KeyError):
            params['kernel'] = lookup[params['kernel'].lower()]

        if params['kernel'] not in six.viewvalues(lookup):
            msg = ("kernel should be one of {}. "
                   "You may use the abbreviations {}")
            raise PlotnineError(msg.format(six.viewkeys(lookup),
                                           six.viewvalues(lookup)))

        return params
项目:striatum    作者:ntucllab    | 项目源码 | 文件源码
def calculate_cum_reward(policy):

    """Calculate cumulative reward with respect to time.

        Parameters
        ----------
        policy: bandit object
            The bandit algorithm you want to evaluate.

        Return
        ---------
        cum_reward: dict
            The dict stores {history_id: cumulative reward} .

        cum_n_actions: dict
            The dict stores
            {history_id: cumulative number of recommended actions}.
    """
    cum_reward = {-1: 0.0}
    cum_n_actions = {-1: 0}
    for i in range(policy.history_storage.n_histories):
        reward = policy.history_storage.get_history(i).rewards
        cum_n_actions[i] = cum_n_actions[i - 1] + len(reward)
        cum_reward[i] = cum_reward[i - 1] + sum(six.viewvalues(reward))
    return cum_reward, cum_n_actions
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def add_mark_rule(src_ip, environment):
    """Add an environment mark for all traffic coming from an IP.

    :param ``str`` src_ip:
        Source IP to be marked
    :param ``str`` environment:
        Environment to use for the mark
    """
    assert environment in SET_BY_ENVIRONMENT, \
        "Unknown environment: %r" % environment

    target_set = SET_BY_ENVIRONMENT[environment]
    add_ip_set(target_set, src_ip)

    # Check that the IP is not marked in any other environment
    other_env_sets = {
        env_set for env_set in six.viewvalues(SET_BY_ENVIRONMENT)
        if env_set != target_set
    }
    for other_set in other_env_sets:
        if test_ip_set(other_set, src_ip) is True:
            raise Exception('%r is already in %r', src_ip, other_set)
项目:flake8-bugbear    作者:PyCQA    | 项目源码 | 文件源码
def this_is_okay():
    d = {}
    iterkeys(d)
    six.iterkeys(d)
    six.itervalues(d)
    six.iteritems(d)
    six.iterlists(d)
    six.viewkeys(d)
    six.viewvalues(d)
    six.viewlists(d)
    itervalues(d)
    future.utils.iterkeys(d)
    future.utils.itervalues(d)
    future.utils.iteritems(d)
    future.utils.iterlists(d)
    future.utils.viewkeys(d)
    future.utils.viewvalues(d)
    future.utils.viewlists(d)
    six.next(d)
    builtins.next(d)
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def setup_params(self, data):
        params = self.params.copy()

        valid_scale = ('area', 'count', 'width')
        if params['scale'] not in valid_scale:
            msg = "Parameter scale should be one of {}"
            raise PlotnineError(msg.format(valid_scale))

        lookup = {
            'biweight': 'biw',
            'cosine': 'cos',
            'cosine2': 'cos2',
            'epanechnikov': 'epa',
            'gaussian': 'gau',
            'triangular': 'tri',
            'triweight': 'triw',
            'uniform': 'uni'}

        with suppress(KeyError):
            params['kernel'] = lookup[params['kernel'].lower()]

        if params['kernel'] not in six.viewvalues(lookup):
            msg = ("kernel should be one of {}. "
                   "You may use the abbreviations {}")
            raise PlotnineError(msg.format(six.viewkeys(lookup),
                                           six.viewvalues()))

        missing_params = (six.viewkeys(stat_density.DEFAULT_PARAMS) -
                          six.viewkeys(params))
        for key in missing_params:
            params[key] = stat_density.DEFAULT_PARAMS[key]

        return params
项目:sd2    作者:gae123    | 项目源码 | 文件源码
def poll(self):
        for host in six.viewvalues(self.host_to_contact):
            self.handle_host(host)
项目:sd2    作者:gae123    | 项目源码 | 文件源码
def shutdown(self):
        for host in six.viewvalues(self.host_to_contact):
            proc = host.get('proc')
            kill_subprocess_process(
                proc,
                "DOCKER {} ".format(host['name']))
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def required_estimates_fields(columns):
    """
    Compute the set of resource columns required to serve
    `columns`.
    """
    # We also expect any of the field names that our loadable columns
    # are mapped to.
    return metadata_columns.union(viewvalues(columns))
项目:striatum    作者:ntucllab    | 项目源码 | 文件源码
def _exp3_probs(self):
        """Exp3 algorithm.
        """
        w = self._model_storage.get_model()['w']
        w_sum = sum(six.viewvalues(w))

        probs = {}
        n_actions = self._action_storage.count()
        for action_id in self._action_storage.iterids():
            probs[action_id] = ((1 - self.gamma) * w[action_id]
                                / w_sum
                                + self.gamma / n_actions)

        return probs
项目:striatum    作者:ntucllab    | 项目源码 | 文件源码
def _exp4p_score(self, context):
        """The main part of Exp4.P.
        """
        advisor_ids = list(six.viewkeys(context))

        w = self._modelstorage.get_model()['w']
        if len(w) == 0:
            for i in advisor_ids:
                w[i] = 1
        w_sum = sum(six.viewvalues(w))

        action_probs_list = []
        for action_id in self.action_ids:
            weighted_exp = [w[advisor_id] * context[advisor_id][action_id]
                            for advisor_id in advisor_ids]
            prob_vector = np.sum(weighted_exp) / w_sum
            action_probs_list.append((1 - self.n_actions * self.p_min)
                                     * prob_vector
                                     + self.p_min)
        action_probs_list = np.asarray(action_probs_list)
        action_probs_list /= action_probs_list.sum()

        estimated_reward = {}
        uncertainty = {}
        score = {}
        for action_id, action_prob in zip(self.action_ids, action_probs_list):
            estimated_reward[action_id] = action_prob
            uncertainty[action_id] = 0
            score[action_id] = action_prob
        self._modelstorage.save_model(
            {'action_probs': estimated_reward, 'w': w})

        return estimated_reward, uncertainty, score
项目:striatum    作者:ntucllab    | 项目源码 | 文件源码
def __iter__(self):
        return iter(six.viewvalues(self._actions))
项目:etc    作者:sublee    | 项目源码 | 文件源码
def canonicalize(self, include_nodes=True, sorted=False):
        """Generates a canonical :class:`etc.Node` object from this mock node.
        """
        node_class = Directory if self.dir else Value
        kwargs = {attr: getattr(self, attr) for attr in node_class.__slots__}
        if self.dir:
            if include_nodes:
                nodes = [node.canonicalize() for node in
                         six.viewvalues(kwargs['nodes'])]
                if sorted:
                    nodes.sort(key=lambda n: n.key)
                kwargs['nodes'] = nodes
            else:
                kwargs['nodes'] = []
        return node_class(**kwargs)
项目:flake8-bugbear    作者:PyCQA    | 项目源码 | 文件源码
def everything_else_is_wrong():
    d = None  # note: bugbear is no type checker
    d.iterkeys()
    d.itervalues()
    d.iteritems()
    d.iterlists()  # Djangoism
    d.viewkeys()
    d.viewvalues()
    d.viewitems()
    d.viewlists()  # Djangoism
    d.next()
    d.keys().next()
项目:sd2    作者:gae123    | 项目源码 | 文件源码
def poll(self):
        for host in six.viewvalues(self.hosts_to_ssh):
            if host['ssh']['proc']:
                host['ssh']['proc'].poll()
                while (True):
                    try:
                        line = host['ssh']['proc'].stderr.readline()
                    except IOError:
                        break
                    if line:
                        logging.info("SSH {}: {}".format(
                            host['name'], line))
                    else:
                        break
            # logging.info("%s %s",host, host['sshproc'].returncode if host['sshproc'] else '???')
            if host['ssh']['proc'] is None or host['ssh']['proc'].returncode != None:
                if host['ssh']['proc'] and host['ssh']['proc'].returncode != None:
                    log = logging.info if host['ssh']['proc'].returncode == 0 else logging.error
                    log("SSH:RC %s=%s",
                                 host['sshcmd'],
                                 host['ssh']['proc'].returncode)
                    host['ssh']['proc'] = None
                # Only try to start once every 30 seconds
                if (host['ssh'].get('last_connection') and
                            (datetime.datetime.now() - host[
                                'ssh']['last_connection']).seconds < 30):
                    continue
                logging.info("SSH:START %s", host['sshcmd'])
                try:
                    host['ssh']['proc'] = subprocess.Popen(
                        host['sshcmd'],
                        shell=True,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        close_fds=ON_POSIX)
                    host['ssh']['last_connection'] = datetime.datetime.now()
                except Exception as ex:
                    logging.warning("SSH:START:EXC: %s", ex)
                fl = fcntl.fcntl(host['ssh']['proc'].stderr, fcntl.F_GETFL)
                fcntl.fcntl(host['ssh']['proc'].stderr, fcntl.F_SETFL,
                            fl | os.O_NONBLOCK)
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def identify_packages_from_files(self, files):
        """Identifies "packages" for a given collection of files

        From an iterative collection of files, we identify the packages
        that contain the files and any files that are not related.

        Parameters
        ----------
        files : iterable
            Container (e.g. list or set) of file paths

        Return
        ------
        (found_packages, unknown_files)
            - found_packages is a list of dicts that holds information about
              the found packages. Package dicts need at least "name" and
              "files" (that contains an array of related files)
            - unknown_files is a list of files that were not found in
              a package
        """
        unknown_files = set()
        found_packages = {}
        nb_pkg_files = 0

        # TODO: probably that _get_packagefields should create packagespecs
        # internally and just return them.  But we should make them hashable
        file_to_package_dict = self._get_packagefields_for_files(files)
        for f in files:
            # Stores the file
            if f not in file_to_package_dict:
                unknown_files.add(f)
            else:
                # TODO: pkgname should become  pkgid
                # where for packages from distributions would be name,
                # for VCS -- their path
                pkgfields = file_to_package_dict[f]
                if pkgfields is None:
                    unknown_files.add(f)
                else:
                    pkgfields_hashable = tuple(x for x in pkgfields.items())
                    if pkgfields_hashable in found_packages:
                        found_packages[pkgfields_hashable].files.append(f)
                        nb_pkg_files += 1
                    else:
                        pkg = self._create_package(**pkgfields)
                        if pkg:
                            found_packages[pkgfields_hashable] = pkg
                            # we store only non-directories within 'files'
                            if not self._session.isdir(f):
                                pkg.files.append(f)
                            nb_pkg_files += 1
                        else:
                            unknown_files.add(f)

        lgr.info("%s: %d packages with %d files, and %d other files",
                 self.__class__.__name__,
                 len(found_packages),
                 nb_pkg_files,
                 len(unknown_files))

        return list(viewvalues(found_packages)), list(unknown_files)