我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用charmhelpers.core.hookenv.config()。
def add_device(request, device_path, bucket=None):
    """Prepare ``device_path`` as an OSD and queue its move into ``bucket``.

    :param request: operations request object; a 'move-osd-to-bucket' op is
        appended to ``request.ops`` when a mounted partition whose device
        contains ``device_path`` is found.
    :param device_path: path of the block device to osdize.
    :param bucket: value stored under the 'bucket' key of the queued op.
    :return: the request object that was passed in.
    """
    # Operate on the parameter; the previous code referenced a ``dev`` name
    # that is not defined anywhere in this function.
    ceph.utils.osdize(device_path, hookenv.config('osd-format'),
                      ceph_hooks.get_journal_devices(),
                      hookenv.config('osd-reformat'),
                      hookenv.config('ignore-device-errors'),
                      hookenv.config('osd-encrypt'),
                      hookenv.config('bluestore'))
    # Make it fast!
    if hookenv.config('autotune'):
        ceph.utils.tune_dev(device_path)
    # Build a real list: on Python 3 a filter object is always truthy and
    # cannot be indexed.
    mounts = [disk for disk in psutil.disk_partitions()
              if device_path in disk.device]
    if mounts:
        osd = mounts[0]
        # The OSD id is the text after the last '-' of the final path
        # component of the mountpoint.
        osd_id = osd.mountpoint.split('/')[-1].split('-')[-1]
        request.ops.append({
            'op': 'move-osd-to-bucket',
            'osd': "osd.{}".format(osd_id),
            'bucket': bucket})
    return request
def az_info():
    """Build the availability zone fragment used for CRUSH placement.

    The rack comes from the JUJU_AVAILABILITY_ZONE environment variable and
    the row from the "availability_zone" config option.  A value of
    'default' is renamed in both cases.

    :return: string such as " rack=<zone> row=<zone>", or "" when neither
        source supplies a value.
    """
    row = config("availability_zone")
    rack = os.environ.get('JUJU_AVAILABILITY_ZONE')
    location = ""
    if rack:
        # NOTE(jamespage): avoid conflicting key with root
        #                  of crush hierarchy
        rack = 'default-rack' if rack == 'default' else rack
        location = "{} rack={}".format(location, rack)
    if row:
        # NOTE(jamespage): avoid conflicting key with root
        #                  of crush hierarchy
        row = 'default-row' if row == 'default' else row
        location = "{} row={}".format(location, row)
    if location:
        log("AZ Info: " + location)
    return location
def use_short_objects():
    '''
    Determine whether OSD's should be configured with
    limited object name lengths.

    @return: boolean indicating whether OSD's should be limited
    '''
    if cmp_pkgrevno('ceph', "10.2.0") < 0:
        return False
    # Compare for equality: ('ext4') is a plain string, so ``in`` performed
    # a substring test that also matched values such as '' or 'ext'.
    if config('osd-format') == 'ext4':
        return True
    # 'osd-devices' is a space separated string; iterating it directly
    # walks single characters rather than device paths.
    devices = config('osd-devices') or ''
    for device in devices.split(' '):
        if device and not device.startswith('/dev'):
            # TODO: determine format of directory based
            #       OSD location
            return True
    return False
def prepare_disks_and_activate():
    """Zap pending journal devices and, once bootstrapped, set up the OSDs.

    Journal devices are checked for overlap with the OSD devices, every
    journal not yet in the recorded zapped set is handed to
    ceph.maybe_zap_journal, and the full journal set is then recorded.
    When ceph reports it is bootstrapped, the ceph config is emitted, each
    device is osdized (and tuned if 'autotune' is set) and the OSDs are
    started.
    """
    journal_devs = get_journal_devices()
    check_overlap(journal_devs, set(get_devices()))
    log("got journal devs: {}".format(journal_devs), level=DEBUG)

    # Skip journals that were already recorded as zapped.
    for journal in journal_devs - read_zapped_journals():
        ceph.maybe_zap_journal(journal)
    write_zapped_journals(journal_devs)

    if not ceph.is_bootstrapped():
        return

    log('ceph bootstrapped, rescanning disks')
    emit_cephconf()
    for device in get_devices():
        ceph.osdize(device, config('osd-format'),
                    journal_devs, config('osd-reformat'),
                    config('ignore-device-errors'),
                    config('osd-encrypt'),
                    config('bluestore'))
        # Make it fast!
        if config('autotune'):
            ceph.tune_dev(device)
    ceph.start_osds(get_devices())
