我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 charmhelpers.core.host.mkdir()。
def test_creates_a_directory_if_it_doesnt_exist(self, os_, log,
                                                getgrnam, getpwnam):
    """mkdir() creates a missing directory and chowns it to owner/group.

    The patched ``os`` reports that the resolved path does not exist, so
    ``host.mkdir`` is expected to look up the user and group, create the
    directory with the requested permissions and change its ownership.
    """
    link_path = '/some/other/path/from/link'
    resolved_path = '/some/path'
    mode = 0o644
    user_id, group_id = 123, 234

    # Wire up the mocked lookups and filesystem answers.
    getpwnam.return_value.pw_uid = user_id
    getgrnam.return_value.gr_gid = group_id
    os_.path.abspath.return_value = resolved_path
    os_.path.exists.return_value = False

    host.mkdir(link_path, owner='some-user', group='some-group',
               perms=mode)

    # Owner and group names are resolved to numeric ids.
    getpwnam.assert_called_with('some-user')
    getgrnam.assert_called_with('some-group')
    # All filesystem work happens on the path returned by abspath().
    os_.path.abspath.assert_called_with(link_path)
    os_.path.exists.assert_called_with(resolved_path)
    os_.makedirs.assert_called_with(resolved_path, mode)
    os_.chown.assert_called_with(resolved_path, user_id, group_id)
def test_creates_a_directory_with_defaults(self, os_, log):
    """mkdir() with only a path uses uid 0, gid 0 and mode 0o555."""
    link_path = '/some/other/path/from/link'
    resolved_path = '/some/path'
    default_mode = 0o555
    root_uid = root_gid = 0

    # The patched os reports that the resolved path does not exist yet.
    os_.path.abspath.return_value = resolved_path
    os_.path.exists.return_value = False

    host.mkdir(link_path)

    os_.path.abspath.assert_called_with(link_path)
    os_.path.exists.assert_called_with(resolved_path)
    os_.makedirs.assert_called_with(resolved_path, default_mode)
    os_.chown.assert_called_with(resolved_path, root_uid, root_gid)
def mount_volume(config):
    """Mount ``config['device']`` on ``config['mountpoint']``.

    The mountpoint is created when it is missing. Anything already
    mounted there is unmounted first, and the new mount is requested
    with ``persist=True``.

    :param config: mapping with at least ``'mountpoint'`` and ``'device'``.
    :raises VolumeConfigurationError: if the mountpoint exists but is not
        a directory, or if ``host.mount`` reports failure.
    """
    mountpoint = config['mountpoint']

    if not os.path.exists(mountpoint):
        host.mkdir(mountpoint)
    elif not os.path.isdir(mountpoint):
        hookenv.log('Not a directory: {}'.format(mountpoint))
        raise VolumeConfigurationError()

    # Clear any existing mount before mounting the configured device.
    if os.path.ismount(mountpoint):
        unmount_volume(config)

    mounted = host.mount(config['device'], mountpoint, persist=True)
    if not mounted:
        raise VolumeConfigurationError()
def configure_ceph(ceph):
    """Create the keyring directory and ensure the ceph keyring exists.

    :param ceph: object exposing ``key()``, whose result is passed to
        ``ceph_helper.ensure_ceph_keyring``.
    """
    with charm.provide_charm_instance() as charm_class:
        keyring_path = charm_class.ceph_keyring
        # TODO(jamespage): refactor to avoid massaging helper
        ceph_helper.KEYRING = keyring_path
        host.mkdir(os.path.dirname(keyring_path))
        ceph_helper.ensure_ceph_keyring(
            service=hookenv.service_name(),
            key=ceph.key(),
            user=charm_class.gnocchi_user,
            group=charm_class.gnocchi_group,
        )
