我们从Python开源项目中，提取了以下36个代码示例，用于说明如何使用colorama.Fore.MAGENTA。
def main(self, s):
    """Print the current date and time for the place named in ``s``.

    The substrings ``'time '`` and ``'in '`` are removed from ``s``; what is
    left is passed to ``getLocation`` and the resulting pair is sent as
    latitude/longitude to the TimeZoneDB ``get-time-zone`` endpoint. The
    ``dst`` field of the JSON response is stored on ``self.dst``.
    """
    # Keep only the city/region part of the command text.
    place = s.replace('time ', '').replace('in ', '')

    # NOTE(review): the TimeZoneDB API key is embedded in this URL literal.
    url_template = (
        "http://api.timezonedb.com/v2/get-time-zone?"
        "key=BFA6XBCZ8AL5&format=json"
        "&by=position&lat={:.6f}&lng={:.6f}"
    )
    coordinates = getLocation(place)
    response = requests.get(url_template.format(*coordinates))
    payload = json.loads(response.text)

    local_time = payload['formatted']
    self.dst = payload['dst']

    sentence = (
        "The current date and time in " + str(place).title()
        + " is: " + str(local_time)
    )
    print(Fore.MAGENTA + sentence + Fore.RESET)
async def radio_play(s):
    """Start playback of the song identified by ``s`` and return the player.

    Identifiers longer than four characters are looked up in ``yt`` and played
    from ``ytDir``; all others are looked up in ``songListByID`` and played
    from ``musicDir``. The song title is published through
    ``client.change_status`` and the song id is appended to ``prev``, which is
    kept at no more than five entries.

    The function is declared ``async`` because its body awaits
    ``client.change_status``; ``await`` inside a plain ``def`` is a syntax
    error.

    :param s: song identifier (key of ``yt`` or ``songListByID``)
    :return: the started ffmpeg player
    """
    global prev, song, voice, player

    from_youtube = len(s) > 4
    if from_youtube:
        song = yt[s]
        source_dir = ytDir
    else:
        song = songListByID[s]
        source_dir = musicDir

    player = voice.create_ffmpeg_player(
        source_dir + song['file'],
        before_options='-hide_banner -loglevel panic',
        options='-b:a 64k -bufsize 64k',
    )
    player.start()

    label = ' [YT] ' if from_youtube else ' [' + song['id'] + '] '
    # Only library songs fall back to a title-only line when the artist is empty.
    if from_youtube or song['artist']:
        description = song['artist'] + ' - ' + song['title']
    else:
        description = song['title']
    dprint(Fore.MAGENTA + 'Playing:' + Style.NORMAL + label + description)

    await client.change_status(
        game=discord.Game(name=song['title'], url='', type=0),
        idle=False,
    )

    # Remember the song and drop the oldest entry once more than five are kept.
    prev.append(song['id'])
    if len(prev) > 5:
        del prev[0]
    return player
def present(self):
    """Print one line per package: coloured name, version and its tags.

    Each tag of ``self.tags[package]`` is rendered as its ``color`` attribute
    followed by the tag itself in square brackets.
    """
    for package in self.list_:
        rendered_tags = []
        for tag in self.tags[package]:
            rendered_tags.append("{}[{}]".format(tag.color, tag))
        tag_str = " ".join(rendered_tags)

        heading = Fore.MAGENTA + package.name + Style.RESET_ALL
        print("{}/{} {}".format(heading, package.version, tag_str))
def status(f):
    """Print a bracketed, coloured status tag for the numeric code ``f``.

    :param f: status code; ``0`` prints ``Done`` in green, a negative value
        prints ``Error`` in red and a positive value prints ``???`` in magenta
    :return: None
    """
    if f == 0:
        tag = Fore.GREEN + 'Done'
    elif f < 0:
        tag = Fore.RED + 'Error'
    elif f > 0:
        tag = Fore.MAGENTA + '???'
    else:
        # Nothing matched (e.g. NaN): no tag is printed.
        tag = None

    if tag is not None:
        print('[' + tag + Fore.RESET + ']')
    # Explicit flush so the tag appears immediately (Python 2 print has no flush argument).
    sys.stdout.flush()