def get_devices():
    """Collect the OSD device paths for this unit, minus blacklisted ones.

    Paths come from the space separated 'osd-devices' config option
    (absolute paths only, resolved with os.path.realpath) followed by the
    locations of the 'osd-devices' storage instances.

    :return: list of device paths that are not in the blacklist.
    """
    found = []
    if config('osd-devices'):
        # Make sure its a device which is specified using an
        # absolute path so that the current working directory
        # or any relative path under this directory is not used
        stripped = (entry.strip()
                    for entry in config('osd-devices').split(' '))
        found.extend(os.path.realpath(entry)
                     for entry in stripped if os.path.isabs(entry))

    # List storage instances for the 'osd-devices'
    # store declared for this charm too, and add
    # their block device paths to the list.
    for storage_id in storage_list('osd-devices'):
        found.append(storage_get('location', storage_id))

    # Filter out any devices in the action managed unit-local device blacklist
    blacklisted = get_blacklist()
    return [dev for dev in found if dev not in blacklisted]
def get_mon_hosts():
    """Return the sorted '<address>:6789' entries for this unit and its peers.

    This unit's public address is always included.  Peer addresses are read
    from the 'ceph-public-address' setting of every unit on the 'mon'
    relation and, when the 'no-bootstrap' option is set, on the
    'bootstrap-source' relation as well.

    :return: sorted list of host:port strings.
    """
    hosts = []
    addr = get_public_addr()
    hosts.append('{}:6789'.format(format_ipv6_addr(addr) or addr))

    # Build an independent list: ``+=`` would extend the object returned by
    # relation_ids() in place.
    # NOTE(review): charmhelpers is believed to cache that return value, in
    # which case the in-place extend leaked into later calls - confirm.
    rel_ids = list(relation_ids('mon'))
    if config('no-bootstrap'):
        rel_ids.extend(relation_ids('bootstrap-source'))

    for relid in rel_ids:
        for unit in related_units(relid):
            addr = relation_get('ceph-public-address', unit, relid)
            if addr is not None:
                hosts.append('{}:6789'.format(
                    format_ipv6_addr(addr) or addr))

    return sorted(hosts)
def admin_relation_joined(relid=None):
    """Publish admin key material on the relation once the mons have quorum.

    :param relid: relation id passed through to relation_set.
    """
    if not ceph.is_quorum():
        log('mon cluster not in quorum - deferring key provision')
        return

    keyring = relation_get('keyring-name')
    if keyring is None:
        keyring = 'admin'
    log('mon cluster in quorum - providing client with keys')
    # An explicit 'monitor-hosts' option wins over the discovered hosts.
    mon_hosts = config('monitor-hosts') or ' '.join(get_mon_hosts())
    settings = {
        'key': ceph.get_named_key(name=keyring, caps=ceph.admin_caps),
        'fsid': leader_get('fsid'),
        'auth': config('auth-supported'),
        'mon_hosts': mon_hosts,
    }
    relation_set(relation_id=relid, relation_settings=settings)
