Python six 模块,string_types() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.string_types()

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def bool_from_string(value):
    """Interpret string value as boolean.

    Returns True if value translates to True otherwise False.
    """
    if isinstance(value, six.string_types):
        value = six.text_type(value)
    else:
        msg = "Unable to interpret non-string value '%s' as boolean" % (value)
        raise ValueError(msg)

    value = value.strip().lower()

    if value in ['y', 'yes', 'true', 't', 'on']:
        return True
    elif value in ['n', 'no', 'false', 'f', 'off']:
        return False

    msg = "Unable to interpret string value '%s' as boolean" % (value)
    raise ValueError(msg)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def log(message, level=None):
    """Write a message to the juju log"""
    command = ['juju-log']
    if level:
        command += ['-l', level]
    if not isinstance(message, six.string_types):
        message = repr(message)
    command += [message]
    # Missing juju-log should not cause failures in unit tests
    # Send log output to stderr
    try:
        subprocess.call(command)
    except OSError as e:
        if e.errno == errno.ENOENT:
            if level:
                message = "{}: {}".format(level, message)
            message = "juju-log: {}".format(message)
            print(message, file=sys.stderr)
        else:
            raise
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def remove_cache_tier(self, cache_pool):
        """
        Removes a cache tier from Ceph.  Flushes all dirty objects from writeback pools and waits for that to complete.
        :param cache_pool: six.string_types.  The cache tier pool name to remove.
        :return: None
        """
        # read-only is easy, writeback is much harder
        mode = get_cache_mode(self.service, cache_pool)
        version = ceph_version()
        if mode == 'readonly':
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])

        elif mode == 'writeback':
            pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier',
                                'cache-mode', cache_pool, 'forward']
            if version >= '10.1':
                # Jewel added a mandatory flag
                pool_forward_cmd.append('--yes-i-really-mean-it')

            check_call(pool_forward_cmd)
            # Flush the cache and wait for it to return
            check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Returns the current monitor map.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: json string. :raise: ValueError if the monmap fails to parse.
      Also raises CalledProcessError if our ceph command fails
    """
    try:
        mon_status = check_output(
            ['ceph', '--id', service,
             'mon_status', '--format=json'])
        try:
            return json.loads(mon_status)
        except ValueError as v:
            log("Unable to parse mon_status json: {}. Error: {}".format(
                mon_status, v.message))
            raise
    except CalledProcessError as e:
        log("mon_status command failed with message: {}".format(
            e.message))
        raise
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def hash_monitor_names(service):
    """
    Uses the get_mon_map() function to get information about the monitor
    cluster.
    Hash the name of each monitor.  Return a sorted list of monitor hashes
    in an ascending order.
    :param service: six.string_types. The Ceph user name to run the command under
    :rtype : dict.   json dict of monitor name, ip address and rank
    example: {
        'name': 'ip-172-31-13-165',
        'rank': 0,
        'addr': '172.31.13.165:6789/0'}
    """
    try:
        hash_list = []
        monitor_list = get_mon_map(service=service)
        if monitor_list['monmap']['mons']:
            for mon in monitor_list['monmap']['mons']:
                hash_list.append(
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
            return sorted(hash_list)
        else:
            return None
    except (ValueError, CalledProcessError):
        raise
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def monitor_key_set(service, key, value):
    """
    Sets a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to set.
    :param value: The value to set.  This will be converted to a string
        before setting
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'put', str(key), str(value)])
    except CalledProcessError as e:
        log("Monitor config-key put failed with message: {}".format(
            e.output))
        raise
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def monitor_key_get(service, key):
    """
    Gets the value of an existing key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to search for.
    :return: Returns the value of that key or None if not found.
    """
    try:
        output = check_output(
            ['ceph', '--id', service,
             'config-key', 'get', str(key)])
        return output
    except CalledProcessError as e:
        log("Monitor config-key get failed with message: {}".format(
            e.output))
        return None
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def monitor_key_exists(service, key):
    """
    Searches for the existence of a key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to search for
    :return: Returns True if the key exists, False if not and raises an
     exception if an unknown error occurs. :raise: CalledProcessError if
     an unknown error occurs
    """
    try:
        check_call(
            ['ceph', '--id', service,
             'config-key', 'exists', str(key)])
        # I can return true here regardless because Ceph returns
        # ENOENT if the key wasn't found
        return True
    except CalledProcessError as e:
        if e.returncode == errno.ENOENT:
            return False
        else:
            log("Unknown error from ceph config-get exists: {} {}".format(
                e.returncode, e.output))
            raise
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def remove_pool_snapshot(service, pool_name, snapshot_name):
    """
    Remove a snapshot from a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None.  Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise


# max_bytes should be an int or long
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Find the current caching mode of the pool_name given.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: int or None
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
    try:
        osd_json = json.loads(out)
        for pool in osd_json['pools']:
            if pool['pool_name'] == pool_name:
                return pool['cache_mode']
        return None
    except ValueError:
        raise
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Validate dictionary data.

           Compare expected dictionary data vs actual dictionary data.
           The values in the 'expected' dictionary can be strings, bools, ints,
           longs, or can be a function that evaluates a variable and returns a
           bool.
           """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for k, v in six.iteritems(expected):
            if k in actual:
                if (isinstance(v, six.string_types) or
                        isinstance(v, bool) or
                        isinstance(v, six.integer_types)):
                    # handle explicit values
                    if v != actual[k]:
                        return "{}:{}".format(k, actual[k])
                # handle function pointers, such as not_null or valid_ip
                elif not v(actual[k]):
                    return "{}:{}".format(k, actual[k])
            else:
                return "key '{}' does not exist".format(k)
        return None
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def __getitem__(self, index):
        """
        Indexes an expiration state on this collection.

        Args:
            index (str): Name of the physical channel of which the
                expiration state to retrieve.
        Returns:
            nidaqmx.system._watchdog_modules.expiration_state.ExpirationState:

            The object representing the indexed expiration state.
        """
        if isinstance(index, six.string_types):
            return ExpirationState(self._handle, index)
        else:
            raise DaqError(
                'Invalid index type "{0}" used to access expiration states.'
                .format(type(index)), -1)