def print_msg(msg=u'', color=Fore.MAGENTA):
    """Echo ``msg`` wrapped in ``color`` and a foreground reset.

    The write and the following flush of ``sys.stdout`` happen while holding
    ``print_lock``.

    :type msg: basestring
    """
    colored = u'{}{}{}'.format(color, force_unicode(msg), Fore.RESET)
    with print_lock:
        click.echo(colored, color=True)
        sys.stdout.flush()
def get_pipeline_logs(multilog):
    """Build the detailed per-pipeline log report, encoded as UTF-8.

    Every entry of ``multilog.grouped_by_thread`` is rendered under a centred
    magenta heading. The rendered entries are posted through
    ``PastebinClient`` (configured from the ``KD_PASTEBIN_URL``,
    ``KD_PASTEBIN_USER`` and ``KD_PASTEBIN_PASS`` environment variables) and
    the report lists what ``post`` returned for each one. If anything in that
    step raises, the traceback and the full rendered logs are inlined instead.

    :param multilog: logger handler instance
    :return: the report as UTF-8 encoded bytes
    """
    def _render(title, body):
        title = force_unicode(title)
        body = force_unicode(body)
        return center_text_message(title, color=Fore.MAGENTA) + '\n' + body

    rendered = {}
    for pipeline, text in multilog.grouped_by_thread.items():
        rendered[pipeline] = _render(pipeline, text)

    try:
        pastebin = PastebinClient(
            os.environ['KD_PASTEBIN_URL'],
            os.environ['KD_PASTEBIN_USER'],
            os.environ['KD_PASTEBIN_PASS'],
        )
        with pastebin as client:
            links = [
                u'{}: {}'.format(pipeline, client.post(entry))
                for pipeline, entry in rendered.items()
            ]
            details = '\n' + '\n'.join(links)
    except Exception:
        # Fallback if pastebin isn't accessible
        reason = u''.join(traceback.format_exception(*sys.exc_info()))
        details = (
            u'\n!!! Could not upload logs to pastebin, '
            u'falling back to console. Reason:\n{}\n\n{}'
        ).format(reason, u'\n'.join(rendered.values()))

    banner = center_text_message(
        'PIPELINE DETAILED LOGS', fill_char='=', color=Fore.MAGENTA)
    return (banner + details).encode('utf-8')
def info(self, message):
    """Print ``message`` in cyan, prefixed by the current thread's name in magenta."""
    thread_name = threading.current_thread().name
    prefix = Fore.MAGENTA + thread_name + ': '
    print(prefix + Fore.CYAN + message + Fore.RESET)
def success(self, message):
    """Print ``message`` in green, prefixed by the current thread's name in magenta."""
    thread_name = threading.current_thread().name
    prefix = Fore.MAGENTA + thread_name + ': '
    print(prefix + Fore.GREEN + message + Fore.RESET)
def log_debug(message):
    """Print ``message`` dimmed and in magenta, then reset colour and style."""
    opening = Style.DIM + Fore.MAGENTA
    closing = Fore.RESET + Style.RESET_ALL
    print(opening + message + closing)
def log(color, m_str, log_this=True):
    """Print ``m_str`` tagged with the process id and optionally queue it for the log file.

    The printed line starts with a magenta ``[*]`` marker, shows the current
    process id in green and then ``m_str`` in ``color``. When ``log_file`` is
    non-empty and ``log_this`` is true, ``m_str`` is also appended to
    ``log_strs``.

    :param color: colour sequence placed in front of the message
    :param m_str: message text
    :param log_this: set to False to print without recording the message
    """
    global log_file

    process_id = str(os.getpid())
    line = "[*] [thread {}] {} {} {}".format(
        Fore.GREEN + process_id, color, m_str, Style.RESET_ALL)
    print(Fore.MAGENTA + line)

    should_record = len(log_file) > 0 and log_this
    if should_record:
        log_strs.append(m_str)
def differenciate_list(custom_list, real_list, l_type):
    """Log every entry of ``custom_list`` whose index lies beyond the end of ``real_list``.

    :param custom_list: list that may contain additional entries
    :param real_list: reference list; only its length is used
    :param l_type: label inserted into the log message
    """
    snapshot = custom_list[:]
    for position, _ in enumerate(snapshot):
        if position < len(real_list):
            continue
        message = "Added {} '{}' to current domain".format(
            l_type, custom_list[position])
        log(Fore.MAGENTA, message, log_this=False)
