The following 50 code examples, extracted from open source Python projects, show how to use charmhelpers.core.hookenv.ERROR.
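hookenv.ERROR is the "ERROR" log-level constant defined in charmhelpers.core.hookenv; it is passed as the level argument to hookenv.log(), which forwards the message to juju-log. Below is a minimal sketch of the pattern most of these examples share; the service name and helper function are hypothetical placeholders, not part of any of the projects quoted here.

import subprocess

from charmhelpers.core import hookenv


def restart_service_or_log():
    # Hypothetical helper: try to restart a placeholder service and report
    # any failure to the Juju log at ERROR severity.
    try:
        subprocess.check_output(["systemctl", "restart", "my-service"],
                                stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # hookenv.ERROR is simply the string "ERROR"; juju-log records it
        # as the message's severity level.
        hookenv.log("restart failed: {}".format(e.stderr),
                    level=hookenv.ERROR)
        raise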
def mount_device(device: Device, mount_point: str) -> Result:
    """
    mount a device at a mount point
    :param device: Device.
    :param mount_point: str. Place to mount to.
    :return: Result with Ok or Err
    """
    arg_list = []
    if device.id:
        arg_list.append("-U")
        arg_list.append(str(device.id))
    else:
        arg_list.append("/dev/{}".format(device.name))
    arg_list.append(mount_point)

    cmd = ["mount"]
    cmd.extend(arg_list)
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.PIPE)
        return Ok(output.decode('utf-8'))
    except subprocess.CalledProcessError as e:
        log("subprocess failed stdout: {} stderr: {} returncode: {}".format(
            e.stdout, e.stderr, e.returncode), ERROR)
        return Err(e.output)
def quota_list(vol_name: str) -> Result:
    """
    Return a list of quotas on the volume, if any
    :param vol_name: String. The volume to operate on.
    :return: List of Quotas on the volume
    :raises: GlusterError if the command fails to run
    """
    cmd = ["gluster", "volume", "quota", vol_name, "list", "--xml"]
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.PIPE)
        quota_list_result = parse_quota_list(output.decode('utf-8'))
        return quota_list_result
    except subprocess.CalledProcessError as e:
        log("subprocess failed stdout: {} stderr: {} returncode: {}".format(
            e.stdout, e.stderr, e.returncode), ERROR)
        return Err("Volume quota list command failed with error: {}".format(
            e.stderr))
def configure_designate_full(*args):
    """Write out all designate config, including bootstrap domain info"""
    # If the cluster relation is available, it needs to be passed in
    cluster = reactive.RelationBase.from_state('cluster.available')
    if cluster is not None:
        args = args + (cluster, )
    dns_backend = reactive.RelationBase.from_state('dns-backend.available')
    if dns_backend is not None:
        args = args + (dns_backend, )
    with provide_charm_instance() as instance:
        instance.upgrade_if_available(args)
        instance.configure_ssl()
        instance.render_full_config(args)
        try:
            # the following function should only run once for the leader.
            instance.create_initial_servers_and_domains()
            _render_sink_configs(instance, args)
            instance.render_rndc_keys()
            instance.update_pools()
        except subprocess.CalledProcessError as e:
            hookenv.log("ensure_api_responding() errored out: {}"
                        .format(str(e)), level=hookenv.ERROR)
def vol_set(vol_name: str, options: Dict[str, str]) -> None:
    """
    :param vol_name: String. Volume name to set the options on
    :param options: Dict of option name -> value to set on the volume
    :return: None. Raises GlusterCmdException if the command fails
    """
    try:
        volume.optset(volname=vol_name, opts=options)
    except GlusterCmdException as e:
        log("volume.optset failed: {}".format(e), ERROR)
        raise
def fuse_relation_joined():
    """Fuse clients only need one IP address; they can discover the rest."""
    public_addr = unit_public_ip()
    volumes = volume_list()
    if volumes.is_err():
        log("volume list is empty. Unable to complete fuse relation", ERROR)
        return
    data = {"gluster-public-address": public_addr,
            "volumes": " ".join(volumes.value)}
    relation_set(relation_settings=data)
def add_node(self, host, executors, labels=()):
    """Add a slave node with the given host name."""
    self.wait()
    client = self._make_client()

    @retry_on_exception(3, 3, exc_type=RETRIABLE)
    def _add_node():
        if client.node_exists(host):
            hookenv.log("Node exists - not adding")
            return

        hookenv.log("Adding node '%s' to Jenkins master" % host)

        # See the "Launch slave agent headlessly" section of the Jenkins
        # wiki page about distributed builds:
        #
        # https://wiki.jenkins-ci.org/display/JENKINS/Distributed+builds
        launcher = jenkins.LAUNCHER_JNLP

        client.create_node(
            host, int(executors), host, labels=labels, launcher=launcher)

        if not client.node_exists(host):
            hookenv.log(
                "Failed to create node '%s'" % host, level=ERROR)

    return _add_node()
def render_rndc_keys(self):
    """Render the rndc keys supplied via user config

    @returns None
    """
    slaves = hookenv.config('dns-slaves') or ''
    try:
        for entry in slaves.split():
            address, port, key = entry.split(':')
            unit_name = address.replace('.', '_')
            self.write_key_file(unit_name, key)
    except ValueError as e:
        hookenv.log("Problem with 'dns-slaves' config: {}"
                    .format(str(e)), level=hookenv.ERROR)
def test_logs_messages_with_alternative_levels(self, mock_call):
    alternative_levels = [
        hookenv.CRITICAL,
        hookenv.ERROR,
        hookenv.WARNING,
        hookenv.INFO,
    ]
    for level in alternative_levels:
        hookenv.log('foo', level)
        mock_call.assert_called_with(['juju-log', '-l', level, 'foo'])
def render(source, target, context, owner='root', group='root',
           perms=0o444, templates_dir=None, encoding='UTF-8',
           template_loader=None):
    """
    Render a template.

    The `source` path, if not absolute, is relative to the `templates_dir`.

    The `target` path should be absolute. It can also be `None`, in which
    case no file will be written.

    The context should be a dict containing the values to be replaced in
    the template.

    The `owner`, `group`, and `perms` options will be passed to `write_file`.

    If omitted, `templates_dir` defaults to the `templates` folder in the
    charm.

    The rendered template will be written to the file as well as being
    returned as a string.

    Note: Using this requires python-jinja2; if it is not installed, calling
    this will attempt to use charmhelpers.fetch.apt_install to install it.
    """
    try:
        from jinja2 import FileSystemLoader, Environment, exceptions
    except ImportError:
        try:
            from charmhelpers.fetch import apt_install
        except ImportError:
            hookenv.log('Could not import jinja2, and could not import '
                        'charmhelpers.fetch to install it',
                        level=hookenv.ERROR)
            raise
        apt_install('python-jinja2', fatal=True)
        from jinja2 import FileSystemLoader, Environment, exceptions

    if template_loader:
        template_env = Environment(loader=template_loader)
    else:
        if templates_dir is None:
            templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
        template_env = Environment(loader=FileSystemLoader(templates_dir))

    try:
        template = template_env.get_template(source)
    except exceptions.TemplateNotFound as e:
        hookenv.log('Could not load template %s from %s.' %
                    (source, templates_dir),
                    level=hookenv.ERROR)
        raise e

    content = template.render(context)
    if target is not None:
        target_dir = os.path.dirname(target)
        if not os.path.exists(target_dir):
            # This is a terrible default directory permission, as the file
            # or its siblings will often contain secrets.
            host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
        host.write_file(target, content.encode(encoding), owner, group, perms)
    return content
def get_config():
    '''Gather and sanity-check volume configuration data'''
    volume_config = {}
    config = hookenv.config()

    errors = False

    if config.get('volume-ephemeral') in (True, 'True', 'true', 'Yes', 'yes'):
        volume_config['ephemeral'] = True
    else:
        volume_config['ephemeral'] = False

    try:
        volume_map = yaml.safe_load(config.get('volume-map', '{}'))
    except yaml.YAMLError as e:
        hookenv.log("Error parsing YAML volume-map: {}".format(e),
                    hookenv.ERROR)
        errors = True
        # fall back to an empty map so the checks below can still run
        volume_map = {}

    if volume_map is None:
        # probably an empty string
        volume_map = {}
    elif not isinstance(volume_map, dict):
        hookenv.log("Volume-map should be a dictionary, not {}".format(
            type(volume_map)))
        errors = True

    volume_config['device'] = volume_map.get(os.environ['JUJU_UNIT_NAME'])
    if volume_config['device'] and volume_config['ephemeral']:
        # asked for ephemeral storage but also defined a volume ID
        hookenv.log('A volume is defined for this unit, but ephemeral '
                    'storage was requested', hookenv.ERROR)
        errors = True
    elif not volume_config['device'] and not volume_config['ephemeral']:
        # asked for permanent storage but did not define volume ID
        hookenv.log('Ephemeral storage was requested, but there is no volume '
                    'defined for this unit.', hookenv.ERROR)
        errors = True

    unit_mount_name = hookenv.local_unit().replace('/', '-')
    volume_config['mountpoint'] = os.path.join(MOUNT_BASE, unit_mount_name)

    if errors:
        return None
    return volume_config
def check_for_upgrade() -> Result:
    """
    If the config has changed, this will initiate a rolling upgrade
    :return:
    """
    config = hookenv.config()
    if not config.changed("source"):
        # No upgrade requested
        log("No upgrade requested")
        return Ok(())

    log("Getting current_version")
    current_version = get_glusterfs_version()

    log("Adding new source line")
    source = config["source"]
    if not source:
        # No upgrade requested
        log("Source not set. Cannot continue with upgrade")
        return Ok(())
    add_source(source)

    log("Calling apt update")
    apt_update()

    log("Getting proposed_version")
    apt_pkg.init_system()
    proposed_version = get_candidate_package_version("glusterfs-server")
    if proposed_version.is_err():
        return Err(proposed_version.value)
    version_compare = apt_pkg.version_compare(a=proposed_version.value,
                                              b=current_version)

    # Using semantic versioning: if the new version is greater than the
    # current one, we allow the upgrade
    if version_compare > 0:
        log("current_version: {}".format(current_version))
        log("new_version: {}".format(proposed_version.value))
        log("{} to {} is a valid upgrade path. Proceeding.".format(
            current_version, proposed_version.value))
        return roll_cluster(proposed_version.value)
    else:
        # Log a helpful error message
        log("Invalid upgrade path from {} to {}. The new version needs "
            "to be greater than the old version".format(
                current_version, proposed_version.value), ERROR)
        return Ok(())