项目:ios-xr-grpc-python    作者:cisco-grpc-connection-libs    | 项目源码 | 文件源码
def _ConvertValueMessage(self, value, message):
    """Convert a JSON representation into Value message."""
    if isinstance(value, dict):
      self._ConvertStructMessage(value, message.struct_value)
    elif isinstance(value, list):
      self. _ConvertListValueMessage(value, message.list_value)
    elif value is None:
      message.null_value = 0
    elif isinstance(value, bool):
      message.bool_value = value
    elif isinstance(value, six.string_types):
      message.string_value = value
    elif isinstance(value, _INT_OR_FLOAT):
      message.number_value = value
    else:
      raise ParseError('Unexpected type for Value message.')
项目:DropboxConnect    作者:raguay    | 项目源码 | 文件源码
def generic_type_name(v):
    """Return a descriptive type name that isn't Python specific. For example,
    an int value will return 'integer' rather than 'int'."""
    if isinstance(v, numbers.Integral):
        # Must come before real numbers check since integrals are reals too
        return 'integer'
    elif isinstance(v, numbers.Real):
        return 'float'
    elif isinstance(v, (tuple, list)):
        return 'list'
    elif isinstance(v, six.string_types):
        return 'string'
    elif v is None:
        return 'null'
    else:
        return type(v).__name__
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def _get_url(self):
        """
        Returns: string_type or parse.ParseResult or parse SplitResult
        We have a vague return type for _get_url method
        because we could not generalise regex support for all
        backends. So we are directly passing result to backends.
        That way users can use regex that their backend provides.
        """
        if self.url is NotImplemented:
            raise NotImplementedError('To use placebo, you need to either '
                                      'provide url attribute or '
                                      'overwrite get_url method in subclass.')
        else:
            url = invoke_or_get(self.url)
            # if url is a string convert it to ParsedUrl
            # if isinstance(url, six.string_types):
            #     url = parse.urlparse(url)
            # TODO: check return type
            return url
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def _get_url(self):
        """
        Returns: string_type or parse.ParseResult or parse SplitResult
        We have a vague return type for _get_url method
        because we could not generalise regex support for all
        backends. So we are directly passing result to backends.
        That way users can use regex that their backend provides.
        """
        if self.url is NotImplemented:
            raise NotImplementedError('To use placebo, you need to either '
                                      'provide url attribute or '
                                      'overwrite get_url method in subclass.')
        else:
            url = invoke_or_get(self.url)
            # if url is a string convert it to ParsedUrl
            # if isinstance(url, six.string_types):
            #     url = parse.urlparse(url)
            # TODO: check return type
            return url
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def bool_from_string(value):
    """Interpret string value as boolean.

    Returns True if value translates to True otherwise False.
    """
    if isinstance(value, six.string_types):
        value = six.text_type(value)
    else:
        msg = "Unable to interpret non-string value '%s' as boolean" % (value)
        raise ValueError(msg)

    value = value.strip().lower()

    if value in ['y', 'yes', 'true', 't', 'on']:
        return True
    elif value in ['n', 'no', 'false', 'f', 'off']:
        return False

    msg = "Unable to interpret string value '%s' as boolean" % (value)
    raise ValueError(msg)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def log(message, level=None):
    """Write a message to the juju log"""
    command = ['juju-log']
    if level:
        command += ['-l', level]
    if not isinstance(message, six.string_types):
        message = repr(message)
    command += [message]
    # Missing juju-log should not cause failures in unit tests
    # Send log output to stderr
    try:
        subprocess.call(command)
    except OSError as e:
        if e.errno == errno.ENOENT:
            if level:
                message = "{}: {}".format(level, message)
            message = "juju-log: {}".format(message)
            print(message, file=sys.stderr)
        else:
            raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def remove_cache_tier(self, cache_pool):
        """
        Removes a cache tier from Ceph.  Flushes all dirty objects from writeback pools and waits for that to complete.
        :param cache_pool: six.string_types.  The cache tier pool name to remove.
        :return: None
        """
        # read-only is easy, writeback is much harder
        mode = get_cache_mode(self.service, cache_pool)
        version = ceph_version()
        if mode == 'readonly':
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])

        elif mode == 'writeback':
            pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier',
                                'cache-mode', cache_pool, 'forward']
            if version >= '10.1':
                # Jewel added a mandatory flag
                pool_forward_cmd.append('--yes-i-really-mean-it')

            check_call(pool_forward_cmd)
            # Flush the cache and wait for it to return
            check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Returns the current monitor map.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: json string. :raise: ValueError if the monmap fails to parse.
      Also raises CalledProcessError if our ceph command fails
    """
    try:
        mon_status = check_output(['ceph', '--id', service,
                                   'mon_status', '--format=json'])
        if six.PY3:
            mon_status = mon_status.decode('UTF-8')
        try:
            return json.loads(mon_status)
        except ValueError as v:
            log("Unable to parse mon_status json: {}. Error: {}"
                .format(mon_status, str(v)))
            raise
    except CalledProcessError as e:
        log("mon_status command failed with message: {}"
            .format(str(e)))
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def hash_monitor_names(service):
    """
    Uses the get_mon_map() function to get information about the monitor
    cluster.
    Hash the name of each monitor.  Return a sorted list of monitor hashes
    in an ascending order.
    :param service: six.string_types. The Ceph user name to run the command under
    :rtype : dict.   json dict of monitor name, ip address and rank
    example: {
        'name': 'ip-172-31-13-165',
        'rank': 0,
        'addr': '172.31.13.165:6789/0'}
    """
    try:
        hash_list = []
        monitor_list = get_mon_map(service=service)
        if monitor_list['monmap']['mons']:
            for mon in monitor_list['monmap']['mons']:
                hash_list.append(
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
            return sorted(hash_list)
        else:
            return None
    except (ValueError, CalledProcessError):
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def monitor_key_set(service, key, value):
    """
    Sets a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to set.
    :param value: The value to set.  This will be converted to a string
        before setting
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'put', str(key), str(value)])
    except CalledProcessError as e:
        log("Monitor config-key put failed with message: {}".format(
            e.output))
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def monitor_key_get(service, key):
    """
    Gets the value of an existing key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to search for.
    :return: Returns the value of that key or None if not found.
    """
    try:
        output = check_output(
            ['ceph', '--id', service,
             'config-key', 'get', str(key)]).decode('UTF-8')
        return output
    except CalledProcessError as e:
        log("Monitor config-key get failed with message: {}".format(
            e.output))
        return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def monitor_key_exists(service, key):
    """
    Searches for the existence of a key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to search for
    :return: Returns True if the key exists, False if not and raises an
     exception if an unknown error occurs. :raise: CalledProcessError if
     an unknown error occurs
    """
    try:
        check_call(
            ['ceph', '--id', service,
             'config-key', 'exists', str(key)])
        # I can return true here regardless because Ceph returns
        # ENOENT if the key wasn't found
        return True
    except CalledProcessError as e:
        if e.returncode == errno.ENOENT:
            return False
        else:
            log("Unknown error from ceph config-get exists: {} {}".format(
                e.returncode, e.output))
            raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def remove_pool_snapshot(service, pool_name, snapshot_name):
    """
    Remove a snapshot from a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None.  Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise


# max_bytes should be an int or long
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Find the current caching mode of the pool_name given.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: int or None
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service,
                        'osd', 'dump', '--format=json'])
    if six.PY3:
        out = out.decode('UTF-8')
    try:
        osd_json = json.loads(out)
        for pool in osd_json['pools']:
            if pool['pool_name'] == pool_name:
                return pool['cache_mode']
        return None
    except ValueError:
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _validate_dict_data(self, expected, actual):
        """Validate dictionary data.

           Compare expected dictionary data vs actual dictionary data.
           The values in the 'expected' dictionary can be strings, bools, ints,
           longs, or can be a function that evaluates a variable and returns a
           bool.
           """
        self.log.debug('actual: {}'.format(repr(actual)))
        self.log.debug('expected: {}'.format(repr(expected)))

        for k, v in six.iteritems(expected):
            if k in actual:
                if (isinstance(v, six.string_types) or
                        isinstance(v, bool) or
                        isinstance(v, six.integer_types)):
                    # handle explicit values
                    if v != actual[k]:
                        return "{}:{}".format(k, actual[k])
                # handle function pointers, such as not_null or valid_ip
                elif not v(actual[k]):
                    return "{}:{}".format(k, actual[k])
            else:
                return "key '{}' does not exist".format(k)
        return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def bool_from_string(value):
    """Interpret string value as boolean.

    Returns True if value translates to True otherwise False.
    """
    if isinstance(value, six.string_types):
        value = six.text_type(value)
    else:
        msg = "Unable to interpret non-string value '%s' as boolean" % (value)
        raise ValueError(msg)

    value = value.strip().lower()

    if value in ['y', 'yes', 'true', 't', 'on']:
        return True
    elif value in ['n', 'no', 'false', 'f', 'off']:
        return False

    msg = "Unable to interpret string value '%s' as boolean" % (value)
    raise ValueError(msg)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def log(message, level=None):
    """Write a message to the juju log"""
    command = ['juju-log']
    if level:
        command += ['-l', level]
    if not isinstance(message, six.string_types):
        message = repr(message)
    command += [message]
    # Missing juju-log should not cause failures in unit tests
    # Send log output to stderr
    try:
        subprocess.call(command)
    except OSError as e:
        if e.errno == errno.ENOENT:
            if level:
                message = "{}: {}".format(level, message)
            message = "juju-log: {}".format(message)
            print(message, file=sys.stderr)
        else:
            raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def ns_query(address):
    try:
        import dns.resolver
    except ImportError:
        if six.PY2:
            apt_install('python-dnspython', fatal=True)
        else:
            apt_install('python3-dnspython', fatal=True)
        import dns.resolver

    if isinstance(address, dns.name.Name):
        rtype = 'PTR'
    elif isinstance(address, six.string_types):
        rtype = 'A'
    else:
        return None

    try:
        answers = dns.resolver.query(address, rtype)
    except dns.resolver.NXDOMAIN:
        return None

    if answers:
        return str(answers[0])
    return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Returns the current monitor map.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: json string. :raise: ValueError if the monmap fails to parse.
      Also raises CalledProcessError if our ceph command fails
    """
    try:
        mon_status = check_output(['ceph', '--id', service,
                                   'mon_status', '--format=json'])
        if six.PY3:
            mon_status = mon_status.decode('UTF-8')
        try:
            return json.loads(mon_status)
        except ValueError as v:
            log("Unable to parse mon_status json: {}. Error: {}"
                .format(mon_status, str(v)))
            raise
    except CalledProcessError as e:
        log("mon_status command failed with message: {}"
            .format(str(e)))
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def hash_monitor_names(service):
    """
    Uses the get_mon_map() function to get information about the monitor
    cluster.
    Hash the name of each monitor.  Return a sorted list of monitor hashes
    in an ascending order.
    :param service: six.string_types. The Ceph user name to run the command under
    :rtype : dict.   json dict of monitor name, ip address and rank
    example: {
        'name': 'ip-172-31-13-165',
        'rank': 0,
        'addr': '172.31.13.165:6789/0'}
    """
    try:
        hash_list = []
        monitor_list = get_mon_map(service=service)
        if monitor_list['monmap']['mons']:
            for mon in monitor_list['monmap']['mons']:
                hash_list.append(
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
            return sorted(hash_list)
        else:
            return None
    except (ValueError, CalledProcessError):
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def monitor_key_set(service, key, value):
    """
    Sets a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to set.
    :param value: The value to set.  This will be converted to a string
        before setting
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'put', str(key), str(value)])
    except CalledProcessError as e:
        log("Monitor config-key put failed with message: {}".format(
            e.output))
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def monitor_key_get(service, key):
    """
    Gets the value of an existing key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types.  The key to search for.
    :return: Returns the value of that key or None if not found.
    """
    try:
        output = check_output(
            ['ceph', '--id', service,
             'config-key', 'get', str(key)]).decode('UTF-8')
        return output
    except CalledProcessError as e:
        log("Monitor config-key get failed with message: {}".format(
            e.output))
        return None
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def remove_pool_snapshot(service, pool_name, snapshot_name):
    """
    Remove a snapshot from a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None.  Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise


# max_bytes should be an int or long
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Find the current caching mode of the pool_name given.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: int or None
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service,
                        'osd', 'dump', '--format=json'])
    if six.PY3:
        out = out.decode('UTF-8')
    try:
        osd_json = json.loads(out)
        for pool in osd_json['pools']:
            if pool['pool_name'] == pool_name:
                return pool['cache_mode']
        return None
    except ValueError:
        raise
