我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用colorama.Fore.YELLOW。
def ex_print(type, msg, ret):
    """Print ``msg`` prefixed with the colour/style registered for ``type``.

    Args:
        type: Message category: "error", "action", "positive", "info" or
            "reset". Any other value falls back to ``Style.RESET_ALL``.
        msg: Text to print.
        ret: When equal to 0 no trailing newline is written; any other
            value ends the line normally.
    """
    colorama.init()

    # Category -> escape-sequence prefix.
    palette = {
        "error": Fore.RED + Style.BRIGHT,
        "action": Fore.YELLOW,
        "positive": Fore.GREEN + Style.BRIGHT,
        "info": Fore.WHITE + Style.BRIGHT,
        "reset": Style.RESET_ALL,
    }
    prefix = palette.get(type, Style.RESET_ALL)
    line_end = "" if ret == 0 else "\n"
    print(prefix + msg, end=line_end)
def banner():
    """Print the ASCII-art program banner followed by the credits line.

    The art is printed dimmed; the style is then reset and switched to
    bright for the credits. No reset is emitted at the end, so the last
    colour/style stays active for whatever is printed next.
    """
    # Every backslash is written as an explicit '\\' escape: sequences such
    # as '\/' and '\_' are invalid escapes that newer Python versions warn
    # about. The printed text is unchanged.
    print(Style.DIM)
    print(' ___________________________')
    print(' / /\\')
    print(' / sadboyzvone\'s _/ /\\')
    print(' / Intel 8080 / \\/')
    print(' / Assembler /\\')
    print('/___________________________/ /')
    print('\\___________________________\\/')
    print(' \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\' + Style.RESET_ALL + Style.BRIGHT)
    print(Fore.WHITE + '\nPowered by ' + Fore.BLUE + 'Pyt' + Fore.YELLOW + 'hon' + Fore.WHITE + '\nCopyright (C) 2017, Zvonimir Rudinski')


# Print usage information
def instant_giveaway(prize_name):
    """Enter an instant giveaway and record whether it was won or lost.

    Clicks the giveaway box, waits for the result element and inspects its
    text. Increments the module-level ``won_giveaways`` / ``lost_giveaways``
    counters and prints a coloured status line.

    Args:
        prize_name: Name of the prize, used only in the printed messages.
    """
    global won_giveaways, lost_giveaways
    giveaway_box = chromedriver.find_element_by_id(instant_box)
    giveaway_box.click()
    time.sleep(10)
    get_result = chromedriver.find_element_by_id('title')
    time.sleep(5)
    if "you didn't win" in get_result.text:
        lost_giveaways += 1
        print(Fore.YELLOW + Style.BRIGHT + '\n **** You did not win: %s' % prize_name)
        time.sleep(5)
    elif "you're a winner!" in get_result.text:
        won_giveaways += 1
        print(Fore.GREEN + Style.BRIGHT + '\n **** Winner Winner! Chicken Dinner!: %s' % prize_name)
        time.sleep(1)
        # '\s' is an invalid escape sequence; spell every backslash out so
        # the path stays the same without relying on deprecated behaviour.
        playsound('.\\sounds\\btyswt.mp3')
        time.sleep(5)
    else:
        print(Fore.RED + Style.BRIGHT + '\n ---- UNRECOGNIZED RESPONSE FOR: %s' % prize_name)


# function to process the 'None' requirement giveaways.
def _update_ui():
    """Redraw the single-line status display on stdout.

    Written for Python 2: every ``print`` statement ends with a trailing
    comma so that no newline is emitted and the output stays on one line.
    """
    global timeout_count, result_codes, connection_error_count
    # Carriage return moves the cursor back to the start of the line.
    print '\r',
    for k, v in result_codes.iteritems():
        print "%s:" % k,
        # '200 OK' is shown in light green, every other result code in red.
        if k == '200 OK':
            print(Fore.LIGHTGREEN_EX),
        else:
            print(Fore.RED),
        print "%s " % v,
        print(Style.RESET_ALL),
    if timeout_count > 0:
        # NOTE: this is a print *statement*; the parentheses only group the
        # first operand, and the trailing ' ' is concatenated before printing.
        print('Timeouts: '+Fore.YELLOW + str(timeout_count) + Style.RESET_ALL) + ' ',
    if connection_error_count >0:
        print('Connection Errors: '+Fore.RED + str(connection_error_count) + Style.RESET_ALL),
    # Nothing above ends with a newline, so flush explicitly.
    sys.stdout.flush()
def command(self, command, value=None):
    """Send a named command to the camera over HTTP.

    Args:
        command: Key into ``self.commandMaxtrix``.
        value: Optional value for the command; ``''`` is treated like
            ``None``, and values found in the command's 'translate' table
            are replaced by their translation.

    Returns:
        True when the HTTP request completed; False when the command is
        unknown or the request failed.
    """
    func_str = 'GoProHero.command({}, {})'.format(command, value)

    # unknown commands are rejected without touching the network
    if command not in self.commandMaxtrix:
        return False

    spec = self.commandMaxtrix[command]

    # accept both None and '' for commands without a value
    if value == '':
        value = None

    # for commands with values, translate the value
    if value is not None and value in spec['translate']:
        value = spec['translate'][value]

    # build the final url
    url = self._commandURL(spec['cmd'], value)

    # attempt to contact the camera
    try:
        urlopen(url, timeout=self.timeout).read()
    except (HTTPError, URLError, socket.timeout) as e:
        logging.warning('{}{} - error opening {}: {}{}'.format(
            Fore.YELLOW, func_str, url, e, Fore.RESET))
        return False

    logging.info('{} - http success!'.format(func_str))
    return True
def defrag(self):
    """Render a coloured, defrag-style map of the pieces' download state.

    Each piece contributes one marker followed by its priority:
    yellow "v" when it is in the download queue, blue ">" when
    ``self._served_blocks`` flags it, otherwise "#" in green (piece
    present) or red (piece missing).

    Returns:
        The map wrapped by ``term.magenta`` and followed by a newline, or
        just a newline when there are no pieces.
    """
    # A set keeps the per-piece membership test O(1) instead of scanning
    # the queue for every piece.
    downloading = {
        entry['piece_index'] for entry in self.handle.get_download_queue()
    }
    served = self._served_blocks

    cells = []
    for index, have_piece in enumerate(self.status.pieces):
        if index in downloading:
            marker = Fore.YELLOW + "v"
        elif served is not None and served[index]:
            marker = Fore.BLUE + ">"
        elif have_piece:
            marker = Fore.GREEN + "#"
        else:
            marker = Fore.RED + "#"
        cells.append(marker + str(self.handle.piece_priority(index)))

    # Join once rather than growing a string inside the loop.
    numerales = "".join(cells)
    if numerales != "":
        numerales = term.magenta("\nPieces download state:\n" + numerales)
    return "%s\n" % numerales
def getConfig():
    """Load the configuration file, offering to create it when unreadable.

    Returns:
        A ``(config, saved)`` tuple. ``saved`` is 1 when the configuration
        was read from, or written to, the config file and 0 when it was
        not saved.

    Exits the program (via ``exit``) when the file can be read but is
    malformed or holds out-of-range values.
    """
    conf_path = '%s/visionarypm.conf' % path
    invalid_msg = 'Invalid config! Please delete the configuration file (%s) and a new one will be generated on the next run.' % conf_path
    try:
        with open(conf_path) as f:
            config = json.loads(f.read().strip())
        if (config['oLen'] < 16 or config['oLen'] > 64
                or config['cost'] < 10 or config['cost'] > 20
                or config['nwords'] > 16 or config['nwords'] < 4):
            exit(invalid_msg)
        return config, 1
    except IOError:
        # No readable config file: build defaults and offer to save them.
        config = get_defaults()
        autosave = safe_input('Do you want to save this config? (Y/n) ').lower()
        if autosave == 'yes' or autosave == 'y' or autosave == '':
            print('\nAutosaving configuration...')
            try:
                with open(conf_path, 'a') as f:
                    f.write(json.dumps(config))
                return config, 1
            except (IOError, OSError):
                # Only I/O failures are expected here; a bare except would
                # also swallow KeyboardInterrupt and programming errors.
                print(color('Autosaving failed! [Permission denied]\n', Fore.RED))
                print('In order to save these settings, place %s' % color(json.dumps(config), Fore.YELLOW))
                print('in %s' % (color('%s/visionarypm.conf' % path, Fore.YELLOW)))
        # Saving was declined or failed: hand back the unsaved defaults.
        return config, 0
    except (KeyError, json.decoder.JSONDecodeError):
        exit(invalid_msg)
def get_scanning_result(self, iface):
    """Scan for WiFi networks on ``iface`` and print them as a table.

    Args:
        iface: Interface name passed to ``self.exec_iwlist`` and shown in
            the table heading.

    Returns:
        True when ``exec_iwlist`` succeeded and the table was printed,
        False otherwise.
    """
    if not self.exec_iwlist(iface):
        return False

    title = " Scanning WiFi networks using interface '" + iface + "'\n"

    # Header row first, then one row per parsed cell.
    fields = ('Name', 'Address', 'Channel', 'Encryption', 'Quality')
    rows = [['SSID', 'AP Address', 'Channel', 'Encryption', 'Quality']]
    rows.extend([cell[field] for field in fields] for cell in self.parsed_cells)

    rendered = AsciiTable(rows)
    print(Fore.YELLOW + Style.DIM + title + rendered.table)
    return True
def print_ifaces_wireless_table(self):
    """Build ``self.wireless_table`` and print it as an ASCII table.

    One row is added per entry of ``self.ifaces_wireless``, with the
    details looked up in the per-interface ``self.dict_*`` mappings.
    """
    title = ' Wireless interfaces information:\n'
    self.wireless_table = [['Interface', 'Status', 'IP Address', 'Mask', 'Mode', 'SSID', 'AP Address', 'Wireless Type']]
    # The generator appends rows one at a time, exactly like the loop it
    # replaces.
    self.wireless_table.extend(
        [
            name,
            self.dict_status[name],
            self.dict_ipaddr[name],
            self.dict_mask[name],
            self.dict_mode[name],
            self.dict_ssid[name],
            self.dict_apaddr[name],
            self.dict_wname[name],
        ]
        for name in self.ifaces_wireless
    )
    rendered = AsciiTable(self.wireless_table)
    print(Fore.YELLOW + Style.DIM + title + rendered.table)
def log(self, prefix, text, line=False):
    """Print a timestamped log line, coloured according to ``prefix``.

    Args:
        prefix: Severity marker. '?', '+', '-' and '!' select cyan, green,
            red and yellow; any other marker gets no colour.
        text: Message body.
        line: When true the whole line is coloured; otherwise only the
            prefix marker is.
    """
    colours = {
        '?': Fore.CYAN,
        '+': Fore.GREEN,
        '-': Fore.RED,
        '!': Fore.YELLOW,
    }
    # An unrecognised prefix used to leave the colour variable unbound and
    # crash; fall back to "no colour" instead.
    c = Style.BRIGHT + colours.get(prefix, '')
    e = Style.RESET_ALL + Fore.RESET
    stamp = "[" + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "]"
    # Single-argument print(...) behaves the same on Python 2 and 3.
    if line:
        print(c + stamp + "[" + prefix + "] " + text + e)
    else:
        print(stamp + "[" + c + prefix + e + "] " + text)
def log(prefix, text, line=False):
    """Print a timestamped log line, coloured according to ``prefix``.

    Args:
        prefix: Severity marker. '?', '+', '-' and '!' select cyan, green,
            red and yellow; any other marker gets no colour.
        text: Message body.
        line: When true the whole line is coloured; otherwise only the
            prefix marker is.
    """
    colours = {
        '?': Fore.CYAN,
        '+': Fore.GREEN,
        '-': Fore.RED,
        '!': Fore.YELLOW,
    }
    # An unrecognised prefix used to leave the colour variable unbound and
    # crash; fall back to "no colour" instead.
    c = Style.BRIGHT + colours.get(prefix, '')
    e = Style.RESET_ALL + Fore.RESET
    stamp = "[" + datetime.now().strftime("%Y-%m-%d %H:%M") + "]"
    # Single-argument print(...) behaves the same on Python 2 and 3.
    if line:
        print(c + stamp + "[" + prefix + "] " + text + e)
    else:
        print(stamp + "[" + c + prefix + e + "] " + text)
def require(package, requirement=None):
    """Import ``package``, printing an install hint when it is missing.

    Args:
        package: Module name handed to ``__import__``.
        requirement: Name to show in the ``pip install`` hint; defaults to
            ``package`` when falsy.

    Returns:
        Whatever ``__import__(package)`` returns.

    Raises:
        ImportError: Re-raised after the hint has been printed.
    """
    if not requirement:
        requirement = package
    try:
        module = __import__(package)
    except ImportError:
        from colorama import Fore, Style
        hints = (
            'This example requires the {!r} package. Install it using:'.format(requirement),
            ' $ pip install {!s}'.format(requirement),
        )
        # Each hint is printed in yellow and followed by an empty line.
        for hint in hints:
            print(Fore.YELLOW, hint, Style.RESET_ALL, sep='')
            print()
        raise
    return module
def setup_method(self, test_method):
    """Create the colourer under test and the expected status colours."""
    init()
    self.stack_status_colourer = StackStatusColourer()

    ok, bad, busy = Fore.GREEN, Fore.RED, Fore.YELLOW
    # Pairs are listed in the same order as the original literal.
    self.statuses = dict([
        ("CREATE_COMPLETE", ok),
        ("CREATE_FAILED", bad),
        ("CREATE_IN_PROGRESS", busy),
        ("DELETE_COMPLETE", ok),
        ("DELETE_FAILED", bad),
        ("DELETE_IN_PROGRESS", busy),
        ("ROLLBACK_COMPLETE", bad),
        ("ROLLBACK_FAILED", bad),
        ("ROLLBACK_IN_PROGRESS", busy),
        ("UPDATE_COMPLETE", ok),
        ("UPDATE_COMPLETE_CLEANUP_IN_PROGRESS", busy),
        ("UPDATE_FAILED", bad),
        ("UPDATE_IN_PROGRESS", busy),
        ("UPDATE_ROLLBACK_COMPLETE", ok),
        ("UPDATE_ROLLBACK_COMPLETE_CLEANUP_IN_PROGRESS", busy),
        ("UPDATE_ROLLBACK_FAILED", bad),
        ("UPDATE_ROLLBACK_IN_PROGRESS", busy),
    ])
def process_diagram(caption, body, src_dir, diag_dir_name, diag_id):
    """Write the diagram source to a .diag file, render it with seqdiag,
    and return a Markdown image reference to the resulting PNG.

    A failing seqdiag run only prints a warning; the image reference is
    returned regardless.
    """
    target_dir = join(src_dir, diag_dir_name, SUBDIR_NAME)
    diag_src_path = join(target_dir, "%s.diag" % diag_id)

    makedirs(target_dir, exist_ok=True)
    with open(diag_src_path, 'w', encoding="utf8") as handle:
        handle.write(body)

    shell_command = "%s %s" % (SEQDIAG_COMMAND, diag_src_path)
    try:
        subprocess.check_output(shell_command, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exception:
        warning = "\nWarning: Processing of diagram %s failed: %s" % (diag_src_path, exception)
        print(Fore.YELLOW + warning)

    return "![%s](%s/%s/%s.png)" % (caption, diag_dir_name, SUBDIR_NAME, diag_id)
def process_diagram(caption, body, src_dir, diag_dir_name, diag_id):
    """Write the diagram source to a .diag file, render it with plantuml,
    and return a Markdown image reference to the resulting PNG.

    A failing plantuml run only prints a warning; the image reference is
    returned regardless.
    """
    # Imported locally so the block does not rely on a module-level import.
    from os import makedirs

    target_dir = join(src_dir, diag_dir_name, SUBDIR_NAME)
    diag_src_path = join(target_dir, "%s.diag" % diag_id)

    # Create the output directory first; opening the file for writing
    # fails when it does not exist yet.
    makedirs(target_dir, exist_ok=True)

    with open(diag_src_path, 'w', encoding="utf8") as diag_src:
        diag_src.write(body)

    try:
        subprocess.check_output(
            "%s %s" % (PLANTUML_COMMAND, diag_src_path),
            stderr=subprocess.PIPE,
            shell=True
        )
    except subprocess.CalledProcessError as exception:
        print(
            Fore.YELLOW +
            "\nWarning: Processing of diagram %s failed: %s" % (diag_src_path, exception)
        )

    return "![%s](%s/%s/%s.png)" % (caption, diag_dir_name, SUBDIR_NAME, diag_id)
def earthquake_print(data):
    """Print every feature in ``data`` that has not been reported before.

    Feature ids are remembered in the module-level ``UPDATES`` list so a
    feature is printed only once across calls.

    Args:
        data: Mapping with a 'features' list; each feature provides 'id'
            and a 'properties' mapping with 'title', 'mag', 'place',
            'alert', 'type', 'time' and 'url'.
    """
    for feature in data['features']:
        if feature['id'] in UPDATES:
            continue
        UPDATES.append(feature['id'])

        props = feature['properties']
        print('___________________________________________________________________________________________________')
        print(props['title'], end=' MAG: ')
        print(Fore.RED + str(props['mag']))
        print('Location:', props['place'])

        alert = props['alert']
        if alert == 'green':
            # A 'green' alert was previously printed in red.
            colour = Fore.GREEN
        elif alert == 'yellow':
            colour = Fore.YELLOW
        else:
            colour = Fore.BLUE
        print(colour + 'Type: ' + props['type'], end=' ')

        # 'time' is divided by 1000 before being used as a POSIX timestamp.
        ms = props['time']
        print(datetime.datetime.fromtimestamp(ms/1000.0))
        print('INFO:', props['url'])
def clean_up_partial(self):
    """Delete a partially ripped audio file and its optional siblings.

    Nothing happens unless the ripper has an ``audio_file`` that exists.
    The wav / pcm counterparts are removed only when the matching
    ``plus_wav`` / ``plus_pcm`` argument is set.
    """
    ripper = self.ripper
    if ripper.audio_file is None or not path_exists(ripper.audio_file):
        return

    print(Fore.YELLOW + "Deleting partially ripped file" + Fore.RESET)
    rm_file(ripper.audio_file)

    # check for any extra pcm or wav files
    for ext in ("wav", "pcm"):
        if getattr(self.args, "plus_" + ext):
            extra_file = change_file_extension(ripper.audio_file, ext)
            if path_exists(extra_file):
                rm_file(extra_file)
async def ban(ctx, *, member : discord.Member = None):
    """Ban ``member`` from the server and report the outcome.

    The command is ignored when the invoking user lacks the ban
    permission. The body awaits, so it is declared ``async def``; a plain
    ``def`` containing ``await`` does not compile.

    Args:
        ctx: Command context; its ``message`` supplies author, server and
            channel.
        member: The member to ban; a usage hint is sent when omitted.
    """
    if not ctx.message.author.server_permissions.ban_members:
        return
    if not member:
        embed = discord.Embed(description = ctx.message.author.mention + ", you did not specify a user to ban! :x:", color = 0xF00000)
        return await client.say(embed = embed)

    try:
        await client.ban(member)
    except Exception as e:
        if 'Privilege is too low' in str(e):
            embed = discord.Embed(description = "Privilege is too low. :x:", color = 0xF00000)
            return await client.say(embed = embed)
        print(Fore.RED + "Command Failed To Execute |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]\n Reason: " + Fore.YELLOW + "Insufficient Permissions! Both user and bot need Ban Members permission!")
        # The ban did not go through, so do not announce it as done.
        return

    # Log before returning; after the return this line could never run.
    print(Fore.CYAN + "Command Successfully Executed |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]")
    embed = discord.Embed(description = "**%s** has been banned."%member.name, color = 0xF00000)
    return await client.say(embed = embed)
async def kick(ctx, *, member : discord.Member = None):
    """Kick ``member`` from the server and report the outcome.

    The command is ignored when the invoking user lacks the kick
    permission. The body awaits, so it is declared ``async def``; a plain
    ``def`` containing ``await`` does not compile.

    Args:
        ctx: Command context; its ``message`` supplies author, server and
            channel.
        member: The member to kick; a usage hint is sent when omitted.
    """
    if not ctx.message.author.server_permissions.kick_members:
        return
    if not member:
        return await client.say(ctx.message.author.mention + "Specify a user to kick!")

    try:
        await client.kick(member)
    except Exception as e:
        if 'Privilege is too low' in str(e):
            embed = discord.Embed(description = "Privilege is too low. :x:", color = 0xF00000)
            return await client.say(embed = embed)
        print(Fore.RED + "Command Failed To Execute |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]\n Reason: " + Fore.YELLOW + "Insufficient Permissions! Both user and bot need Kick Members permission!")
        # The kick did not go through, so do not announce it as done.
        return

    # Log before returning; after the return this line could never run.
    print(Fore.CYAN + "Command Successfully Executed |\n Command Ran In:[" + ctx.message.server.id + "]\n User:[" + ctx.message.author.id + "]\n Channel:[" + ctx.message.channel.id + "]")
    embed = discord.Embed(description = "**%s** has been kicked."%member.name, color = 0xF00000)
    return await client.say(embed = embed)
def menu():
    """Print the colour-coded command menu."""
    # Using colour from colorama: https://pypi.python.org/pypi/colorama
    # Formatting e.g.: Fore.COLOUR, Back.COLOUR, Style.DIM with e.g. DIM, RED, CYAN, etc.:
    print(Fore.BLACK + Back.WHITE + "10cbazbt3 menu:" + Style.RESET_ALL)
    # One print() per entry; the last entry resets the terminal style.
    entries = (
        Fore.YELLOW + Style.DIM + "Main:",
        " b = Blurb m = Mentions",
        " r = Reply t = Timeline",
        " blog = BLOG o = Own blurbs",
        " pins = PINS",
        "Admin:",
        " Login = Login menu = show Menu",
        " Logout = Logout. sites = my Sites",
        " exit = Exit",
        Style.RESET_ALL,
    )
    for entry in entries:
        print(entry)


# DEFINE 10C POST INTERACTIONS:
# LOTS OF DUPLICATION HERE!
# Define the 'blurb' (social post) subroutine:
def blurb():
    """Prompt for a blurb, save a local copy, and post it to the API.

    The text is written to 'posttext.txt', sent to the 10centuries content
    endpoint with the module-level ``headers``, and the HTTP status code is
    passed to ``showapiresponse``.
    """
    # Input some text:
    posttext = input(Fore.YELLOW + Style.DIM + "Write some text: " + Style.RESET_ALL)
    # Saves the input text to 'posttext.txt'; the context manager closes
    # the file even when the write fails:
    with open("posttext.txt", "w") as backup:
        backup.write(posttext)
    # Uses the global header & creates the data to be passed to the url:
    url = 'https://api.10centuries.org/content'
    data = {'content': posttext}
    response = requests.post(url, headers=headers, data=data)
    # Displays the server's response:
    responsestatus = response.status_code
    showapiresponse(responsestatus)


# Define the 'post' (blog post) subroutine:
def post():
    """Prompt for a blog post (title and body) and publish it via the API.

    The body is read from stdin until EOF, stamped with the current local
    time, and sent with the module-level ``headers`` and ``channelid``.
    """
    # Input blog post data:
    prompt_style = Fore.YELLOW + Style.DIM
    posttitle = input(prompt_style + "Write a blog post title: " + Style.RESET_ALL)
    print(prompt_style + "Write a blog post:")
    print("(Press [ctrl-d] to 'save' when you finish writing.)" + Style.RESET_ALL)
    posttext = sys.stdin.read()
    # Adds a post date & time, currently set as 'now':
    postdatetime = strftime("%Y-%m-%d %H:%M:%S")
    # IMPORTANT: @bazbt3's channel_id = 6. SUBSTITUTE WITH YOUR CHANNEL_ID in global definitions!
    payload = {
        'title': posttitle,
        'content': posttext,
        'channel_id': channelid,
        'send_blurb': 'Y',
        'pubdts': postdatetime,
    }
    # Uses the global header to send the data to the url:
    reply_from_server = requests.post(
        'https://api.10centuries.org/content', headers=headers, data=payload)
    # Displays the server's response:
    showapiresponse(reply_from_server.status_code)


# Define the 'reply' subroutine:
def reply():
    """Prompt for a post number and reply text, then post the reply.

    The text is written to 'posttext.txt', sent to the 10centuries content
    endpoint with the module-level ``headers``, and the HTTP status code is
    passed to ``showapiresponse``.
    """
    # INEFFICIENT: NEED TO MERGE MOST OF THE CODE FROM THIS AND THE REPLYINLINE SUBROUTINE:
    # Input a reply-to post number:
    replytoid = input(Fore.YELLOW + Style.DIM + "Post number to reply to: " + Style.RESET_ALL)
    # Input some text:
    posttext = input(Fore.YELLOW + Style.DIM + "Write some text (add usernames!): " + Style.RESET_ALL)
    # Saves the input text to 'posttext.txt'; the context manager closes
    # the file even when the write fails:
    with open("posttext.txt", "w") as backup:
        backup.write(posttext)
    # Uses the global header & creates the data to be passed to the url:
    url = 'https://api.10centuries.org/content'
    data = {'reply_to': replytoid, 'content': posttext}
    response = requests.post(url, headers=headers, data=data)
    # Displays the server's response:
    responsestatus = response.status_code
    showapiresponse(responsestatus)


# Define the 'replyinline' subroutine:
# INEFFICIENT: SEE THE REPLY SUBROUTINE:
def replyinline(postidreply, poster):
    """Prompt for reply text and post it as a reply to a known post.

    Args:
        postidreply: Id of the post being replied to.
        poster: Username of the original poster; prepended to the text as
            an "@" mention.
    """
    # Use the to-be-replied-to post id:
    replytoid = postidreply
    replytoposter = poster
    # Input some text:
    posttext = input(Fore.YELLOW + Style.DIM + "Write some text: " + Style.RESET_ALL)
    # Add '@', username to posttext:
    posttext = ("@" + replytoposter + " " + posttext)
    # Saves the input text to 'posttext.txt'; the context manager closes
    # the file even when the write fails:
    with open("posttext.txt", "w") as backup:
        backup.write(posttext)
    # Uses the global header & creates the data to be passed to the url:
    url = 'https://api.10centuries.org/content'
    data = {'reply_to': replytoid, 'content': posttext}
    response = requests.post(url, headers=headers, data=data)
    # Displays the server's response:
    responsestatus = response.status_code
    showapiresponse(responsestatus)


# Define the 'repostinline' subroutine:
def export_data_search_db(export, export_file, export_status, title):
    """Append the collected results to an export file.

    With an empty ``export_file`` the user is asked for a file name and the
    work is delegated to ``export_data``. Otherwise, when ``export_status``
    equals False, ``title`` and one "- " line per result are appended to
    ``export_file``.

    Args:
        export: Sequence of result strings.
        export_file: Target path, or "" to prompt for one.
        export_status: Flag compared against False; it is not updated for
            the caller.
        title: Text written before the result lines.
    """
    if len(export) == 0:
        print(Back.YELLOW + Fore.BLACK + "Module empty result" + Style.RESET_ALL)
        return

    if export_file == "":
        # raw_input: this block targets Python 2.
        user_input = raw_input("operative (export file name ?) > ")
        if os.path.exists("export/"+user_input):
            export_file = "export/"+user_input
        elif os.path.exists(user_input):
            export_file = user_input
        else:
            print(Fore.GREEN + "Writing " + user_input + " file" + Style.RESET_ALL)
            export_file = "export/"+user_input
        export_data(export, export_file, export_status, title)
    elif export_status == False:
        # The context manager closes the file even if a write fails.
        with open(export_file, "a+") as file_open:
            file_open.write(title)
            for line in export:
                file_open.write("- " + line + "\n")
            print(Fore.GREEN + "File writed : " + export_file + Style.RESET_ALL)
def main(self):
    """Look up the configured email in the hacked-emails.com API.

    Prints each leak the address was found in and appends the leak titles
    to ``self.export``. Nothing is done when the 'email' option is empty.
    """
    api_url = 'https://hacked-emails.com/api?q='
    email = self.get_options('email')
    if email == "":
        return
    if not ("@" in email and "." in email):
        print(Fore.YELLOW + "Invalid email please retry with correct email address" + Style.RESET_ALL)
        return

    complet_url = api_url + str(email)
    content = requests.get(complet_url).text
    if content == "":
        # The message used to end with a stray ")".
        print(Fore.RED + "Error found in json" + Style.RESET_ALL)
        return

    content = json.loads(content)
    if not (content['status'] and content['status'] == "found"):
        print("Status (" + Fore.RED + "Not found" + Style.RESET_ALL + ")")
        return

    print("Result found (" + Fore.GREEN + str(content['results']) + " results" + Style.RESET_ALL + ")")
    for line in content['data']:
        try:
            print(Fore.BLUE + " * " + Style.RESET_ALL + " found in : " + Fore.GREEN + str(line['title']) + Style.RESET_ALL +
                  " (" + Fore.YELLOW + str(line['date_leaked']) + Style.RESET_ALL + ")")
            self.export.append(line['title'])
        except Exception:
            # Best effort per entry, but let KeyboardInterrupt/SystemExit
            # through (a bare except would swallow them).
            print(Fore.BLUE + " * " + Style.RESET_ALL + " found in : ( can't parse leaks title)")
def load_db():
    """Register every ``core/dbs/*.sql`` file in the global ``total_dbs``.

    Returns:
        False when the directory is missing or holds no .sql files;
        None otherwise.
    """
    global total_dbs

    if not os.path.isdir("core/dbs/"):
        print(Fore.RED + "core/dbs/ folder not found" + Style.RESET_ALL)
        return False

    found = glob.glob("core/dbs/*.sql")
    if len(found) < 1:
        print(Fore.YELLOW + "core/dbs/ No dbs found" + Style.RESET_ALL)
        return False

    print("Load " + str(len(found)) + " databases...")
    for db_path in found:
        if line_already_loaded(db_path, total_dbs):
            print("Already loaded : " + Fore.YELLOW + db_path + Style.RESET_ALL)
        else:
            print("Load database : " + Fore.GREEN + db_path + Style.RESET_ALL)
            total_dbs.append(db_path)


def line_already_loaded(db_path, loaded):
    """Return True when ``db_path`` is already present in ``loaded``."""
    return db_path in loaded
def browser_hacks():
    """List the browser-hack modules found in ``core/BHDB/``.

    For every ``*.py`` file except ``__init__`` the module name is printed
    with the text that follows a ``#description:`` marker in the file (up
    to the next ``#``), or a placeholder when there is no marker.
    """
    print(Fore.YELLOW + " ! For use module please use :use moduleName" + Style.RESET_ALL)
    if not os.path.exists("core/BHDB/"):
        print(Back.RED + Fore.BLACK + "Browserhacking directory not found" + Style.RESET_ALL)
        return

    for module in glob.glob("core/BHDB/*.py"):
        if ".py" not in module or "__init__" in module:
            continue
        module_name = module.split(".py")[0].replace('core/BHDB/', '')

        # Read the file once and close it; it used to be opened twice and
        # never closed.
        with open(module) as handle:
            source = handle.read()

        description = "No module description found"
        if "#description:" in source:
            description = source.split("#description:")[1].split("#")[0]
        print(Fore.BLUE + " * " + Style.RESET_ALL + module_name + " " + description)
def print_yellow(string):
    """Print ``string`` in bright yellow, then reset the terminal style.

    When ``windows_client()`` is true, ``reinit()`` is called before
    printing and ``deinit()`` afterwards.
    """
    if windows_client():
        reinit()

    coloured = Fore.YELLOW + Style.BRIGHT + string + Style.RESET_ALL
    print(coloured)

    if windows_client():
        deinit()
def print_info(lines):
    """Print ``lines`` inside a yellow "Informations" frame.

    Every printed line ends with a carriage return; the foreground colour
    is switched to white after the bottom border.
    """
    top_border = "------------------------- Informations -------------------------\r"
    bottom_border = "------------------------------------------------------------------\r"

    print(Fore.YELLOW + top_border)
    for entry in lines:
        print(Fore.YELLOW + "| " + entry + '\r')
    print(Fore.YELLOW + bottom_border + Fore.WHITE)
def input_judgment(message):
    """Ask a yes/no question and return True only for a 'y' answer.

    The prompt is printed in yellow with '(y/n): ' appended; the answer is
    compared case-insensitively.
    """
    print(Fore.YELLOW + message, end='(y/n): ')
    answer = input().lower()
    return answer == 'y'
def quick_deploy_introduction():
    """Print the script's title line in yellow."""
    print(Fore.YELLOW + "# Quick Deploy Script v1.0")
def printHelp():
    """Print the coloured "about" text and the usage hint."""
    about = (
        '\nThis ', Fore.BLUE, 'Intel', Fore.WHITE,
        ' 8080 assembler was made for ', Fore.BLUE, 'Project ',
        Fore.YELLOW, 'Week', Fore.WHITE, ' at my school.',
    )
    language = ('It is written in ', Fore.BLUE, 'Pyt', Fore.YELLOW, 'hon', Fore.WHITE)
    modules = (
        'Modules: ', Fore.RED, 'Co', Fore.BLUE, 'lo',
        Fore.YELLOW, 'ra', Fore.GREEN, 'ma', Fore.WHITE,
    )
    # Each tuple is one printed line, joined without separators.
    for pieces in (about, language, modules):
        print(''.join(pieces))
    print('\nPass a file path as an arguement.')


# Main function
def print_to_stdout(level, str_out):
    """The default debug function.

    Prints ``str_out`` in green for NOTICE, red for WARNING and yellow for
    any other level, then resets the foreground colour.
    """
    col = (
        Fore.GREEN if level == NOTICE
        else Fore.RED if level == WARNING
        else Fore.YELLOW
    )
    # When is_py3 is false the text is encoded first, replacing characters
    # that cannot be represented.
    if not is_py3:
        str_out = str_out.encode(encoding, 'replace')
    print(col + str_out + Fore.RESET)


# debug_function = print_to_stdout
def m_warn(self, m):
    """Print ``m`` as a '[!] ' warning, in yellow when COLORAMA is set."""
    text = '[!] ' + m
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print((Fore.YELLOW + text) if COLORAMA else text)
def copyright():
    """Print the tool's name/version banner with author details."""
    # The literal is kept exactly as it appears in the source; {0}/{1}
    # wrap the tool name in yellow and reset the colour afterwards.
    banner_text = """ =[ {0}smali-code-injector v1.1-dev{1} ] + -- --=[ Alexandre Teyar @Ares ] + -- --=[ Pentester at Ambersail Ltd. ] + -- --=[ GitHub: https://github.com/AresS31 ] """.format(Fore.YELLOW, Fore.RESET)
    print(banner_text)
def print_warnings(wn):
    """Colourise a warning count.

    Args:
        wn (int): Counts of warnings

    Returns:
        str: The count in yellow when positive, green otherwise, followed
        by a foreground reset.
    """
    colour = Fore.YELLOW if wn > 0 else Fore.GREEN
    return colour + str(wn) + Fore.RESET
def important(string):
    """Print ``string`` in yellow and reset the foreground colour."""
    highlighted = Fore.YELLOW + string + Fore.RESET
    print(highlighted)
def reboot_loop(self):
    """Export the ethers file forever, pausing between iterations.

    Each pass exports via ``self.ethers_manager``, prints the current
    ``self.reboot_string`` index in yellow, increments it and sleeps for
    ``self.reboot_delay`` using gevent. The loop never exits on its own.
    """
    while 1:
        self.ethers_manager.export_ethers()
        status = "Reboot index %d" % self.reboot_string
        print(Fore.YELLOW + status)
        self.reboot_string += 1
        gevent.sleep(self.reboot_delay)
def colorize(value, warning, error):
    """Wrap ``value`` in a colour chosen by comparing it to two thresholds.

    Args:
        value: The value to display.
        warning: Threshold from which the value is shown in yellow.
        error: Threshold from which the value is shown in red; checked
            before ``warning``.

    Returns:
        Whatever ``wrap(value, colour)`` returns.
    """
    if value >= error:
        colour = Fore.RED
    elif value >= warning:
        # Qualified like the other colours; the bare name YELLOW is not
        # defined in this block's visible scope.
        colour = Fore.YELLOW
    else:
        colour = Fore.RESET
    return wrap(value, colour)
def _color_from_status(self, status):
    """Return the colour code for ``status``.

    Raises:
        KeyError: If ``status`` is not one of the known statuses.
    """
    colours = dict(
        failed=Fore.RED,
        skipped=Fore.YELLOW,
        passed=Fore.GREEN,
        pipeline_error=Fore.RED,
    )
    return colours[status]
def _run_ssh_pexpect(cmd, password, using_bashc=False):
    """
    Run a given command using pexpect.

    :param cmd: command line to spawn
    :param password: password sent at the password prompt
    :param using_bashc: when true, run ``cmd`` through ``/bin/bash -c``
    :returns: ``str`` of the spawned process' ``before`` and ``after``
        buffers, concatenated
    """
    logger.debug(u'{}SSH Command: {}{}'.format(Style.DIM, cmd, Style.RESET_ALL))
    if using_bashc:
        ssh_cli = pexpect.spawn('/bin/bash', ['-c', cmd])
    else:
        ssh_cli = pexpect.spawn(cmd)
    # Regex patterns are raw strings: '\(' and '\?' are invalid escape
    # sequences in an ordinary string literal.
    i = ssh_cli.expect(['[Pp]assword: ', r'\(yes/no\)\? '])
    if i == 1:
        # Host-key confirmation prompt: accept, then wait for the password.
        ssh_cli.sendline('yes')
        ssh_cli.expect('[Pp]assword: ')
    ssh_cli.sendline(password)
    time.sleep(1)
    ssh_cli.expect([r'Connection to [0-9\.a-z]+ is closed.', pexpect.EOF,
                    pexpect.TIMEOUT], timeout=5)
    # Expected behavior is to get pexpect.EOF or closed connection, but due to
    # a bug in docker we have to send an additional new line or Ctrl^C
    out = str(ssh_cli.before) + str(ssh_cli.after)
    logger.debug(u'Output:\n {}{}{}'.format(Fore.YELLOW, out, Style.RESET_ALL))
    if ssh_cli.isalive():
        ssh_cli.close()
    return out
def _run_scp_command(cmd, user, host, password):
    """
    Emulate user command line interaction using SCP protocol

    :param cmd: command to be executed
    :param user: remote host user
    :param host: remote host IP/hostname
    :param password: password for remote user on host
    :returns None:
    """
    logger.debug(u'{}Running SCP: {}{}'.format(
        Style.DIM, cmd, Style.RESET_ALL))
    scp = pexpect.spawn(cmd)
    # Raw string: '\(' and '\?' are invalid escape sequences in an
    # ordinary string literal.
    i = scp.expect([r'\(yes/no\)\? ', '[Pp]assword: '])
    if i == 0:
        # Host-key confirmation prompt: accept, then wait for the password.
        scp.sendline('yes')
        scp.expect('[Pp]assword: ')
    scp.sendline(password)
    time.sleep(1)
    try:
        while True:
            # Index 0 is EOF; index 1 matches an "NN:NN " fragment in the
            # output.
            i = scp.expect([pexpect.EOF, '[0-9][0-9]:[0-9][0-9] '], timeout=5)
            if i == 0:
                logger.debug(u'{}{}{}'.format(Fore.YELLOW, scp.before, Style.RESET_ALL))
                break
            logger.debug(u'{}{}{}{}'.format(Fore.YELLOW, scp.before, scp.after, Style.RESET_ALL))
            time.sleep(.1)
    except pexpect.TIMEOUT:
        # A docker bug expecting an extra new line in the end. Ideally we
        # will exit the loop getting pexpect.EOF, i.e. i==0
        logger.debug(u'{}{}{}'.format(Fore.YELLOW, scp.before, Style.RESET_ALL))
    finally:
        if scp.isalive():
            scp.close()
def handle(self):
    """Record the client's address and log the data it sent.

    The address is appended to ``self.server.connection_list``; the
    payload is read with ``recv`` when the request supports it, otherwise
    taken from the first element of the request.
    """
    self.server.connection_list.append(self.client_address[0])

    try:
        # For TCP connections
        payload = self.request.recv(1024).strip()
    except AttributeError:
        # For UDP connections
        payload = self.request[0].strip()

    message = '{}Client: {}\ndata: {}{}'.format(
        Fore.YELLOW, self.client_address[0], payload, Style.RESET_ALL)
    LOG.debug(message)
def warn(message, detail=""):
    """Print a "[WARN]" line: bright ``message`` followed by ``detail``.

    The line starts with the module-level ``prefix`` and ends with a style
    reset.
    """
    tag = Fore.YELLOW + "[WARN] " + Fore.RESET
    headline = Style.BRIGHT + message + Style.RESET_ALL
    print(prefix + tag + headline + " " + detail + Style.RESET_ALL)
def _check_num_nodes(self):
    """Validate ``self.num_nodes``, asking for confirmation below three.

    Raises:
        ValueError: If fewer than one node was requested.
        StandardError: If the user answers 'n' at the confirmation prompt.
    """
    if self.num_nodes < 1:
        raise ValueError('Must provision at least one node.')
    if not self.num_nodes < 3:
        return

    print(Fore.YELLOW + 'A Deis cluster must have 3 or more nodes, only continue if you adding to a current cluster.' + Fore.RESET)
    print(Fore.YELLOW + 'Continue? (y/n)' + Fore.RESET)

    # Keep prompting until the answer is exactly 'y' or 'n'.
    while True:
        answer = self._get_user_input('--> ').strip().lower()
        if answer == 'y':
            return
        if answer == 'n':
            raise StandardError('User canceled provisioning')
def log_warning(message):
    """Print ``message`` in yellow and reset the foreground colour."""
    coloured = Fore.YELLOW + message + Fore.RESET
    print(coloured)