def client_relation_joined(relid=None):
    """Publish a per-service key on the relation once the mons have quorum.

    The service name is the part before '/' of the first unit: the remote
    unit when ``relid`` is None, otherwise the first related unit.  Nothing
    is set when no unit is available.

    :param relid: relation id passed through to relation_set.
    """
    if not ceph.is_quorum():
        log('mon cluster not in quorum - deferring key provision')
        return

    log('mon cluster in quorum - providing client with keys')
    if relid is None:
        units = [remote_unit()]
    else:
        units = related_units(relid)
    service_name = units[0].split('/')[0] if len(units) > 0 else None
    if service_name is None:
        return

    public_addr = get_public_addr()
    settings = {
        'key': ceph.get_named_key(service_name),
        'auth': config('auth-supported'),
        'ceph-public-address': public_addr,
    }
    if config('default-rbd-features'):
        settings['rbd-features'] = config('default-rbd-features')
    relation_set(relation_id=relid, relation_settings=settings)
def get_manual_bricks() -> Result:
    """
    Get the list of bricks from the config.yaml

    The "brick_devices" option is treated as a whitespace separated list
    and the resulting paths are handed to scan_devices.

    :return: Result with Ok or Err
    """
    log("Gathering list of manually specified brick devices")
    manual_config_brick_devices = config("brick_devices")
    # split() without a separator discards empty entries produced by
    # repeated or surrounding whitespace; the previous ``is not None`` test
    # could never filter anything out of a str.split() result.
    if manual_config_brick_devices:
        brick_list = manual_config_brick_devices.split()
    else:
        brick_list = []
    log("List of manual storage brick devices: {}".format(brick_list))
    bricks = scan_devices(brick_list)
    if bricks.is_err():
        return Err(bricks.value)
    return Ok(bricks.value)
def render_samba_configuration(f: TextIOBase, volume_name: str) -> int:
    """
    Write the samba configuration file out to disk

    :param f: TextIOBase handle to the samba config file
    :param volume_name: str
    :return: int total of the counts returned by the f.write calls
    """
    # The handle is a text stream, so the share options must be str too;
    # writing a bytes literal to it raises TypeError.
    share_options = (
        "path = /mnt/glusterfs\n"
        "read only = no\n"
        "guest ok = yes\n"
        "kernel share modes = no\n"
        "kernel oplocks = no\n"
        "map archive = no\n"
        "map hidden = no\n"
        "map read only = no\n"
        "map system = no\n"
        "store dos attributes = yes\n"
    )
    bytes_written = f.write("[{}]\n".format(volume_name))
    bytes_written += f.write(share_options)
    return bytes_written
def samba_config_changed() -> bool:
    """
    Checks whether a samba config file has changed or not.

    The volume name is read from the "volume_name" config option and the
    rendered configuration is compared with /etc/samba/smb.conf.

    :return: True when the file is missing or differs, False when identical
    """
    volume_name = config("volume_name")
    samba_path = os.path.join(os.sep, 'etc', 'samba', 'smb.conf')
    if not os.path.exists(samba_path):
        # Config doesn't exist.
        return True
    # Lets check if the smb.conf matches what we're going to write.
    # If so then it was already setup and there's nothing to do
    with open(samba_path) as existing_config:
        old_config = existing_config.read()
    new_config = io.StringIO()
    render_samba_configuration(new_config, volume_name)
    # getvalue() returns everything written; iterating the buffer instead
    # starts at its current (end) position and yields nothing.
    return new_config.getvalue() != old_config