def formatEntry(e:Entry):
    """Render entry ``e`` as a coloured multi-line string framed by two rules.

    The output contains, in order: the ID and priority, the name, the path,
    the BibTeX title and author (or ``<No Bibtex>`` when ``e.bibtex`` is
    falsy), the tags prefixed with ``#`` and the comment lines.
    """
    reset = _S.RESET_ALL
    id_style = reset + _B.BLUE
    priority_style = reset + _F.LIGHTGREEN_EX
    tag_style = reset + _F.RED
    name_style = reset + _B.RED
    border = reset + _F.CYAN
    path_style = reset + _F.YELLOW
    title_style = reset + _F.LIGHTMAGENTA_EX
    author_style = reset + _F.MAGENTA

    prefix = border + '| ' + reset
    rule = '--------------------------------------------------------------------------------'
    comment_lines = e.comment.split('\n') if e.comment != '' else ()

    parts = [
        border, rule, reset, '\n',
        prefix, id_style, 'ID : ', '{:>5}'.format(e.ID or ''), reset,
        ' ' * 47, priority_style, '{:>20}'.format(e.priority), reset, '\n',
        prefix, name_style, e.name, reset, '\n',
        prefix, path_style, e.pathstr(), reset, '\n',
    ]

    if e.bibtex:
        parts += [
            prefix, title_style, e.bibtex.get('title', ''), reset, '\n',
            prefix, author_style, e.bibtex.get('author', ''), reset, '\n',
        ]
    else:
        parts += [prefix, title_style, '<No Bibtex>', reset, '\n']

    tag_line = (reset + ' ').join(tag_style + '#' + tag for tag in e.tags)
    parts += [prefix, tag_line, '\n', prefix, reset]

    # Each comment line is followed by a newline and the prefix for the next line.
    for comment_line in comment_lines:
        parts.append(comment_line + '\n' + prefix)

    parts += ['\n', border, rule, reset, '\n']
    return ''.join(parts)
def give_llama_if_exchanger(dev_name):
    """Give a llama to ``dev_name`` only if they have given more than they received.

    Otherwise the ``not_exchanger`` counter is incremented and a magenta row
    with the counter, the name, received, given and their difference is echoed.
    """
    stats = get_llama_stats(dev_name)
    given, received = stats['Given'], stats['Received']

    if given > received:
        match = re.search(regex['llama_page_dev_id'], stats['HTML'])
        give_llama(match.group(1))
        return

    llama_counts['not_exchanger'] += 1
    row = '{:<5} {:<22} {:<5} {:<5} {:<5}'.format(
        llama_counts['not_exchanger'],
        dev_name,
        received,
        given,
        received - given,
    )
    echo(Fore.MAGENTA + row)