项目:flask-basic-roles    作者:ownaginatious    | 项目源码 | 文件源码
def add_roles(self, user, roles):
        """
        Adds one or more roles to a user. Adding the
        same role more than once has no effect.

        user  -- name of the user
        roles -- one or more roles to add
        """

        if user not in self.users:
            raise UserNotDefined(user)
        roles = (roles,) if isinstance(roles, six.string_types) else roles
        for role in roles:
            if ',' in role:
                raise BadRoleError('\',\' not allowed in role name (%s) '
                                   'for user %s' % (role, user))
            self.roles[user].add(role)
项目:pyselenium-js    作者:neetjn    | 项目源码 | 文件源码
def ng_call_scope_function(self, element, func, params='', return_out=False):
        """
        :Description: Will execute scope function with provided parameters.
        :Warning: This will only work for angular.js 1.x.
        :Warning: Requires angular debugging to be enabled.
        :param element: Element for browser instance to target.
        :param func: Function to execute from angular element scope.
        :type func: string
        :param params: String (naked) args, or list of parameters to pass to target function.
        :type params: string, tuple, list
        :param return_out: Return output of function call otherwise None
        :type return_out: bool
        """
        if isinstance(params, string_types):
            param_str = params
        elif isinstance(params, (tuple, list)):
            param_str = self.__serialize_params(params)
        else:
            raise ValueError('Invalid type specified for function parameters')
        exec_str = 'angular.element(arguments[0]).scope().%s(%s);' % (func, param_str)
        if return_out:
            return self.__type2python(
                self.browser.execute_script('return {}'.format(exec_str), element))
        else:
            self.browser.execute_script(exec_str, element)