def setup_samba():
    """
    Installs and starts up samba

    The Gluster volume to share is read from the "volume_name" config
    option.  Nothing is done when the "cifs" option is unset or when the
    existing smb.conf is reported as unchanged.
    """
    volume_name = config("volume_name")
    cifs_config = config("cifs")
    if cifs_config is None:
        # Samba isn't enabled
        return
    # samba_config_changed() takes no arguments; it reads the volume name
    # from the config itself.
    if not samba_config_changed():
        # Samba is already set up, so there is nothing to reinstall.
        return
    # NOTE(review): some status_set implementations only accept lowercase
    # state names - confirm "Maintenance" is accepted here.
    status_set("Maintenance", "Installing Samba")
    apt_install(["samba"])
    status_set("Maintenance", "Configuring Samba")
    # Open for writing: the default read-only mode rejects f.write().
    samba_path = os.path.join(os.sep, 'etc', 'samba', 'smb.conf')
    with open(samba_path, 'w') as samba_conf:
        bytes_written = render_samba_configuration(samba_conf, volume_name)
        # Format the count into the message rather than passing it as
        # log()'s second positional argument.
        log("Wrote {} bytes to /etc/samba/smb.conf".format(bytes_written))
    log("Starting Samba service")
    status_set("Maintenance", "Starting Samba")
    service_start("smbd")
    set_state('samba.installed')
def use_short_objects():
    '''
    Determine whether OSD's should be configured with
    limited object name lengths.

    @return: boolean indicating whether OSD's should be limited
    '''
    if cmp_pkgrevno('ceph', "10.2.0") < 0:
        return False
    # Compare for equality: ('ext4') is a plain string, so ``in`` performed
    # a substring test that also matched values such as '' or 'ext'.
    if config('osd-format') == 'ext4':
        return True
    # 'osd-devices' is a space separated string; iterating it directly
    # walks single characters rather than device paths.
    devices = config('osd-devices') or ''
    for device in devices.split(' '):
        # Empty entries (unset option, repeated spaces) are not devices.
        if device and not device.startswith('/dev'):
            # TODO: determine format of directory based
            #       OSD location
            return True
    return False
def get_devices():
    """Collect the OSD device paths configured for this unit.

    Paths come from the space separated 'osd-devices' config option
    (absolute paths only, resolved with os.path.realpath) followed by the
    locations of the 'osd-devices' storage instances.

    :return: list of device paths.
    """
    found = []
    if config('osd-devices'):
        # Make sure its a device which is specified using an
        # absolute path so that the current working directory
        # or any relative path under this directory is not used
        stripped = (entry.strip()
                    for entry in config('osd-devices').split(' '))
        found.extend(os.path.realpath(entry)
                     for entry in stripped if os.path.isabs(entry))

    # List storage instances for the 'osd-devices'
    # store declared for this charm too, and add
    # their block device paths to the list.
    for storage_id in storage_list('osd-devices'):
        found.append(storage_get('location', storage_id))

    return found
def __init__(self, *args):
    """Record the required option names and load the charm's option data.

    :param args: names of the config options that must be set.

    The current config is stored under the 'config' key of this mapping and
    the 'options' section of the charm's config.yaml in ``self.config``.
    """
    self.required_options = args
    self['config'] = hookenv.config()
    with open(os.path.join(hookenv.charm_dir(), 'config.yaml')) as fp:
        # safe_load builds plain Python objects only; yaml.load() without
        # an explicit Loader is deprecated and rejected by newer PyYAML.
        # An empty file parses to None, hence the fallback.
        self.config = (yaml.safe_load(fp) or {}).get('options', {})
def __bool__(self):
    """Return True only when every required option has been customised.

    An option counts as customised when it is present in the current
    config, differs from the 'default' recorded in ``self.config``, and is
    not blank (None or '') while the default is blank as well.
    """
    def _customised(option):
        if option not in self['config']:
            return False
        value = self['config'][option]
        default = self.config[option].get('default')
        unchanged = value == default
        return not (unchanged or
                    (value in (None, '') and default in (None, '')))

    return all(_customised(option) for option in self.required_options)
def unmount_volume(config):
    """Unmount the configured mountpoint if it is currently mounted.

    :param config: mapping with a 'mountpoint' entry.
    :raises VolumeConfigurationError: when host.umount reports failure.
    """
    mountpoint = config['mountpoint']
    # The umount is only attempted for an active mount point.
    if os.path.ismount(mountpoint) and not host.umount(mountpoint,
                                                       persist=True):
        raise VolumeConfigurationError()
