我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用fabric.api.task()。
def chef_query(query, api=None, hostname_attr=DEFAULT_HOSTNAME_ATTR, environment=_default_environment): """A decorator to use an arbitrary Chef search query to find nodes to execute on. This is used like Fabric's ``roles()`` decorator, but accepts a Chef search query. Example:: from chef.fabric import chef_query @chef_query('roles:web AND tags:active') @task def deploy(): pass .. versionadded:: 0.2.1 """ api = _api(api) if api.version_parsed < Environment.api_version_parsed and environment is not None: raise ChefAPIVersionError('Environment support requires Chef API 0.10 or greater') rolename = 'query_'+query env.setdefault('roledefs', {})[rolename] = Roledef(query, api, hostname_attr, environment) return lambda fn: roles(rolename)(fn)
def chef_tags(*tags, **kwargs): """A decorator to use Chef node tags to find nodes to execute on. This is used like Fabric's ``roles()`` decorator, but accepts a list of tags. Example:: from chef.fabric import chef_tags @chef_tags('active', 'migrator') @task def migrate(): pass .. versionadded:: 0.2.1 """ # Allow passing a single iterable if len(tags) == 1 and not isinstance(tags[0], six.string_types): tags = tags[0] query = ' AND '.join('tags:%s'%tag.strip() for tag in tags) return chef_query(query, **kwargs)
def set_host(host_index): """A helper task to change env.hosts from the command line. It will only "stick" for the duration of the fab command that called it. Args: host_index (int): 0, 1, 2, 3, etc. Example: fab set_host:4 fab_task_A fab_task_B will set env.hosts = [public_dns_names[4]] but only for doing fab_task_A and fab_task_B """ env.hosts = [public_hosts[int(host_index)]] env.password = [public_pwds[int(host_index)]] # Install base software
def echo_output(func): """if the wrapped function is the sole task being run, then it's output is printed to stdout. this wrapper first attempts to pass the keyword 'verbose' to the function and, if it fails, calls it without. """ @wraps(func) def _wrapper(*args, **kwargs): if _sole_task(func.__name__): res = func(*args, **kwargs) errcho('output:') # printing to stderr avoids corrupting structured data if isinstance(res, str) or isinstance(res, unicode): print res else: print pformat(remove_ordereddict(res)) return res return func(*args, **kwargs) return _wrapper # avoid circular dependencies. # TODO: this is a design smell. refactor.
def define_preset_tasks(module, config): ''' Define tasks for the configured deployment preset. ''' deployment = deployer.import_preset(config) # Now that we have deployment preset set, import all the tasks. for (task_name, func) in deployment.__dict__.iteritems(): if not _is_task(func): continue # Set a new task named as the stage name in the main fabfile module. setattr(module, task_name, func)
def define_stage_tasks(module, config): ''' Define tasks for the stages dynamically. ''' for (stage_name, _) in config['stages'].iteritems(): task_func = task(name=stage_name)(configure_env) task_func.__doc__ = 'Configures the {} server environment.'.format( stage_name) # Set a new task named as the stage name in the main fabfile module. setattr(module, stage_name, task_func)
def _uncrawl(task, _cache={}): if not _cache: def _fill_cache(mapping=state.commands, keys=[]): for key, value in mapping.items(): _keys = keys + [key] if isinstance(value, dict): _fill_cache(value, _keys) else: _cache[value] = '.'.join(_keys) _fill_cache() return _cache.get(task)
def execute(*args, **kwargs): try: task, args = args[0], args[1:] except IndexError: raise TypeError('must provide task to execute') default_name = '{command}.{task_name}({id})'.format( command=fab.env.command, task_name=getattr(task, 'name', task.__name__), id=id(task), ) with utils.patch(task, 'name', _uncrawl(task) or default_name): return fab.execute(task, *args, **kwargs)
def skip_unknown_host(func): @functools.wraps(func) def _task(*args, **kwargs): if fab.env.get('host_string', False): return func(*args, **kwargs) fabricio.log( "'{func}' execution was skipped due to no host provided " "(command: {command})".format( func=func.__name__, command=fab.env.command, ) ) _task.wrapped = func # compatibility with 'fab --display <task>' option return _task
def __new__(cls, *args, **kwargs): self = super(Tasks, cls).__new__(cls) for attr in dir(cls): attr_value = getattr(cls, attr) if is_task_object(attr_value): task_decorator = fab.task( default=attr_value.is_default, name=attr_value.name, aliases=attr_value.aliases, task_class=attr_value.__class__, ) bounded_task = functools.partial(attr_value.wrapped, self) task = task_decorator(functools.wraps(attr_value)(bounded_task)) for wrapped_attr in [ 'parallel', 'serial', 'pool_size', 'hosts', 'roles', ]: if hasattr(attr_value.wrapped, wrapped_attr): setattr( task.wrapped, wrapped_attr, getattr(attr_value.wrapped, wrapped_attr), ) setattr(self, attr, task) return self
def __init__(self, roles=(), hosts=(), create_default_roles=True): if create_default_roles: for role in roles: fab.env.roledefs.setdefault(role, []) for task in self: if not hasattr(task, 'roles'): task.roles = roles if not hasattr(task, 'hosts'): task.hosts = hosts
def default(self, *args, **kwargs): """ select {name} infrastructure to run task(s) on """ if not console.confirm( 'Are you sure you want to select {name} ' 'infrastructure to run task(s) on?'.format( name=self.color(self.name), ), default=False, ): fab.abort('Aborted') self.confirm(*args, **kwargs)
def test_skip_unknown_host(self): mocked_task = mock.Mock() @fabricio.tasks.skip_unknown_host def task(): mocked_task() with fab.settings(fab.hide('everything')): fab.execute(task) mocked_task.assert_not_called() fab.execute(task, host='host') mocked_task.assert_called_once()
def test_infrastructure(self): class AbortException(Exception): pass def task(): pass cases = dict( default=dict( decorator=tasks.infrastructure, expected_infrastructure='task', ), invoked=dict( decorator=tasks.infrastructure(), expected_infrastructure='task', ), with_custom_name=dict( decorator=tasks.infrastructure(name='infrastructure'), expected_infrastructure='infrastructure', ), ) with fab.settings(abort_on_prompts=True, abort_exception=AbortException): with mock.patch.object(fab, 'abort', side_effect=AbortException): for case, data in cases.items(): with self.subTest(case=case): decorator = data['decorator'] infrastructure = decorator(task) self.assertTrue(is_task_object(infrastructure.confirm)) self.assertTrue(is_task_object(infrastructure.default)) fab.execute(infrastructure.confirm) self.assertEqual(data['expected_infrastructure'], fab.env.infrastructure) fab.env.infrastructure = None with mock.patch.object(console, 'confirm', side_effect=[True, False]): fab.execute(infrastructure.default) self.assertEqual(data['expected_infrastructure'], fab.env.infrastructure) with self.assertRaises(AbortException): fab.execute(infrastructure.default)
def chef_environment(name, api=None): """A Fabric task to set the current Chef environment context. This task works alongside :func:`~chef.fabric.chef_roledefs` to set the Chef environment to be used in future role queries. Example:: from chef.fabric import chef_environment, chef_roledefs env.roledefs = chef_roledefs() .. code-block:: bash $ fab env:production deploy The task can be configured slightly via Fabric ``env`` values. ``env.chef_environment_task_alias`` sets the task alias, defaulting to "env". This value must be set **before** :mod:`chef.fabric` is imported. ``env.chef_environment_validate`` sets if :class:`~chef.Environment` names should be validated before use. Defaults to True. .. versionadded:: 0.2 """ if env.get('chef_environment_validate', True): api = _api(api) chef_env = Environment(name, api=api) if not chef_env.exists: raise ChefError('Unknown Chef environment: %s' % name) env['chef_environment'] = name
def reindex_rethrottle(task_id=None, **kwargs): """ Change the value of requests_per_second of a running reindex task. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html """ res = request("reindex_rethrottle", None, task_id=task_id, **kwargs) jsonprint(res) return res
def _run_task(task, args=None, kwargs=None): from fabfile import targets if targets: for target in targets: env = _build_env(target) if env: with fab_settings(** env): _run_task_core(task, args, kwargs) else: _run_task_core(task, args, kwargs)
def _run_task_core(task, args, kwargs): if task: if args: task(* args) elif kwargs: task(** kwargs) else: task()
def send_client_confile(confile): put(confile, 'tempfile') run('mv tempfile ~/.bigchaindb') print('For this node, bigchaindb show-config says:') run('simplechaindb show-config') # Initialize BigchainDB # i.e. create the database, the tables, # the indexes, and the genesis block. # (The @hosts decorator is used to make this # task run on only one node. See http://tinyurl.com/h9qqf3t )
def INIT(): '''Init repo `fabsetup_custom` with custom tasks and config.''' # decorator @needs_repo_fabsetup_custom makes the job print(green('Initialization finished\n')) print('List available tasks: ' + blue('fab -l')) print('Show details of a task: `fab -d <task>`, eg.: ' + blue('fab -d setup_webserver'))
def setup_desktop(): '''Run setup tasks to set up a nicely configured desktop pc.''' # decorator @needs_repo_fabsetup_custom makes the job print('Init completed. Now run this task again.') # Next time, the task setup_desktop from # fabsetup_custom/fabfile_/__init__.py will be executed # instead
def setup_webserver(): '''Run setup tasks to set up a nicely configured webserver.''' # decorator @needs_repo_fabsetup_custom makes the job print('Init completed. Now run this task again.') # Next time, the task setup_webserver from # fabsetup_custom/fabfile_/__init__.py will be executed # instead
def _local_needs_pythons(*args, **kwargs): with warn_only(): res = local(*args, **kwargs) print(res) if res.return_code == 127: print(cyan('missing python version(s), ' 'run fabric task `pythons`:\n\n ' 'fab pythons\n')) sys.exit(1)
def test(args='', py=None): '''Run unit tests. Keyword-Args: args: Optional arguments passed to pytest py: python version to run the tests against Example: fab test:args=-s,py=py27 ''' basedir = dirname(__file__) if py is None: # e.g. envlist: 'envlist = py26,py27,py33,py34,py35,py36' envlist = local(flo('cd {basedir} && grep envlist tox.ini'), capture=True) _, py = envlist.rsplit(',', 1) with warn_only(): res = local(flo('cd {basedir} && ' "PYTHONPATH='.' .tox/{py}/bin/python -m pytest {args}")) print(res) if res.return_code == 127: print(cyan('missing tox virtualenv, ' 'run fabric task `tox`:\n\n ' 'fab tox\n')) sys.exit(1)
def download_box(pname): "just download the vagrant .box for given project" boxurl = box_url(pname) if not boxurl.startswith('s3://'): print 'this task only downloads from s3. unhandled url', boxurl exit(1) dest = join('/tmp', os.path.basename(boxurl)) if os.path.exists(dest): print 'file %s already exists, skipping download. move or rename the file to re-download' % dest return dest cmd = "aws s3 cp %s %s" % (boxurl, dest) assert local(cmd).return_code == 0, "failed to successfully download %s" % boxurl return dest
def install_box(pname): """alternative to 'vagrant add boxname'. downloads and installs a vagrant box. macs appear to have a problem maintaining a connection to S3, so this task downloads it for Vagrant and then adds it from the filesystem""" if box_installed(pname): print 'the .box file for %r has already been installed (%s)' % (pname, prj(pname, 'vagrant.box')) return dest = download_box(pname) with settings(warn_only=True): cmd = "vagrant box add %s %s" % (prj(pname, 'vagrant.box'), dest) retval = local(cmd).return_code if retval == 0 and os.path.exists(dest): print 'removing downloaded file ...' local('rm -i %s' % dest)
def rtask(*roles): "role task. this task is only available for certain roles specified in the BLDR_ROLE env var" def wrapper(func): role_env = os.getenv('BLDR_ROLE') if role_env in roles: # a role has been set return task(func) return func return wrapper # pylint: disable=invalid-name
def stop_if_running_for(stackname, minimum_minutes='30'): """If a EC2 node has been running for a time greater than minimum_minutes, stop it. The assumption is that stacks where this command is used are not needed for long parts of the day/week, and that who needs them will call the start task first.""" return lifecycle.stop_if_running_for(stackname, int(minimum_minutes))
def with_defaults(func): """A decorator that sets all defaults for a task.""" @functools.wraps(func) def decorated(*args, **kwargs): env.setdefault('use_sudo', True) env.setdefault('git_branch', 'master') env.setdefault('python_bin', 'python') env.setdefault('remote_owner', 'www-data') env.setdefault('remote_group', 'www-data') env.setdefault('pip_install_command', 'pip install -r requirements.txt') env.setdefault('domain_path', "%(base_dir)s/%(app_name)s" % {'base_dir': env.base_dir, 'app_name': env.app_name}) env.setdefault('current_path', "%(domain_path)s/current" % {'domain_path': env.domain_path}) env.setdefault('releases_path', "%(domain_path)s/releases" % {'domain_path': env.domain_path}) env.setdefault('shared_path', "%(domain_path)s/shared" % {'domain_path': env.domain_path}) if not 'releases' not in env: if dir_exists(env.releases_path): env.releases = sorted(run('ls -x %(releases_path)s' % {'releases_path': env.releases_path}).split()) if len(env.releases) >= 1: env.current_revision = env.releases[-1] env.current_release = "%(releases_path)s/%(current_revision)s" % \ {'releases_path': env.releases_path, 'current_revision': env.current_revision} if len(env.releases) > 1: env.previous_revision = env.releases[-2] env.previous_release = "%(releases_path)s/%(previous_revision)s" % \ {'releases_path': env.releases_path, 'previous_revision': env.previous_revision} return func(*args, **kwargs) return decorated
def test_tasks(self): class TestTasks(tasks.Tasks): @fab.task(default=True, aliases=['foo', 'bar']) def default(self): pass @fab.task(name='name', alias='alias') def task(self): pass @fab.task @fab.serial def serial(self): pass @fab.task @fab.parallel def parallel(self): pass roles = ['role_1', 'role_2'] hosts = ['host_1', 'host_2'] tasks_list = TestTasks(roles=roles, hosts=hosts) self.assertTrue(is_task_module(tasks_list)) self.assertTrue(tasks_list.default.is_default) self.assertListEqual(['foo', 'bar'], tasks_list.default.aliases) self.assertEqual('name', tasks_list.task.name) self.assertListEqual(['alias'], tasks_list.task.aliases) for task in tasks_list: self.assertListEqual(roles, task.roles) self.assertListEqual(hosts, task.hosts) docstring, new_style, classic, default = load_tasks_from_module(tasks_list) self.assertIsNone(docstring) self.assertIn('default', new_style) self.assertIn('alias', new_style) self.assertIn('foo', new_style) self.assertIn('bar', new_style) self.assertIn('name', new_style) self.assertDictEqual({}, classic) self.assertIs(tasks_list.default, default) self.assertIn('serial', tasks_list.serial.wrapped.__dict__) self.assertTrue(tasks_list.serial.wrapped.serial) self.assertIn('parallel', tasks_list.parallel.wrapped.__dict__) self.assertTrue(tasks_list.parallel.wrapped.parallel)
def test_commands_list(self): cases = dict( default=dict( init_kwargs=dict(service='service'), expected_commands_list=['rollback'], unexpected_commands_list=['revert', 'migrate', 'migrate-back', 'backup', 'restore', 'pull', 'update', 'prepare', 'push', 'upgrade'], ), prepare_tasks=dict( init_kwargs=dict(service='service', registry='registry'), expected_commands_list=['rollback', 'prepare', 'push', 'upgrade'], unexpected_commands_list=['backup', 'restore', 'migrate', 'migrate-back', 'pull', 'update', 'revert'], ), migrate_tasks=dict( init_kwargs=dict(service='service', migrate_commands=True), expected_commands_list=['rollback', 'migrate', 'migrate-back'], unexpected_commands_list=['backup', 'restore', 'prepare', 'push', 'pull', 'update', 'revert', 'upgrade'], ), backup_tasks=dict( init_kwargs=dict(service='service', backup_commands=True), expected_commands_list=['rollback', 'backup', 'restore'], unexpected_commands_list=['migrate', 'migrate-back', 'prepare', 'push', 'pull', 'update', 'revert', 'upgrade'], ), revert_task=dict( init_kwargs=dict(service='service', revert_command=True), expected_commands_list=['rollback', 'revert'], unexpected_commands_list=['migrate', 'migrate-back', 'prepare', 'push', 'pull', 'update', 'backup', 'restore', 'upgrade'], ), pull_task=dict( init_kwargs=dict(service='service', pull_command=True), expected_commands_list=['rollback', 'pull'], unexpected_commands_list=['migrate', 'migrate-back', 'prepare', 'push', 'revert', 'update', 'backup', 'restore', 'upgrade'], ), update_task=dict( init_kwargs=dict(service='service', update_command=True), expected_commands_list=['rollback', 'update'], unexpected_commands_list=['migrate', 'migrate-back', 'prepare', 'push', 'pull', 'revert', 'backup', 'restore', 'upgrade'], ), complex=dict( init_kwargs=dict(service='service', backup_commands=True, migrate_commands=True, registry='registry', revert_command=True, update_command=True, pull_command=True), expected_commands_list=['pull', 'rollback', 'update', 'backup', 'restore', 'migrate', 'migrate-back', 'prepare', 'push', 'revert', 'upgrade'], unexpected_commands_list=[], ), all_tasks=dict( init_kwargs=dict(service='service', backup_commands=True, migrate_commands=True, registry='registry', revert_command=True, update_command=True, pull_command=True), expected_commands_list=['pull', 'rollback', 'update', 'backup', 'restore', 'migrate', 'migrate-back', 'prepare', 'push', 'revert', 'upgrade', 'deploy'], unexpected_commands_list=[], env=dict(tasks='task'), ), ) for case, data in cases.items(): with self.subTest(case=case): with mock.patch.dict(fab.env, data.get('env', {})): tasks_list = tasks.DockerTasks(**data['init_kwargs']) docstring, new_style, classic, default = load_tasks_from_module(tasks_list) for expected_command in data['expected_commands_list']: self.assertIn(expected_command, new_style) for unexpected_command in data['unexpected_commands_list']: self.assertNotIn(unexpected_command, new_style)