def install(self, plugins):
    """Install the given plugins, optionally removing unlisted ones.

    Jenkins is stopped while plugins are installed and is started again
    afterwards, including when the installation raises.

    @params plugins: A whitespace-separated list of plugins to install.
    """
    plugins = (plugins or "").split()

    hookenv.log("Stopping jenkins for plugin update(s)")
    host.service_stop("jenkins")

    hookenv.log("Installing plugins (%s)" % " ".join(plugins))
    host.mkdir(
        paths.PLUGINS, owner="jenkins", group="jenkins", perms=0o0755)
    # Snapshot what is on disk before installing, so that plugins not
    # covered by this run can be identified afterwards.
    existing_plugins = set(glob.glob("%s/*.hpi" % paths.PLUGINS))

    try:
        installed_plugins = self._install_plugins(plugins)
    except BaseException:
        hookenv.log("Plugin installation failed, check logs for details")
        # Make sure we don't leave jenkins down
        host.service_start("jenkins")
        raise

    unlisted_plugins = existing_plugins - installed_plugins
    # The charm config is only consulted when there is something unlisted.
    if (unlisted_plugins and
            hookenv.config()["remove-unlisted-plugins"] == "yes"):
        self._remove_plugins(unlisted_plugins)
    elif unlisted_plugins:
        hookenv.log(
            "Unlisted plugins: (%s) Not removed. Set "
            "remove-unlisted-plugins to 'yes' to clear them "
            "away." % ", ".join(unlisted_plugins))

    hookenv.log("Starting jenkins to pickup configuration changes")
    host.service_start("jenkins")
def configure_cert(self, cert, key, cn=None):
    """Configure service SSL cert and key

    Write out the service SSL certificate and key. They go into the
    snap's nginx ssl directory when a snap install is requested, and
    into the per-service Apache ssl directory otherwise.

    @param cert string SSL Certificate
    @param key string SSL Key
    @param cn string Canonical name for service
    """
    if os_utils.snap_install_requested():
        ssl_dir = '/var/snap/{snap_name}/common/etc/nginx/ssl'.format(
            snap_name=self.primary_snap)
    else:
        ssl_dir = os.path.join('/etc/apache2/ssl/', self.name)

    # Fall back to the resolved internal address when no cn was supplied.
    if not cn:
        cn = os_ip.resolve_address(endpoint_type=os_ip.INTERNAL)

    ch_host.mkdir(path=ssl_dir)

    # File names carry the cn as a suffix when one is available.
    if cn:
        file_names = ('cert_{}'.format(cn), 'key_{}'.format(cn))
    else:
        file_names = ('cert', 'key')

    for file_name, material in zip(file_names, (cert, key)):
        ch_host.write_file(path=os.path.join(ssl_dir, file_name),
                           content=material.encode('utf-8'))
def install_snap_certs(self):
    """Install systems CA certificates for a snap

    Copies the aggregated host system ca-certificates.crt into
    $SNAP_COMMON/etc/ssl/certs for services running within a sandboxed
    snap to consume. Nothing is done unless a snap install was requested
    and the system CA bundle exists.

    Snaps should set the REQUESTS_CA_BUNDLE environment variable to
    ensure requests based API calls use the updated system certs.
    """
    if not os_utils.snap_install_requested():
        return
    if not os.path.exists(SYSTEM_CA_CERTS):
        return

    snap_bundle = SNAP_CA_CERTS.format(self.primary_snap)
    ch_host.mkdir(os.path.dirname(snap_bundle))
    shutil.copyfile(SYSTEM_CA_CERTS, snap_bundle)
def createFolders():
    """Create the directories used by the DC/OS installation.

    Sets a 'maintenance' status and then calls ``mkdir`` for each
    directory in a fixed order.
    """
    status_set('maintenance', 'Creating DC/OS Folders')

    setup_id = 'setup_b3e41695178e35239659186b92f25820c610f961'
    folders = (
        configdir,
        configdir + 'roles',
        basedir,
        configdir + 'setup-flags',
        basedir + 'packages/dcos-config--' + setup_id,
        basedir + 'packages/dcos-metadata--' + setup_id,
        '/etc/profile.d',
        '/etc/systemd/journald.conf.d',
    )
    for folder in folders:
        mkdir(folder)