项目:pyselenium-js    作者:neetjn    | 项目源码 | 文件源码
def ng_call_ctrl_function(self, element, func, params='', return_out=False):
        """
        :Description: Will execute controller function with provided parameters.
        :Warning: This will only work for angular.js 1.x.
        :Warning: Requires angular debugging to be enabled.
        :param element: Element for browser instance to target.
        :param func: Function to execute from angular element controller.
        :type func: string
        :param params: String (naked) args, or list of parameters to pass to target function.
        :type params: string, tuple, list
        :param return_out: Return output of function call otherwise None
        :type return_out: bool
        """
        if isinstance(params, string_types):
            param_str = params
        elif isinstance(params, (tuple, list)):
            param_str = self.__serialize_params(params)
        else:
            raise ValueError('Invalid type specified for function parameters')
        exec_str = 'angular.element(arguments[0]).controller().%s(%s);' % (func, param_str)
        if return_out:
            return self.__type2python(
                self.browser.execute_script('return {}'.format(exec_str), element))
        else:
            self.browser.execute_script(exec_str, element)
项目:pyselenium-js    作者:neetjn    | 项目源码 | 文件源码
def ng2_call_component_function(self, element, func, params='', return_out=False):
        """
        :Description: Will execute the component instance function with provided parameters.
        :Warning: This will only work for Angular components.
        :param element: Element for browser instance to target.
        :param func: Function to execute from component instance.
        :type func: string
        :param params: String (naked) args, or list of parameters to pass to target function.
        :type params: string, tuple, list
        :param return_out: Return output of function call otherwise None
        :type return_out: bool
        """
        if isinstance(params, string_types):
            param_str = params
        elif isinstance(params, (tuple, list)):
            param_str = self.__serialize_params(params)
        else:
            raise ValueError('Invalid type specified for function parameters')
        exec_str = 'ng.probe(arguments[0]).componentInstance.%s(%s);' % (func, param_str)
        if return_out:
            return self.__type2python(
                self.browser.execute_script('return {}'.format(exec_str), element))
        else:
            self.browser.execute_script(exec_str, element)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def ensure_timezone(func, argname, arg):
    """Argument preprocessor that converts the input into a tzinfo object.

    Usage
    -----
    >>> from zipline.utils.preprocess import preprocess
    >>> @preprocess(tz=ensure_timezone)
    ... def foo(tz):
    ...     return tz
    >>> foo('utc')
    <UTC>
    """
    if isinstance(arg, tzinfo):
        return arg
    if isinstance(arg, string_types):
        return timezone(arg)

    raise TypeError(
        "{func}() couldn't convert argument "
        "{argname}={arg!r} to a timezone.".format(
            func=_qualified_name(func),
            argname=argname,
            arg=arg,
        ),
    )