def configure_volume(before_change=lambda: None, after_change=lambda: None):
    '''Set up storage (or don't) according to the charm's volume configuration.
       Returns the mount point or "ephemeral". before_change and after_change
       are optional functions to be called if the volume configuration changes.
    '''
    config = get_config()
    if not config:
        hookenv.log('Failed to read volume configuration', hookenv.CRITICAL)
        raise VolumeConfigurationError()

    ephemeral = config['ephemeral']
    mountpoint = config['mountpoint']
    mounted = os.path.ismount(mountpoint)

    if ephemeral:
        # Ephemeral storage must not keep a managed mount around.
        if mounted:
            before_change()
            unmount_volume(config)
            after_change()
        return 'ephemeral'

    # persistent storage
    if not mounted:
        before_change()
        mount_volume(config)
        after_change()
    elif dict(managed_mounts()).get(mountpoint) != config['device']:
        # Mounted, but not from the configured device: remount.
        before_change()
        unmount_volume(config)
        mount_volume(config)
        after_change()
    return mountpoint
def setUp(self):
    """Run the parent setUp with TO_PATCH and back config with test_config."""
    parent = super(HeatUtilsTests, self)
    parent.setUp(utils, TO_PATCH)
    # Config lookups are answered from the test configuration.
    self.config.side_effect = self.test_config.get
def test_openstack_upgrade(self):
    """do_openstack_upgrade runs the apt steps and rewrites the configs."""
    # Replace the side effect installed by setUp with a fixed return value.
    self.config.side_effect = None
    self.config.return_value = 'cloud:precise-havana'
    self.get_os_codename_install_source.return_value = 'havana'
    configs = MagicMock()

    utils.do_openstack_upgrade(configs)

    for apt_step in (self.apt_update, self.apt_upgrade, self.apt_install):
        self.assertTrue(apt_step.called)
    configs.set_release.assert_called_with(openstack_release='havana')
    self.assertTrue(configs.write_all.called)
def __call__(self):
    """Extend the parent context with the ec2 tokens URL and the region.

    :return: the extended context, or None when the parent context is
        empty.
    """
    ctxt = super(HeatIdentityServiceContext, self).__call__()
    if ctxt:
        # the ec2 api needs to know the location of the keystone ec2
        # tokens endpoint, set in nova.conf
        protocol = ctxt['service_protocol'] or 'http'
        ctxt['keystone_ec2_url'] = generate_ec2_tokens(
            protocol, ctxt['service_host'], ctxt['service_port'])
        ctxt['region'] = config('region')
        return ctxt
    return None
def get_encryption_key():
    """Return the configured encryption key, else the leader-stored one.

    The "encryption-key" config option is used when it is set; otherwise
    the value of the 'heat-auth-encryption-key' leader setting is returned.
    """
    return config("encryption-key") or leader_get('heat-auth-encryption-key')
def __call__(self):
    """Return a context holding the 'instance-user' option.

    :return: dict with an 'instance_user' key; the value is '' when the
        option is unset or empty.
    """
    instance_user = config('instance-user') if config('instance-user') else ''
    return {'instance_user': instance_user}
def mount_volume(config):
    """Mount the configured device on the configured mountpoint.

    The mountpoint is created when missing, and an existing mount on it is
    unmounted first.

    :param config: mapping with 'mountpoint' and 'device' entries.
    :raises VolumeConfigurationError: when the mountpoint exists but is not
        a directory, or when host.mount reports failure.
    """
    mountpoint = config['mountpoint']
    if not os.path.exists(mountpoint):
        host.mkdir(mountpoint)
    elif not os.path.isdir(mountpoint):
        hookenv.log('Not a directory: {}'.format(mountpoint))
        raise VolumeConfigurationError()

    if os.path.ismount(mountpoint):
        unmount_volume(config)

    mounted = host.mount(config['device'], mountpoint, persist=True)
    if not mounted:
        raise VolumeConfigurationError()