def createSymlinks():
    """Create the symlinks needed by the DC/OS start scripts.

    Sets a 'maintenance' status and links ``/usr/bin/mkdir`` to
    ``/bin/mkdir``. The link is only created when nothing exists at
    ``/usr/bin/mkdir`` yet, so calling this function again (or running
    on a host that already provides that path) does not raise.
    """
    status_set('maintenance', 'Creating DC/OS symlinks')

    # Hack to make start scripts work
    link_path = "/usr/bin/mkdir"
    # os.symlink() raises if the destination already exists; lexists()
    # also reports broken symlinks, which would equally block creation.
    if not os.path.lexists(link_path):
        os.symlink("/bin/mkdir", link_path)
def test_removes_file_with_same_path_before_mkdir(self, os_, log,
                                                  getgrnam, getpwnam):
    """mkdir(force=True) unlinks an existing non-directory first.

    The patched ``os`` reports that the resolved path exists but is not
    a directory; with ``force=True`` the path is expected to be unlinked
    before the directory is created and chowned.
    """
    link_path = '/some/other/path/from/link'
    resolved_path = '/some/path'
    mode = 0o644
    user_id, group_id = 123, 234

    # Wire up the mocked lookups and filesystem answers.
    getpwnam.return_value.pw_uid = user_id
    getgrnam.return_value.gr_gid = group_id
    os_.path.abspath.return_value = resolved_path
    os_.path.exists.return_value = True
    os_.path.isdir.return_value = False

    host.mkdir(link_path, owner='some-user', group='some-group',
               perms=mode, force=True)

    getpwnam.assert_called_with('some-user')
    getgrnam.assert_called_with('some-group')
    os_.path.abspath.assert_called_with(link_path)
    os_.path.exists.assert_called_with(resolved_path)
    # The conflicting file is removed, then the directory is created.
    os_.unlink.assert_called_with(resolved_path)
    os_.makedirs.assert_called_with(resolved_path, mode)
    os_.chown.assert_called_with(resolved_path, user_id, group_id)
def render(source, target, context, owner='root', group='root',
           perms=0o444, templates_dir=None, encoding='UTF-8',
           template_loader=None):
    """
    Render a template.

    The `source` path, if not absolute, is relative to the `templates_dir`.

    The `target` path should be absolute.  It can also be `None`, in which
    case no file will be written.

    The context should be a dict containing the values to be replaced in the
    template.

    The `owner`, `group`, and `perms` options will be passed to `write_file`.

    If omitted, `templates_dir` defaults to the `templates` folder in the
    charm.

    If `template_loader` is given it is used as the jinja2 loader and
    `templates_dir` is not used to build one.

    The rendered template will be written to the file as well as being
    returned as a string.

    Note: Using this requires jinja2; if it is not installed, calling this
    will attempt to use charmhelpers.fetch.apt_install to install the
    package matching the running interpreter (python-jinja2 on Python 2,
    python3-jinja2 on Python 3).
    """
    try:
        from jinja2 import FileSystemLoader, Environment, exceptions
    except ImportError:
        try:
            from charmhelpers.fetch import apt_install
        except ImportError:
            hookenv.log('Could not import jinja2, and could not import '
                        'charmhelpers.fetch to install it',
                        level=hookenv.ERROR)
            raise
        # Install the package for the interpreter that is running:
        # python-jinja2 only provides the module for Python 2, so on
        # Python 3 the import below would fail again.
        import sys
        if sys.version_info[0] == 2:
            apt_install('python-jinja2', fatal=True)
        else:
            apt_install('python3-jinja2', fatal=True)
        from jinja2 import FileSystemLoader, Environment, exceptions

    if template_loader:
        template_env = Environment(loader=template_loader)
    else:
        if templates_dir is None:
            templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
        template_env = Environment(loader=FileSystemLoader(templates_dir))

    try:
        template = template_env.get_template(source)
    except exceptions.TemplateNotFound:
        hookenv.log('Could not load template %s from %s.' %
                    (source, templates_dir),
                    level=hookenv.ERROR)
        # Bare raise keeps the original traceback intact.
        raise

    content = template.render(context)
    if target is not None:
        target_dir = os.path.dirname(target)
        if not os.path.exists(target_dir):
            # This is a terrible default directory permission, as the file
            # or its siblings will often contain secrets.
            host.mkdir(target_dir, owner, group, perms=0o755)
        host.write_file(target, content.encode(encoding), owner, group,
                        perms)
    return content