项目:pogom-linux    作者:PokeHunterProject    | 项目源码 | 文件源码
def user_login(self):
        self.log.info('Google User Login for: {}'.format(self.username))

        if not isinstance(self.username, six.string_types) or not isinstance(self.password, six.string_types):
            raise AuthException("Username/password not correctly specified")

        user_login = perform_master_login(self.username, self.password, self.GOOGLE_LOGIN_ANDROID_ID)

        try:
            refresh_token = user_login.get('Token', None)
        except ConnectionError as e:
            raise AuthException("Caught ConnectionError: %s", e)

        if refresh_token is not None:
            self._refresh_token = refresh_token
            self.log.info('Google User Login successful.')
        else:
            self._refresh_token = None
            raise AuthException("Invalid Google Username/password")

        self.get_access_token()
        return self._login
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    if isinstance(scopes, six.string_types):
        return scopes
    else:
        return ' '.join(scopes)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def string_to_scopes(scopes):
    """Converts stringifed scope value to a list.

    If scopes is a list then it is simply passed through. If scopes is an
    string then a list of each individual scope is returned.

    Args:
        scopes: a string or iterable of strings, the scopes.

    Returns:
        The scopes in a list.
    """
    if not scopes:
        return []
    if isinstance(scopes, six.string_types):
        return scopes.split(' ')
    else:
        return scopes
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __init__(self, value):
        """
        Initializer value can be:

            - integer_type: absolute days from epoch (1970, 1, 1). Can be negative.
            - datetime.date: built-in date
            - string_type: a string time of the form "yyyy-mm-dd"
        """
        if isinstance(value, six.integer_types):
            self.days_from_epoch = value
        elif isinstance(value, (datetime.date, datetime.datetime)):
            self._from_timetuple(value.timetuple())
        elif isinstance(value, six.string_types):
            self._from_datestring(value)
        else:
            raise TypeError('Date arguments must be a whole number, datetime.date, or string')
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def generic_type_name(v):
    """Return a descriptive type name that isn't Python specific. For example,
    an int value will return 'integer' rather than 'int'."""
    if isinstance(v, numbers.Integral):
        # Must come before real numbers check since integrals are reals too
        return 'integer'
    elif isinstance(v, numbers.Real):
        return 'float'
    elif isinstance(v, (tuple, list)):
        return 'list'
    elif isinstance(v, six.string_types):
        return 'string'
    elif v is None:
        return 'null'
    else:
        return type(v).__name__
