Python charmhelpers.core.hookenv 模块,action_get() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用charmhelpers.core.hookenv.action_get()

项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def create_pool():
    pool_name = action_get("name")
    pool_type = action_get("pool-type")
    try:
        if pool_type == "replicated":
            replicas = action_get("replicas")
            replicated_pool = ReplicatedPool(name=pool_name,
                                             service='admin',
                                             replicas=replicas)
            replicated_pool.create()

        elif pool_type == "erasure":
            crush_profile_name = action_get("erasure-profile-name")
            erasure_pool = ErasurePool(name=pool_name,
                                       erasure_code_profile=crush_profile_name,
                                       service='admin')
            erasure_pool.create()
        else:
            log("Unknown pool type of {}. Only erasure or replicated is "
                "allowed".format(pool_type))
            action_fail("Unknown pool type of {}. Only erasure or replicated "
                        "is allowed".format(pool_type))
    except CalledProcessError as e:
        action_fail("Pool creation failed because of a failed process. "
                    "Ret Code: {} Message: {}".format(e.returncode, str(e)))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def pool_stats():
    try:
        pool_name = action_get("pool-name")
        cluster = connect()
        ioctx = cluster.open_ioctx(pool_name)
        stats = ioctx.get_stats()
        ioctx.close()
        cluster.shutdown()
        return stats
    except (rados.Error,
            rados.IOError,
            rados.ObjectNotFound,
            rados.NoData,
            rados.NoSpace,
            rados.PermissionError) as e:
        action_fail(str(e))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def update_crushmap():
    try:
        encoded_text = action_get("map")
        json_map = base64.b64decode(encoded_text)
        try:
            # This needs json_map passed to it from stdin
            crushtool = Popen(
                ["crushtool", "-o", "compiled_crushmap", "-m", "compile"],
                stdin=PIPE)
            crushtool_stdout, crushtool_stderr = crushtool.communicate(
                input=json_map)
            if crushtool_stderr is not None:
                action_fail(
                    "Failed to compile json: {}".format(crushtool_stderr))
            check_output(
                ["ceph", "osd", "setcrushmap", "-i", "compiled_crushmap"])
        except (CalledProcessError, OSError) as err2:
            action_fail("Crush compile or load failed with error: {}".format(
                err2.output))
    except TypeError as err:
        action_fail(
            "Unable to base64 decode: {}. Error: {}".format(encoded_text, err))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def delete_cache_tier():
    backer_pool = action_get("backer-pool")
    cache_pool = action_get("cache-pool")

    # Pre flight checks
    if not pool_exists('admin', backer_pool):
        log("Backer pool {} must exist before calling this".format(
            backer_pool))
        action_fail("remove-cache-tier failed. Backer pool {} must exist "
                    "before calling this".format(backer_pool))

    if not pool_exists('admin', cache_pool):
        log("Cache pool {} must exist before calling this".format(
            cache_pool))
        action_fail("remove-cache-tier failed. Cache pool {} must exist "
                    "before calling this".format(cache_pool))

    pool = Pool(service='admin', name=backer_pool)
    try:
        pool.remove_cache_tier(cache_pool=cache_pool)
    except CalledProcessError as err:
        log("Removing the cache tier failed with message: {}".format(str(err)))
        action_fail("remove-cache-tier failed. Removing the cache tier failed "
                    "with message: {}".format(str(err)))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def enable_volume_quota():
    """
    Enable quotas on the volume
    """
    # Gather our action parameters
    vol = action_get("volume")
    usage_limit = action_get("usage-limit")
    parsed_usage_limit = int(usage_limit)
    path = action_get("path")
    # Turn quotas on if not already enabled
    quotas_enabled = volume.volume_quotas_enabled(vol)
    if quotas_enabled.is_err():
        action_fail("Enable quota failed: {}".format(quotas_enabled.value))
    if not quotas_enabled.value:
        try:
            volume.volume_enable_quotas(vol)
        except GlusterCmdException as e:
            action_fail("Enable quotas failed: {}".format(e))

    try:
        volume.volume_add_quota(vol, path, parsed_usage_limit)
    except GlusterCmdException as e:
        action_fail("Add quota failed: {}".format(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def list_volume_quotas():
    """
    List quotas on the volume
    """
    vol = action_get("volume")
    quotas_enabled = volume.volume_quotas_enabled(vol)
    if quotas_enabled.is_err():
        action_fail("List quota failed: {}".format(quotas_enabled.value))
    if quotas_enabled.value:
        quotas = volume.quota_list(vol)
        if quotas.is_err():
            action_fail(
                "Failed to get volume quotas: {}".format(quotas.value))
        quota_strings = []
        for quota in quotas.value:
            quota_string = "path:{} limit:{} used:{}".format(
                quota.path,
                quota.hard_limit,
                quota.used)
            quota_strings.append(quota_string)
        action_set({"quotas": "\n".join(quota_strings)})
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def set_volume_options():
    """
    Set one or more options on the volume at once
    """
    vol = action_get("volume")

    # Gather all of the action parameters up at once.  We don't know what
    # the user wants to change.
    options = action_get()
    settings = []
    for key in options:
        if key != "volume":
            settings.append(
                volume.GlusterOption.from_str(key, options[key]))

    volume.volume_set_options(vol, settings)


# Actions to function mapping, to allow for illegal python action names that
# can map to a python function.
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def pool_stats():
    try:
        pool_name = action_get("pool-name")
        cluster = connect()
        ioctx = cluster.open_ioctx(pool_name)
        stats = ioctx.get_stats()
        ioctx.close()
        cluster.shutdown()
        return stats
    except (rados.Error,
            rados.IOError,
            rados.ObjectNotFound,
            rados.NoData,
            rados.NoSpace,
            rados.PermissionError) as e:
        action_fail(e.message)
项目:charm-ceph-proxy    作者:openstack    | 项目源码 | 文件源码
def pool_stats():
    try:
        pool_name = action_get("pool-name")
        cluster = connect()
        ioctx = cluster.open_ioctx(pool_name)
        stats = ioctx.get_stats()
        ioctx.close()
        cluster.shutdown()
        return stats
    except (rados.Error,
            rados.IOError,
            rados.ObjectNotFound,
            rados.NoData,
            rados.NoSpace,
            rados.PermissionError) as e:
        action_fail(e.message)
项目:charm-ceph-proxy    作者:openstack    | 项目源码 | 文件源码
def delete_cache_tier():
    backer_pool = action_get("backer-pool")
    cache_pool = action_get("cache-pool")

    # Pre flight checks
    if not pool_exists('admin', backer_pool):
        log("Backer pool {} must exist before calling this".format(
            backer_pool))
        action_fail("remove-cache-tier failed. Backer pool {} must exist "
                    "before calling this".format(backer_pool))

    if not pool_exists('admin', cache_pool):
        log("Cache pool {} must exist before calling this".format(
            cache_pool))
        action_fail("remove-cache-tier failed. Cache pool {} must exist "
                    "before calling this".format(cache_pool))

    pool = Pool(service='admin', name=backer_pool)
    try:
        pool.remove_cache_tier(cache_pool=cache_pool)
    except CalledProcessError as err:
        log("Removing the cache tier failed with message: {}".format(
            err.message))
        action_fail("remove-cache-tier failed. Removing the cache tier failed "
                    "with message: {}".format(err.message))
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def get_devices():
    """Parse 'osd-devices' action parameter, returns list."""
    devices = []
    for path in hookenv.action_get('osd-devices').split(' '):
        path = path.strip()
        if not os.path.isabs(path):
            raise Error('{}: Not absolute path.'.format(path))
        devices.append(path)
    return devices
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def get_devices():
    devices = []
    for path in hookenv.action_get('osd-devices').split(' '):
        path = path.strip()
        if os.path.isabs(path):
            devices.append(path)

    return devices
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def delete_erasure_profile():
    name = action_get("name")

    try:
        remove_erasure_profile(service='admin', profile_name=name)
    except CalledProcessError as e:
        log("Remove erasure profile failed with error {}".format(str(e)),
            level="ERROR")
        action_fail("Remove erasure profile failed with error: {}"
                    .format(str(e)))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def pool_get():
    key = action_get("key")
    pool_name = action_get("pool_name")
    try:
        value = (check_output(['ceph', 'osd', 'pool', 'get', pool_name, key])
                 .decode('UTF-8'))
        return value
    except CalledProcessError as e:
        action_fail(str(e))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def set_pool():
    key = action_get("key")
    value = action_get("value")
    pool_name = action_get("pool_name")
    pool_set(service='ceph', pool_name=pool_name, key=key, value=value)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def set_pool_max_bytes():
    pool_name = action_get("pool-name")
    max_bytes = action_get("max")
    set_pool_quota(service='ceph',
                   pool_name=pool_name,
                   max_bytes=max_bytes)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def snapshot_ceph_pool():
    pool_name = action_get("pool-name")
    snapshot_name = action_get("snapshot-name")
    snapshot_pool(service='ceph',
                  pool_name=pool_name,
                  snapshot_name=snapshot_name)
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def make_cache_tier():
    backer_pool = action_get("backer-pool")
    cache_pool = action_get("cache-pool")
    cache_mode = action_get("cache-mode")

    # Pre flight checks
    if not pool_exists('admin', backer_pool):
        log("Please create {} pool before calling create-cache-tier".format(
            backer_pool))
        action_fail("create-cache-tier failed. Backer pool {} must exist "
                    "before calling this".format(backer_pool))

    if not pool_exists('admin', cache_pool):
        log("Please create {} pool before calling create-cache-tier".format(
            cache_pool))
        action_fail("create-cache-tier failed. Cache pool {} must exist "
                    "before calling this".format(cache_pool))

    pool = Pool(service='admin', name=backer_pool)
    try:
        pool.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
    except CalledProcessError as err:
        log("Add cache tier failed with message: {}"
            .format(str(err)))
        action_fail("create-cache-tier failed.  Add cache tier failed with "
                    "message: {}".format(str(err)))
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def make_erasure_profile():
    name = action_get("name")
    out = get_erasure_profile(service='admin', name=name)
    action_set({'message': out})
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def rebalance_volume():
    """
    Start a rebalance volume operation
    """
    vol = action_get("volume")
    if not vol:
        action_fail("volume not specified")
    try:
        volume.volume_rebalance(vol)
    except GlusterCmdException as e:
        action_fail("volume rebalance failed with error: {}".format(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def enable_bitrot_scan():
    """
    Enable bitrot scan
    """
    vol = action_get("volume")
    if not vol:
        action_fail("volume not specified")
    try:
        volume.volume_enable_bitrot(vol)
    except GlusterCmdException as e:
        action_fail("enable bitrot failed with error: {}".format(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def disable_bitrot_scan():
    """
    Disable bitrot scan
    """
    vol = action_get("volume")
    if not vol:
        action_fail("volume not specified")
    try:
        volume.volume_disable_bitrot(vol)
    except GlusterCmdException as e:
        action_fail("enable disable failed with error: {}".format(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def pause_bitrot_scan():
    """
    Pause bitrot scan
    """
    vol = action_get("volume")
    option = volume.BitrotOption.Scrub(volume.ScrubControl.Pause)
    try:
        volume.volume_set_bitrot_option(vol, option)
    except GlusterCmdException as e:
        action_fail("pause bitrot scan failed with error: {}".format(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def resume_bitrot_scan():
    """
    Resume bitrot scan
    """
    vol = action_get("volume")
    option = volume.BitrotOption.Scrub(volume.ScrubControl.Resume)
    try:
        volume.volume_set_bitrot_option(vol, option)
    except GlusterCmdException as e:
        action_fail("resume bitrot scan failed with error: {}".format(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def set_bitrot_throttle():
    """
    Set how aggressive bitrot scanning should be
    """
    vol = action_get("volume")
    throttle = action_get("throttle")
    option = volume.ScrubAggression.from_str(throttle)
    try:
        volume.volume_set_bitrot_option(vol, volume.BitrotOption.ScrubThrottle(
            option))
    except GlusterCmdException as e:
        action_fail(
            "set bitrot throttle failed with error: {}".format(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def disable_volume_quota():
    """
    Disable quotas on the volume
    """
    vol = action_get("volume")
    path = action_get("path")
    quotas_enabled = volume.volume_quotas_enabled(vol)
    if quotas_enabled.is_err():
        action_fail("Disable quota failed: {}".format(quotas_enabled.value))
    if quotas_enabled.value:
        try:
            volume.volume_remove_quota(vol, path)
        except GlusterCmdException as e:
            action_fail("remove quota failed with error: {}".format(e))
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def pool_get():
    key = action_get("key")
    pool_name = action_get("pool_name")
    try:
        value = check_output(['ceph', 'osd', 'pool', 'get', pool_name, key])
        return value
    except CalledProcessError as e:
        action_fail(e.message)
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def set_pool():
    key = action_get("key")
    value = action_get("value")
    pool_name = action_get("pool_name")
    pool_set(service='ceph', pool_name=pool_name, key=key, value=value)
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def delete_pool_snapshot():
    pool_name = action_get("pool-name")
    snapshot_name = action_get("snapshot-name")
    remove_pool_snapshot(service='ceph',
                         pool_name=pool_name,
                         snapshot_name=snapshot_name)


# Note only one or the other can be set
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def set_pool_max_bytes():
    pool_name = action_get("pool-name")
    max_bytes = action_get("max")
    set_pool_quota(service='ceph',
                   pool_name=pool_name,
                   max_bytes=max_bytes)
项目:charm-ceph-proxy    作者:openstack    | 项目源码 | 文件源码
def pool_get():
    key = action_get("key")
    pool_name = action_get("pool_name")
    try:
        value = check_output(['ceph', 'osd', 'pool', 'get', pool_name, key])
        return value
    except CalledProcessError as e:
        action_fail(e.message)
项目:charm-ceph-proxy    作者:openstack    | 项目源码 | 文件源码
def set_pool():
    key = action_get("key")
    value = action_get("value")
    pool_name = action_get("pool_name")
    pool_set(service='ceph', pool_name=pool_name, key=key, value=value)
项目:charm-ceph-proxy    作者:openstack    | 项目源码 | 文件源码
def delete_pool_snapshot():
    pool_name = action_get("pool-name")
    snapshot_name = action_get("snapshot-name")
    remove_pool_snapshot(service='ceph',
                         pool_name=pool_name,
                         snapshot_name=snapshot_name)


# Note only one or the other can be set
项目:charm-ceph-proxy    作者:openstack    | 项目源码 | 文件源码
def set_pool_max_bytes():
    pool_name = action_get("pool-name")
    max_bytes = action_get("max")
    set_pool_quota(service='ceph',
                   pool_name=pool_name,
                   max_bytes=max_bytes)
项目:charm-ceph-proxy    作者:openstack    | 项目源码 | 文件源码
def make_cache_tier():
    backer_pool = action_get("backer-pool")
    cache_pool = action_get("cache-pool")
    cache_mode = action_get("cache-mode")

    # Pre flight checks
    if not pool_exists('admin', backer_pool):
        log("Please create {} pool before calling create-cache-tier".format(
            backer_pool))
        action_fail("create-cache-tier failed. Backer pool {} must exist "
                    "before calling this".format(backer_pool))

    if not pool_exists('admin', cache_pool):
        log("Please create {} pool before calling create-cache-tier".format(
            cache_pool))
        action_fail("create-cache-tier failed. Cache pool {} must exist "
                    "before calling this".format(cache_pool))

    pool = Pool(service='admin', name=backer_pool)
    try:
        pool.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
    except CalledProcessError as err:
        log("Add cache tier failed with message: {}".format(
            err.message))
        action_fail("create-cache-tier failed.  Add cache tier failed with "
                    "message: {}".format(err.message))
项目:charm-helpers    作者:juju    | 项目源码 | 文件源码
def test_action_get_with_key(self, check_output):
        action_data = 'bar'
        check_output.return_value = json.dumps(action_data).encode('UTF-8')

        result = hookenv.action_get(key='foo')

        self.assertEqual(result, 'bar')
        check_output.assert_called_with(['action-get', 'foo', '--format=json'])
项目:charm-helpers    作者:juju    | 项目源码 | 文件源码
def test_action_get_without_key(self, check_output):
        check_output.return_value = json.dumps(dict(foo='bar')).encode('UTF-8')

        result = hookenv.action_get()

        self.assertEqual(result['foo'], 'bar')
        check_output.assert_called_with(['action-get', '--format=json'])