def initialize_git_repo(current):
    """Create a git repository in ``current`` with a first commit and a ``0.1`` tag.

    Changes the process working directory to ``current`` and then runs, in
    order, ``git init``, ``git add .``, ``git commit -m 'First commit'`` and
    ``git tag -a 0.1 -m 'First alpha version'``. The standard output of
    ``git init`` is printed in magenta at the end. Exit codes of the git
    commands are not checked.

    :param current: directory in which the repository is created
    """
    def _git(*arguments):
        # communicate() drains stdout/stderr while waiting. Calling wait() on
        # a process with PIPE'd output can deadlock once a pipe buffer fills
        # (e.g. ``git commit`` listing many files).
        process = subprocess.Popen(
            ['git'] + list(arguments),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return process.communicate()[0]

    os.chdir(current)
    init_output = _git('init')
    _git('add', '.')
    _git('commit', '-m', 'First commit')
    _git('tag', '-a', '0.1', '-m', 'First alpha version')
    print(Fore.MAGENTA + " [+] " + str(init_output, 'utf-8') + Fore.RESET)
def recv(self, str, mode):
    """Print received data on a magenta background, plus a hex view in ``'hex'`` mode.

    Nothing is printed when ``str`` is empty. In ``'hex'`` mode the result of
    ``conv().hex(str, ':')`` is printed in magenta on a second line.
    """
    if not str:
        return
    print(Back.MAGENTA + str + Style.RESET_ALL)
    if mode == 'hex':
        print(Fore.MAGENTA + conv().hex(str, ':') + Style.RESET_ALL)
def debug(self, message):
    """Print a magenta ``[DEBUG]`` line with a timestamp and the ASCII-only form of ``message``."""
    header = Fore.MAGENTA + "[DEBUG] " + self.timestamp() + " "
    # Characters outside ASCII are dropped rather than replaced.
    ascii_text = message.encode('ascii', 'ignore').decode('ascii')
    print(header + ascii_text + Fore.RESET)
def f(self, func="basic", message=""):
    """Print a magenta ``[FUNC/<NAME>]`` line with a timestamp and the ASCII-only ``message``.

    :param func: label shown upper-cased inside the tag
    :param message: text to print; non-ASCII characters are dropped
    """
    tag = "[FUNC/{}] ".format(func.upper())
    header = Fore.MAGENTA + tag + self.timestamp() + " "
    ascii_text = message.encode("ascii", "ignore").decode('ascii')
    print(header + ascii_text + Fore.RESET)
def color_critical(message, logger_name = "root"):
    """Log ``message`` at ``LEVEL_CRITICAL`` wrapped in magenta.

    :param message: value to log; rendered with ``%s``
    :param logger_name: name forwarded to ``__log``
    """
    colored = '%s%s%s' % (Fore.MAGENTA, message, Fore.RESET)
    __log(LEVEL_CRITICAL, colored, logger_name)


########################interface#######################################
def open_detail_page(filtered_goods):
    """Prompt for goods indexes and open each selected goods URL in a browser window.

    The answer is a single index or several joined by ``','``. URLs that start
    with ``/`` get ``https:`` prepended. When nothing matches, an error is
    shown and the prompt is repeated; ``KeyboardInterrupt`` ends the prompt.

    :param filtered_goods: goods to choose from
    :return: None
    """
    hints = (
        'which do you prefer? type it\'s index',
        'if many, use \',\' to split them',
        'use \'control + c\' to exit.',
    )
    for hint in hints:
        print(colorful_text(hint, Fore.MAGENTA))

    try:
        answer = input('goods index: ')
        selector = get_target_goods(answer.split(','))
        chosen = list(filter(selector, filtered_goods))
        if not chosen:
            error_message('no such index')
            open_detail_page(filtered_goods)
        else:
            for goods in chosen:
                link = goods["url"]
                if link[0] == '/':
                    link = 'https:{}'.format(link)
                webbrowser.open_new(link)
    except KeyboardInterrupt:
        error_message('exit')
def print_purpul(*args):
    """Print the ``str()`` of the argument tuple in magenta.

    ``init(autoreset=True)`` is called before printing on every invocation.
    """
    text = str(args)
    init(autoreset=True)
    colored = Fore.MAGENTA + text
    print(colored)

#----------------------------------------------------------------------
def __init__(self):
    """Set up colour codes, the translator, the SQLite connection and word state.

    Opens ``dictionary.db`` with ``sqlite3`` and keeps both the connection and
    a cursor on the instance.
    """
    # Colour/style escape sequences addressed by name.
    self.colors = {
        'red': Fore.RED,
        'green': Fore.GREEN,
        'cyan': Fore.CYAN,
        'yellow': Fore.LIGHTYELLOW_EX,
        'magenta': Fore.MAGENTA,
        'bold': Style.BRIGHT,
        'reset': Style.RESET_ALL,
    }

    self.translator = Translator()
    self.select_languages = "tr"

    self.database = "dictionary.db"
    self.connection = sqlite3.connect(self.database)
    self.cursor = self.connection.cursor()

    # Word class -> column name.
    self.columns = {
        'isim': 'i_anlam',
        'fiil': 'f_anlam',
        'zarf': 'z_anlam',
        'edat': 'e_anlam',
        'baglac': 'b_anlam',
        'sifat': 's_anlam',
        'zamir': 'zz_anlam',
    }
    # Word class label -> key used in ``self.columns``.
    self.names2 = {
        'isim': 'isim',
        'zarf': 'zarf',
        'ba?laç': 'baglac',
        's?fat': 'sifat',
        'zamir': 'zamir',
        'fiil': 'fiil',
        'edat': 'edat',
    }

    self.c_word = ""
    self.c_word_last = ""
    self.c_word_new = ""
async def yt_queue(s, m):
    """Queue the YouTube item ``s`` on behalf of the author of message ``m``.

    Metadata is fetched without downloading and stored in ``yt`` under the
    video id. The item is rejected when its id is already in ``prev`` or
    ``queue`` or when it is longer than 900 seconds; otherwise it is
    downloaded into ``ytDir`` and its id is appended to ``queue``. The outcome
    is sent to ``m.channel`` and, when ``m.server`` is truthy, the requesting
    message is deleted.

    The function is declared ``async`` because its body awaits ``client``
    calls; ``await`` inside a plain ``def`` is a syntax error.

    :param s: search text or URL handed to youtube_dl
    :param m: the message that requested the song
    """
    global prev, song, voice, player, yt

    ydl_opts = {
        'format': 'bestaudio/best',
        'noplaylist': True,
        'nocheckcertificate': True,
        'quiet': True,
        'outtmpl': ytDir + '%(id)s',
        'default_search': 'auto'
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        meta = ydl.extract_info(s, download=False)

    video_id = meta['id']
    yt[video_id] = {
        'id': video_id,
        'title': meta['title'],
        'artist': meta['uploader'],
        'album': 'YouTube',
        'composer': None,
        # meta['view_count'] / meta['like_count'] / meta['dislike_count']
        'length': meta['duration'],
        'file': video_id
    }

    requester = '[' + m.author.display_name + ']'
    song_intro = requester + '?The song [YT] _' + meta['title']
    if video_id in prev:
        mainMessage = song_intro + '_ has already been played recently'
    elif video_id in queue:
        mainMessage = song_intro + '_ is already in the queue'
    elif meta['duration'] > 900:
        mainMessage = song_intro + '_ is too long (max 15 minutes)'
    else:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            ydl.download([s])
        queue.append(video_id)
        dprint(Fore.MAGENTA + m.author.display_name + ' queued:' + Style.NORMAL
               + ' [YT] ' + meta['uploader'] + ' - ' + meta['title'])
        mainMessage = requester + '?Queued [YT] _' + meta['title'] + '_'

    await client.send_message(m.channel, mainMessage)
    if m.server:
        await client.delete_message(m)
def pretty_print_testcase(testcase, error=''):
    """Pretty print a testcase as ``id``, coloured ``status`` and ``name``.

    The status is magenta when ``error`` is given, light green when
    ``testcase['status']`` is ``'PASS'`` and light red otherwise. With an
    error, ``=> <error>`` is appended after the name. The line ends with
    ``Style.RESET_ALL`` instead of a newline.

    :param testcase: mapping providing the ``id``, ``status`` and ``name`` fields
    :param error: optional error whose text is appended to the line
    """
    if error:
        status_color = Fore.MAGENTA
    elif testcase['status'] == 'PASS':
        status_color = Fore.LIGHTGREEN_EX
    else:
        status_color = Fore.LIGHTRED_EX

    msg_template = (
        Style.BRIGHT + '{id}' + Style.RESET_ALL + '\t'
        + status_color + '{status}' + Fore.RESET + '\t'
        + '{name}\t'
    )
    line = msg_template.format(**testcase)
    if error:
        # Appended after formatting: braces in the error text must not be
        # interpreted as replacement fields by str.format.
        line += '=> ' + str(error)
    print(line, end=Style.RESET_ALL)
def _gray(self, s):
    """Return ``s`` wrapped in the magenta foreground code and a foreground reset."""
    wrapped = Fore.MAGENTA + s
    wrapped = wrapped + Fore.RESET
    return wrapped
def print_title( ):
    """Print the multi-coloured tool banner after two newlines and four tabs.

    No reset sequence is emitted after the banner.
    """
    segments = (
        (Fore.YELLOW, "SAMP HELPER"),
        (Fore.MAGENTA, " PYTHON TOOL "),
        (Fore.GREEN, "BY"),
        (Fore.RED, " SREYAS"),
    )
    banner = "".join(color + text for color, text in segments)
    print("\n\n\t\t\t\t" + banner)
def print_magenta( str ):
    """Print ``str`` in bright magenta; no reset sequence is appended."""
    bright_magenta = Style.BRIGHT + Fore.MAGENTA
    print(bright_magenta + str)
def magenta(self, s):
    """Return ``s`` wrapped in the magenta foreground code and a foreground reset."""
    colored = Fore.MAGENTA + s
    colored = colored + Fore.RESET
    return colored
def caller(self, proxy):
    """
    Populate the XML-RPC system.multicall() with the maximum number
    of predefined of subrequests and then fire it.

    Chunks are taken from ``self.queue`` until it is empty or the module-level
    ``exit_flag`` is set. Every entry of a chunk becomes one
    ``wp.getUsersBlogs`` sub-request for ``self.username``. When a response
    item contains ``'isAdmin': True``, ``exit_flag`` is set, the match is
    written to the progress bar and appended to ``passpot.pot``.

    :param proxy: object handed to ``xmlrpc.client.MultiCall`` (presumably an
        ``xmlrpc.client.ServerProxy`` -- confirm against the caller)
    """
    calls = 0
    global exit_flag
    pbar = tqdm(self.queue.qsize(), desc=self.name, total=self.queue.qsize(), unit='multicall', unit_scale=True, dynamic_ncols=True)
    while not self.queue.empty() and not exit_flag:
        chunks_size = self.queue.get()
        multicall = xmlrpc.client.MultiCall(proxy)
        for passwords in chunks_size:
            # Can be any other available method that needs auth.
            multicall.wp.getUsersBlogs(self.username, passwords.strip())
        try:
            if self.verbose:
                pbar.write(Fore.MAGENTA + "[{}]".format(self.name) + TRAFFIC_OUT + "XML request [#{}]:".format(calls))
                pbar.write("{}".format(chunks_size) + Style.RESET_ALL)
            # Calling the MultiCall object sends all queued sub-requests.
            res = multicall()
        # NOTE(review): a bare except also catches KeyboardInterrupt/SystemExit.
        except:
            pbar.write(ERROR + "could not make an XML-RPC call" + Style.RESET_ALL)
            # NOTE(review): this skips calls += 1, queue.task_done() and
            # pbar.update() for the failed chunk -- confirm that is intended.
            continue
        if self.verbose:
            pbar.write(Back.MAGENTA + "[{}]".format(self.name) + TRAFFIC_IN + "XML response [#{}] (200 OK):".format(calls))
            pbar.write("{}".format(res.results) + Style.RESET_ALL)
        if re.search("isAdmin", str(res.results), re.MULTILINE):
            # i tracks the position of the current result so the matching
            # chunk entry can be reported.
            i = 0
            for item in res.results:
                if re.search(r"'isAdmin': True", str(item)):
                    exit_flag = True
                    # let time for the threads to terminate
                    time.sleep(2)
                    # pbar.write() seems to be bugged at the moment
                    pbar.write(RESULT + "found a match: \"{0}:{1}\"".format(self.username, chunks_size[i].strip()) + Style.RESET_ALL)
                    # Log the password in case sys.stdout acts dodgy
                    with open("passpot.pot", "a+") as logfile:
                        logfile.write("{0} - {1}:{2}\n".format(self.xmlrpc_intf, self.username, chunks_size[i].strip()))
                    break
                i += 1
        calls += 1
        self.queue.task_done()
        pbar.update()
    pbar.close()
def _report_created(self):
    """Print a summary table of the Linodes in ``self._created_linodes``.

    Each row shows hostname, public IP, private IP and gateway of a Linode
    together with the data center abbreviation and the plan's RAM. The report
    ends with the ``apply-firewall.py`` command line listing every public IP.

    The host list is built with ``str.join``; the ``string.join`` function
    used previously does not exist in Python 3.
    """
    # set up the report data
    rows = []
    ips = []
    data_center = self._get_data_center().get('ABBR')
    plan = self._get_plan().get('RAM')
    for linode in self._created_linodes:
        rows.append((
            linode['hostname'],
            linode['public'],
            linode['private'],
            linode['gateway'],
            data_center,
            plan,
        ))
        ips.append(linode['public'])
    firewall_command = (
        './apply-firewall.py --private-key /path/to/key/deis --hosts '
        + ' '.join(ips)
    )

    # set up the report constants
    divider = Style.BRIGHT + Fore.MAGENTA + ('=' * 109) + Fore.RESET + Style.RESET_ALL
    column_format = " {:<20} {:<20} {:<20} {:<20} {:<12} {:>8}"
    formatted_header = column_format.format(
        'HOSTNAME', 'PUBLIC IP', 'PRIVATE IP', 'GATEWAY', 'DC', 'PLAN')

    # display the report
    print('')
    print(divider)
    print(divider)
    print('')
    print(Style.BRIGHT + Fore.LIGHTGREEN_EX + ' Successfully provisioned '
          + str(self.num_nodes) + ' nodes!' + Fore.RESET + Style.RESET_ALL)
    print('')
    print(Style.BRIGHT + Fore.CYAN + formatted_header + Fore.RESET + Style.RESET_ALL)
    for row in rows:
        print(Fore.CYAN + column_format.format(*row) + Fore.RESET)
    print('')
    print('')
    print(Fore.LIGHTYELLOW_EX
          + ' Finish up your installation by securing your cluster with the following command:'
          + Fore.RESET)
    print('')
    print(' ' + firewall_command)
    print('')
    print(divider)
    print(divider)
    print('')
def plot_similarity_clusters(desc1, desc2, plot = None):
    """Find similar sounds using Affinity Propagation clusters.

    The two descriptors are stacked as columns, projected with a whitened
    two-component PCA and clustered with ``AffinityPropagation`` (euclidean
    affinity). When ``plot`` equals ``True``, progress text and the
    label/file table are printed and clusters 0-3 are drawn as a scatter plot
    in blue, red, yellow and green.

    All output uses the ``print()`` call form, so the function also parses
    under Python 3 (a ``print`` statement is a syntax error there).

    :param desc1: first descriptor values
    :param desc2: second descriptor values
    :param plot: pass ``True`` to print details and show the scatter plot
    :returns:
      - euclidean_labels: labels of clusters
    """
    show = plot == True  # noqa: E712 -- keeps the original equality test

    if show:
        print(Fore.MAGENTA + "Clustering")

    min_max = preprocessing.scale(
        np.vstack((desc1, desc2)).T, with_mean=False, with_std=False)
    pca = PCA(n_components=2, whiten=True)
    y = pca.fit(min_max).transform(min_max)

    euclidean = AffinityPropagation(convergence_iter=1800, affinity='euclidean')
    euclidean_labels = euclidean.fit_predict(y)

    if show:
        time.sleep(5)
        print(Fore.WHITE + "Cada número representa el grupo al que pertence el sonido como ejemplar de otro/s. El grupo '0' esta coloreado en azul, el grupo '1' esta coloreado en rojo, el grupo '2' esta coloreado en amarillo. Observa el ploteo para ver qué sonidos son ejemplares de otros")
        # ``files`` is read from module scope.
        print(np.vstack((euclidean_labels, files)).T)
        time.sleep(6)
        for cluster, colour in enumerate(('b', 'r', 'y', 'g')):
            members = euclidean_labels == cluster
            plt.scatter(y[members, 0], y[members, 1], c=colour)
        plt.show()

    return euclidean_labels

# save clusters files in clusters directory
def hexdump(data, cols=80):
    """Print a coloured hex dump of ``data``, 16 values per row.

    A header with the total length and a column legend is printed first. Each
    row then shows the offset of its first value in hexadecimal, the 16 values
    as two-digit hex numbers and the text returned by ``recover`` for the row.
    A final partial row is padded with zeros.

    The offset column shows the byte offset (row index times 16), matching the
    ``Offset(h)`` header; previously it showed the bare row index.

    :param data: sequence of integer byte values (e.g. ``bytes``)
    :param cols: width of the dashed separator under the header
    """
    size = 16
    hex_columns = ' '.join(['{:02X}'] * size)

    # print the header
    print(Fore.MAGENTA + 'pyhexdump: {} bytes'.format(len(data)))
    print('ascii characters: GREEN')
    print('non-ascii: RED')
    header_args = ['Offset(h)'] + list(range(size)) + ['String']
    print(Fore.BLUE + ('{:>6} | ' + hex_columns + ' | {}').format(*header_args))
    print('-' * cols + Fore.RESET)

    # formatting string for each line
    print_string = (
        Fore.BLUE + '{:09X} | ' + Fore.RESET
        + hex_columns + ' '
        + Fore.GREEN + '| {}' + Fore.RESET
    )

    # break data up into 16 value chunks, zero-padding the last one
    values = list(data)
    for offset in range(0, len(values), size):
        line = values[offset:offset + size]
        line += [0] * (size - len(line))
        print(print_string.format(offset, *(line + [recover(line)])))