项目:azure-python-devtools    作者:Azure    | 项目源码 | 文件源码
def process_response(self, response):
        import six
        body = response['body']['string']

        # backward compatibility. under 2.7 response body is unicode, under 3.5 response body is
        # bytes. when set the value back, the same type must be used.
        body_is_string = isinstance(body, six.string_types)

        content_in_string = (response['body']['string'] or b'').decode('utf-8')
        index = content_in_string.find(LargeResponseBodyProcessor.control_flag)

        if index > -1:
            length = int(content_in_string[index + len(LargeResponseBodyProcessor.control_flag):])
            if body_is_string:
                response['body']['string'] = '0' * length
            else:
                response['body']['string'] = bytes([0] * length)

        return response
项目:azure-python-devtools    作者:Azure    | 项目源码 | 文件源码
def _process_response_recording(self, response):
        if self.in_recording:
            # make header name lower case and filter unwanted headers
            headers = {}
            for key in response['headers']:
                if key.lower() not in self.FILTER_HEADERS:
                    headers[key.lower()] = response['headers'][key]
            response['headers'] = headers

            body = response['body']['string']
            if body and not isinstance(body, six.string_types):
                response['body']['string'] = body.decode('utf-8')

            for processor in self.recording_processors:
                response = processor.process_response(response)
                if not response:
                    break
        else:
            for processor in self.replay_processors:
                response = processor.process_response(response)
                if not response:
                    break

        return response
项目:Splunk_CBER_App    作者:MHaggis    | 项目源码 | 文件源码
def __init__(self, fileobj):
        global rrule
        if not rrule:
            from dateutil import rrule

        if isinstance(fileobj, string_types):
            self._s = fileobj
            # ical should be encoded in UTF-8 with CRLF
            fileobj = open(fileobj, 'r')
        elif hasattr(fileobj, "name"):
            self._s = fileobj.name
        else:
            self._s = repr(fileobj)

        self._vtz = {}

        self._parse_rfc(fileobj.read())
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def extract_options(inc, global_options=None):
    global_options = global_options or []
    if global_options and isinstance(global_options, six.string_types):
        global_options = [global_options]
    if '|' not in inc:
        return (inc, global_options)
    inc, opts = inc.split('|')
    return (inc, parse_sync_options(opts) + global_options)