我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用charmhelpers.core.hookenv.action_get()。
def create_pool(): pool_name = action_get("name") pool_type = action_get("pool-type") try: if pool_type == "replicated": replicas = action_get("replicas") replicated_pool = ReplicatedPool(name=pool_name, service='admin', replicas=replicas) replicated_pool.create() elif pool_type == "erasure": crush_profile_name = action_get("erasure-profile-name") erasure_pool = ErasurePool(name=pool_name, erasure_code_profile=crush_profile_name, service='admin') erasure_pool.create() else: log("Unknown pool type of {}. Only erasure or replicated is " "allowed".format(pool_type)) action_fail("Unknown pool type of {}. Only erasure or replicated " "is allowed".format(pool_type)) except CalledProcessError as e: action_fail("Pool creation failed because of a failed process. " "Ret Code: {} Message: {}".format(e.returncode, str(e)))
def pool_stats(): try: pool_name = action_get("pool-name") cluster = connect() ioctx = cluster.open_ioctx(pool_name) stats = ioctx.get_stats() ioctx.close() cluster.shutdown() return stats except (rados.Error, rados.IOError, rados.ObjectNotFound, rados.NoData, rados.NoSpace, rados.PermissionError) as e: action_fail(str(e))
def update_crushmap(): try: encoded_text = action_get("map") json_map = base64.b64decode(encoded_text) try: # This needs json_map passed to it from stdin crushtool = Popen( ["crushtool", "-o", "compiled_crushmap", "-m", "compile"], stdin=PIPE) crushtool_stdout, crushtool_stderr = crushtool.communicate( input=json_map) if crushtool_stderr is not None: action_fail( "Failed to compile json: {}".format(crushtool_stderr)) check_output( ["ceph", "osd", "setcrushmap", "-i", "compiled_crushmap"]) except (CalledProcessError, OSError) as err2: action_fail("Crush compile or load failed with error: {}".format( err2.output)) except TypeError as err: action_fail( "Unable to base64 decode: {}. Error: {}".format(encoded_text, err))
def delete_cache_tier(): backer_pool = action_get("backer-pool") cache_pool = action_get("cache-pool") # Pre flight checks if not pool_exists('admin', backer_pool): log("Backer pool {} must exist before calling this".format( backer_pool)) action_fail("remove-cache-tier failed. Backer pool {} must exist " "before calling this".format(backer_pool)) if not pool_exists('admin', cache_pool): log("Cache pool {} must exist before calling this".format( cache_pool)) action_fail("remove-cache-tier failed. Cache pool {} must exist " "before calling this".format(cache_pool)) pool = Pool(service='admin', name=backer_pool) try: pool.remove_cache_tier(cache_pool=cache_pool) except CalledProcessError as err: log("Removing the cache tier failed with message: {}".format(str(err))) action_fail("remove-cache-tier failed. Removing the cache tier failed " "with message: {}".format(str(err)))
def enable_volume_quota(): """ Enable quotas on the volume """ # Gather our action parameters vol = action_get("volume") usage_limit = action_get("usage-limit") parsed_usage_limit = int(usage_limit) path = action_get("path") # Turn quotas on if not already enabled quotas_enabled = volume.volume_quotas_enabled(vol) if quotas_enabled.is_err(): action_fail("Enable quota failed: {}".format(quotas_enabled.value)) if not quotas_enabled.value: try: volume.volume_enable_quotas(vol) except GlusterCmdException as e: action_fail("Enable quotas failed: {}".format(e)) try: volume.volume_add_quota(vol, path, parsed_usage_limit) except GlusterCmdException as e: action_fail("Add quota failed: {}".format(e))
def list_volume_quotas(): """ List quotas on the volume """ vol = action_get("volume") quotas_enabled = volume.volume_quotas_enabled(vol) if quotas_enabled.is_err(): action_fail("List quota failed: {}".format(quotas_enabled.value)) if quotas_enabled.value: quotas = volume.quota_list(vol) if quotas.is_err(): action_fail( "Failed to get volume quotas: {}".format(quotas.value)) quota_strings = [] for quota in quotas.value: quota_string = "path:{} limit:{} used:{}".format( quota.path, quota.hard_limit, quota.used) quota_strings.append(quota_string) action_set({"quotas": "\n".join(quota_strings)})
def set_volume_options(): """ Set one or more options on the volume at once """ vol = action_get("volume") # Gather all of the action parameters up at once. We don't know what # the user wants to change. options = action_get() settings = [] for key in options: if key != "volume": settings.append( volume.GlusterOption.from_str(key, options[key])) volume.volume_set_options(vol, settings) # Actions to function mapping, to allow for illegal python action names that # can map to a python function.
def pool_stats(): try: pool_name = action_get("pool-name") cluster = connect() ioctx = cluster.open_ioctx(pool_name) stats = ioctx.get_stats() ioctx.close() cluster.shutdown() return stats except (rados.Error, rados.IOError, rados.ObjectNotFound, rados.NoData, rados.NoSpace, rados.PermissionError) as e: action_fail(e.message)
def delete_cache_tier(): backer_pool = action_get("backer-pool") cache_pool = action_get("cache-pool") # Pre flight checks if not pool_exists('admin', backer_pool): log("Backer pool {} must exist before calling this".format( backer_pool)) action_fail("remove-cache-tier failed. Backer pool {} must exist " "before calling this".format(backer_pool)) if not pool_exists('admin', cache_pool): log("Cache pool {} must exist before calling this".format( cache_pool)) action_fail("remove-cache-tier failed. Cache pool {} must exist " "before calling this".format(cache_pool)) pool = Pool(service='admin', name=backer_pool) try: pool.remove_cache_tier(cache_pool=cache_pool) except CalledProcessError as err: log("Removing the cache tier failed with message: {}".format( err.message)) action_fail("remove-cache-tier failed. Removing the cache tier failed " "with message: {}".format(err.message))
def get_devices(): """Parse 'osd-devices' action parameter, returns list.""" devices = [] for path in hookenv.action_get('osd-devices').split(' '): path = path.strip() if not os.path.isabs(path): raise Error('{}: Not absolute path.'.format(path)) devices.append(path) return devices
def get_devices(): devices = [] for path in hookenv.action_get('osd-devices').split(' '): path = path.strip() if os.path.isabs(path): devices.append(path) return devices
def delete_erasure_profile(): name = action_get("name") try: remove_erasure_profile(service='admin', profile_name=name) except CalledProcessError as e: log("Remove erasure profile failed with error {}".format(str(e)), level="ERROR") action_fail("Remove erasure profile failed with error: {}" .format(str(e)))
def pool_get(): key = action_get("key") pool_name = action_get("pool_name") try: value = (check_output(['ceph', 'osd', 'pool', 'get', pool_name, key]) .decode('UTF-8')) return value except CalledProcessError as e: action_fail(str(e))
def set_pool(): key = action_get("key") value = action_get("value") pool_name = action_get("pool_name") pool_set(service='ceph', pool_name=pool_name, key=key, value=value)
def set_pool_max_bytes(): pool_name = action_get("pool-name") max_bytes = action_get("max") set_pool_quota(service='ceph', pool_name=pool_name, max_bytes=max_bytes)
def snapshot_ceph_pool(): pool_name = action_get("pool-name") snapshot_name = action_get("snapshot-name") snapshot_pool(service='ceph', pool_name=pool_name, snapshot_name=snapshot_name)
def make_cache_tier(): backer_pool = action_get("backer-pool") cache_pool = action_get("cache-pool") cache_mode = action_get("cache-mode") # Pre flight checks if not pool_exists('admin', backer_pool): log("Please create {} pool before calling create-cache-tier".format( backer_pool)) action_fail("create-cache-tier failed. Backer pool {} must exist " "before calling this".format(backer_pool)) if not pool_exists('admin', cache_pool): log("Please create {} pool before calling create-cache-tier".format( cache_pool)) action_fail("create-cache-tier failed. Cache pool {} must exist " "before calling this".format(cache_pool)) pool = Pool(service='admin', name=backer_pool) try: pool.add_cache_tier(cache_pool=cache_pool, mode=cache_mode) except CalledProcessError as err: log("Add cache tier failed with message: {}" .format(str(err))) action_fail("create-cache-tier failed. Add cache tier failed with " "message: {}".format(str(err)))
def make_erasure_profile(): name = action_get("name") out = get_erasure_profile(service='admin', name=name) action_set({'message': out})
def rebalance_volume(): """ Start a rebalance volume operation """ vol = action_get("volume") if not vol: action_fail("volume not specified") try: volume.volume_rebalance(vol) except GlusterCmdException as e: action_fail("volume rebalance failed with error: {}".format(e))
def enable_bitrot_scan(): """ Enable bitrot scan """ vol = action_get("volume") if not vol: action_fail("volume not specified") try: volume.volume_enable_bitrot(vol) except GlusterCmdException as e: action_fail("enable bitrot failed with error: {}".format(e))
def disable_bitrot_scan(): """ Disable bitrot scan """ vol = action_get("volume") if not vol: action_fail("volume not specified") try: volume.volume_disable_bitrot(vol) except GlusterCmdException as e: action_fail("enable disable failed with error: {}".format(e))
def pause_bitrot_scan(): """ Pause bitrot scan """ vol = action_get("volume") option = volume.BitrotOption.Scrub(volume.ScrubControl.Pause) try: volume.volume_set_bitrot_option(vol, option) except GlusterCmdException as e: action_fail("pause bitrot scan failed with error: {}".format(e))
def resume_bitrot_scan(): """ Resume bitrot scan """ vol = action_get("volume") option = volume.BitrotOption.Scrub(volume.ScrubControl.Resume) try: volume.volume_set_bitrot_option(vol, option) except GlusterCmdException as e: action_fail("resume bitrot scan failed with error: {}".format(e))
def set_bitrot_throttle(): """ Set how aggressive bitrot scanning should be """ vol = action_get("volume") throttle = action_get("throttle") option = volume.ScrubAggression.from_str(throttle) try: volume.volume_set_bitrot_option(vol, volume.BitrotOption.ScrubThrottle( option)) except GlusterCmdException as e: action_fail( "set bitrot throttle failed with error: {}".format(e))
def disable_volume_quota(): """ Disable quotas on the volume """ vol = action_get("volume") path = action_get("path") quotas_enabled = volume.volume_quotas_enabled(vol) if quotas_enabled.is_err(): action_fail("Disable quota failed: {}".format(quotas_enabled.value)) if quotas_enabled.value: try: volume.volume_remove_quota(vol, path) except GlusterCmdException as e: action_fail("remove quota failed with error: {}".format(e))
def pool_get(): key = action_get("key") pool_name = action_get("pool_name") try: value = check_output(['ceph', 'osd', 'pool', 'get', pool_name, key]) return value except CalledProcessError as e: action_fail(e.message)
def delete_pool_snapshot(): pool_name = action_get("pool-name") snapshot_name = action_get("snapshot-name") remove_pool_snapshot(service='ceph', pool_name=pool_name, snapshot_name=snapshot_name) # Note only one or the other can be set
def make_cache_tier(): backer_pool = action_get("backer-pool") cache_pool = action_get("cache-pool") cache_mode = action_get("cache-mode") # Pre flight checks if not pool_exists('admin', backer_pool): log("Please create {} pool before calling create-cache-tier".format( backer_pool)) action_fail("create-cache-tier failed. Backer pool {} must exist " "before calling this".format(backer_pool)) if not pool_exists('admin', cache_pool): log("Please create {} pool before calling create-cache-tier".format( cache_pool)) action_fail("create-cache-tier failed. Cache pool {} must exist " "before calling this".format(cache_pool)) pool = Pool(service='admin', name=backer_pool) try: pool.add_cache_tier(cache_pool=cache_pool, mode=cache_mode) except CalledProcessError as err: log("Add cache tier failed with message: {}".format( err.message)) action_fail("create-cache-tier failed. Add cache tier failed with " "message: {}".format(err.message))
def test_action_get_with_key(self, check_output): action_data = 'bar' check_output.return_value = json.dumps(action_data).encode('UTF-8') result = hookenv.action_get(key='foo') self.assertEqual(result, 'bar') check_output.assert_called_with(['action-get', 'foo', '--format=json'])
def test_action_get_without_key(self, check_output): check_output.return_value = json.dumps(dict(foo='bar')).encode('UTF-8') result = hookenv.action_get() self.assertEqual(result['foo'], 'bar') check_output.assert_called_with(['action-get', '--format=json'])