我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ansible.module_utils.basic.get_exception()。
def receive(self, cmd=None): recv = StringIO() handled = False while True: data = self.shell.recv(200) recv.write(data) recv.seek(recv.tell() - 200) window = self.strip(recv.read()) if hasattr(cmd, 'prompt') and not handled: handled = self.handle_prompt(window, cmd) try: if self.find_prompt(window): resp = self.strip(recv.getvalue()) return self.sanitize(cmd, resp) except ShellError: exc = get_exception() exc.command = cmd raise
def send(self, commands): responses = list() try: for command in to_list(commands): signal.alarm(self._timeout) self._history.append(str(command)) cmd = '%s\r' % str(command) self.shell.sendall(cmd) if self._timeout == 0: return responses.append(self.receive(command)) except socket.timeout: raise ShellError("timeout trying to send command: %s" % cmd) except socket.error: exc = get_exception() raise ShellError("problem sending command to host: %s" % to_native(exc)) return responses
def get_instance(module): try: resp = module.cli('show management api http-commands', 'json') return dict( http=resp[0]['httpServer']['configured'], http_port=resp[0]['httpServer']['port'], https=resp[0]['httpsServer']['configured'], https_port=resp[0]['httpsServer']['port'], local_http=resp[0]['localHttpServer']['configured'], local_http_port=resp[0]['localHttpServer']['port'], socket=resp[0]['unixSocketServer']['configured'], vrf=resp[0]['vrf'] ) except NetworkError: exc = get_exception() module.fail_json(msg=str(exc), **exc.kwargs)
def register_ip_to_tag_map(device, ip_addresses, tag): """ :param device: :param ip_addresses: :param tag: :return: """ exc = None try: device.userid.register(ip_addresses, tag) except Exception, e: exc = get_exception() if exc: return (False, exc) else: return (True, exc)
def get_all_address_group_mapping(device): """ Retrieve all the tag to IP address mappings :param device: :return: """ exc = None ret = None try: ret = device.userid.get_registered_ip() except Exception, e: exc = get_exception() if exc: return (False, exc) else: return (ret, exc)
def delete_address_from_mapping(device, ip_address, tags): """ Delete an IP address from a tag mapping. :param device: :param ip_address: :param tags: :return: """ exc = None try: ret = device.userid.unregister(ip_address, tags) except Exception, e: exc = get_exception() if exc: return (False, exc) else: return (True, exc)
def get_all_address_group(device): """ Retrieve all the tag to IP address mappings :param device: :return: """ exc = None try: ret = objects.AddressGroup.refreshall(device) except Exception, e: exc = get_exception() if exc: return (False, exc) else: l = [] for item in ret: l.append(item.name) s = ",".join(l) return (s, exc)
def delete_address_group(device, dev_group, obj_name): """ :param device: :param dev_group: :param obj_name: :return: """ static_obj = find_object(device, dev_group, obj_name, objects.AddressGroup) # If found, delete it if static_obj: try: static_obj.delete() except Exception: exc = get_exception() return False, exc return True, None else: return False, None
def connect(self, params, kickstart=True): host = params['host'] port = params.get('port') or 22 username = params['username'] password = params.get('password') key_file = params.get('ssh_keyfile') timeout = params['timeout'] try: self.shell = Shell( kickstart=kickstart, prompts_re=self.CLI_PROMPTS_RE, errors_re=self.CLI_ERRORS_RE, timeout=timeout ) self.shell.open(host, port=port, username=username, password=password, key_filename=key_file) except ShellError: exc = get_exception() raise NetworkError(msg='failed to connect to %s:%s' % (host, port), exc=to_native(exc)) self._connected = True
def execute(self, commands): try: return self.shell.send(commands) except ShellError: exc = get_exception() commands = [str(c) for c in commands] raise NetworkError(to_native(exc), commands=commands)
def send(self, commands): try: return self.shell.send(commands) except ShellError: e = get_exception() self.module.fail_json(msg=e.message, commands=commands)
def main(): """ main entry point for module execution """ argument_spec = dict( src=dict(type='path'), lines=dict(aliases=['commands'], type='list'), parents=dict(type='list'), match=dict(default='line', choices=['line', 'none']), config=dict(), defaults=dict(type='bool', default=False, aliases=['detail']), backup=dict(type='bool', default=False), save=dict(type='bool', default=False), ) mutually_exclusive = [('lines', 'src')] module = NetworkModule(argument_spec=argument_spec, connect_on_load=False, mutually_exclusive=mutually_exclusive, supports_check_mode=True) result = dict(changed=False, warnings=list()) if module.params['backup']: result['__backup__'] = module.config.get_config() try: run(module, result) except NetworkError: exc = get_exception() module.fail_json(msg=str(exc), **exc.kwargs) module.exit_json(**result)
def main(): """ main entry point for module execution """ argument_spec = dict( rollback_location=dict(), local_max_checkpoints=dict(type='int'), remote_max_checkpoints=dict(type='int'), rescue_location=dict(), state=dict(default='present', choices=['present', 'absent']) ) module = NetworkModule(argument_spec=argument_spec, connect_on_load=False, supports_check_mode=True) state = module.params['state'] result = dict(changed=False) commands = list() invoke(state, module, commands) try: load_config(module, commands, result) except NetworkError: exc = get_exception() module.fail_json(msg=str(exc), **exc.kwargs) module.exit_json(**result)
def main(): """main entry point for module execution """ argument_spec = dict( netconf_port=dict(type='int', default=830, aliases=['listens_on']), state=dict(default='present', choices=['present', 'absent']), transport=dict(default='cli', choices=['cli']) ) module = NetworkModule(argument_spec=argument_spec, supports_check_mode=True) state = module.params['state'] port = module.params['netconf_port'] result = dict(changed=False) instance = get_instance(module) if state == 'present' and instance.get('state') == 'absent': commands = 'set system services netconf ssh port %s' % port elif state == 'present' and port != instance.get('port'): commands = 'set system services netconf ssh port %s' % port elif state == 'absent' and instance.get('state') == 'present': commands = 'delete system services netconf' else: commands = None if commands: if not module.check_mode: try: comment = 'configuration updated by junos_netconf' module.config(commands, comment=comment) except NetworkError: exc = get_exception() module.fail_json(msg=str(exc), **exc.kwargs) result['changed'] = True result['commands'] = commands module.exit_json(**result)
def main(): argument_spec = dict( nv_overlay_evpn=dict(required=True, type='bool'), include_defaults=dict(default=True), config=dict(), save=dict(type='bool', default=False) ) module = get_network_module(argument_spec=argument_spec, supports_check_mode=True) existing = invoke('get_existing', module) end_state = existing proposed = dict(nv_overlay_evpn=module.params['nv_overlay_evpn']) result = {} candidate = CustomNetworkConfig(indent=3) invoke('get_commands', module, existing, proposed, candidate) if proposed != existing: try: response = load_config(module, candidate) result.update(response) except ShellError: exc = get_exception() module.fail_json(msg=str(exc)) else: result['updates'] = [] result['connected'] = module.connected if module._verbosity > 0: end_state = invoke('get_existing', module) result['end_state'] = end_state result['existing'] = existing result['proposed'] = proposed module.exit_json(**result)
def execute_config_command(commands, module): try: module.configure(commands) except ShellError: clie = get_exception() module.fail_json(msg='Error sending CLI commands', error=str(clie), commands=commands) except AttributeError: try: module.config.load_config(commands) except NetworkError: clie = get_exception() module.fail_json(msg='Error sending CLI commands', error=str(clie), commands=commands)
def execute_show(cmds, module, command_type=None): command_type_map = { 'cli_show': 'json', 'cli_show_ascii': 'text' } try: if command_type: response = module.execute(cmds, command_type=command_type) else: response = module.execute(cmds) except ShellError: clie = get_exception() module.fail_json(msg='Error sending {0}'.format(cmds), error=str(clie)) except AttributeError: try: if command_type: command_type = command_type_map.get(command_type) module.cli.add_commands(cmds, output=command_type) response = module.cli.run_commands() else: module.cli.add_commands(cmds, raw=True) response = module.cli.run_commands() except NetworkError: clie = get_exception() module.fail_json(msg='Error sending {0}'.format(cmds), error=str(clie)) return response
def load_checkpoint(module, result): try: checkpoint = result['__checkpoint__'] module.cli(['rollback running-config checkpoint %s' % checkpoint, 'no checkpoint %s' % checkpoint], output='text') except KeyError: module.fail_json(msg='unable to rollback, checkpoint not found') except NetworkError: exc = get_exception() msg = 'unable to rollback configuration' module.fail_json(msg=msg, checkpoint=checkpoint, **exc.kwargs)
def execute_show(cmds, module, command_type=None): command_type_map = { 'cli_show': 'json', 'cli_show_ascii': 'text' } try: if command_type: response = module.execute(cmds, command_type=command_type) else: response = module.execute(cmds) except ShellError: clie = get_exception() module.fail_json(msg='Error sending {0}'.format(cmds), error=str(clie)) except AttributeError: try: if command_type: command_type = command_type_map.get(command_type) module.cli.add_commands(cmds, output=command_type) response = module.cli.run_commands() else: module.cli.add_commands(cmds, output=command_type) response = module.cli.run_commands() except NetworkError: clie = get_exception() module.fail_json(msg='Error sending {0}'.format(cmds), error=str(clie)) return response
def execute_commands(cmds, module, command_type=None): command_type_map = { 'cli_show': 'json', 'cli_show_ascii': 'text' } try: if command_type: response = module.execute(cmds, command_type=command_type) else: response = module.execute(cmds) except ShellError: clie = get_exception() module.fail_json(msg='Error sending {0}'.format(cmds), error=str(clie)) except AttributeError: try: if command_type: command_type = command_type_map.get(command_type) module.cli.add_commands(cmds, output=command_type) response = module.cli.run_commands() else: module.cli.add_commands(cmds, raw=True) response = module.cli.run_commands() except NetworkError: clie = get_exception() module.fail_json(msg='Error sending {0}'.format(cmds), error=str(clie)) return response