我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用colorama.Style.DIM。
def banner(): print(Style.DIM) print(' ___________________________') print(' / /\\') print(' / sadboyzvone\'s _/ /\\') print(' / Intel 8080 / \/') print(' / Assembler /\\') print('/___________________________/ /') print('\___________________________\/') print(' \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\ \\' + Style.RESET_ALL + Style.BRIGHT) print(Fore.WHITE + '\nPowered by ' + Fore.BLUE + 'Pyt' + Fore.YELLOW + 'hon' + Fore.WHITE + '\nCopyright (C) 2017, Zvonimir Rudinski') # Print usage information
def test_tag_chars(): with raises(ValueError): am = AnsiMarkup(tag_sep='{') with raises(ValueError): am = AnsiMarkup(tag_sep='(-)') with raises(ValueError): am = AnsiMarkup(tag_sep='qq') am = AnsiMarkup(tag_sep='{}') r1 = p('0<b>1<d>2</d>3</b>4') r2 = am.parse('0{b}1{d}2{/d}3{/b}4') assert r1 == r2 == '0' + S.BRIGHT + '1' + S.DIM + '2' + S.RESET_ALL + S.BRIGHT + '3' + S.RESET_ALL + '4' assert s('<b>1</b>') == am.strip('{b}1{/b}') == '1'
def local_exec(cmd, env=None, shell=False, check_retcode=True): LOG.debug("{}Executing locally: '{}'{}".format( Style.DIM, cmd if isinstance(cmd, basestring) else " ".join(cmd), Style.RESET_ALL)) if env is not None: # Sanitize env because it doees not digest non string values env = dict((key, str(value)) for key, value in env.items()) # Unite the system and custom environments env = dict(os.environ, **env) proc = subprocess.Popen(cmd, env=env, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=shell) out, err = proc.communicate() ret_code = proc.returncode _proceed_exec_result( out.decode('ascii', 'ignore'), err.decode('ascii', 'ignore'), ret_code, check_retcode) return ret_code, out, err
def ssh_exec_live(ssh, cmd, timeout=None, check_retcode=True): LOG.debug(u"{}Calling SSH live: '{}'{}".format(Style.DIM, cmd, Style.RESET_ALL)) try: interact = SSHClientInteraction(ssh, timeout=timeout, display=True, logger=logging.getLogger()) interact.expect('.*') interact.send(cmd + "; exit $?") # needed to not depend on prompt type interact.tail() ret_code = interact.channel.recv_exit_status() except Exception: LOG.debug("Something went wrong in 'ssh_exec_live':\n{}".format( format_exception(sys.exc_info()) )) raise _proceed_exec_result("", "", ret_code, check_retcode) return ret_code, "", ""
def get_scanning_result(self, iface): indent = ' ' * 1 flagExec = self.exec_iwlist(iface) if flagExec: header = indent + 'Scanning WiFi networks using interface \'' + iface + '\'\n' network_table = [['SSID', 'AP Address', 'Channel', 'Encryption', 'Quality']] for dict_network in self.parsed_cells: network_table.append([ dict_network['Name'], dict_network['Address'], dict_network['Channel'], dict_network['Encryption'], dict_network['Quality'] ]) table = AsciiTable(network_table) print (Fore.YELLOW + Style.DIM + header + table.table) return True else: return False
def print_ifaces_wireless_table(self): indent = ' ' * 1 header = indent + 'Wireless interfaces information:\n' self.wireless_table = [['Interface', 'Status', 'IP Address', 'Mask', 'Mode', 'SSID', 'AP Address', 'Wireless Type']] for data in self.ifaces_wireless: self.wireless_table.append([ data, self.dict_status[data], self.dict_ipaddr[data], self.dict_mask[data], self.dict_mode[data], self.dict_ssid[data], self.dict_apaddr[data], self.dict_wname[data] ]) table = AsciiTable(self.wireless_table) print (Fore.YELLOW + Style.DIM + header + table.table)
def app_header(self): header = '\n' header += ' ??????????? ???? ?????? ???? ??? ?????? ??????? ??????????????? \n' header += ' ????????????? ?????????????????? ??????????????????? ????????????????\n' header += ' ??? ????????????????????????? ?????????????? ?????????? ????????\n' header += ' ??? ???????????????????????????????????????? ????????? ????????\n' header += ' ??????????? ??? ?????? ?????? ????????? ??????????????????????? ???\n' header += ' ?????????? ?????? ?????? ???????? ??? ??????? ??????????? ???\n' header2 = ' Connection Manager\n' header3 = ' {}\n'.format('-' * 70) header3 += ' Version : {}\n'.format(__version__) header3 += ' Release date : {}\n'.format(__release_date__) header3 += ' Github : https://github.com/fachrioktavian/CManager\n' header3 += ' Dev by : {0} - ({1})\n'.format(__author__, __author_email__) header3 += ' Official : https://dracos-linux.org\n' header3 += ' {}\n'.format('-' * 70) print (Fore.RED + Style.DIM + header + header2) print (Fore.CYAN + Style.DIM + header3)
def menu(): # Using colour from colorama: https://pypi.python.org/pypi/colorama # Formatting e.g.: Fore.COLOUR, Back.COLOUR, Style.DIM with e.g. DIM, RED, CYAN, etc.: print(Fore.BLACK + Back.WHITE + "10cbazbt3 menu:" + Style.RESET_ALL) print(Fore.YELLOW + Style.DIM + "Main:") print(" b = Blurb m = Mentions") print(" r = Reply t = Timeline") print(" blog = BLOG o = Own blurbs") print(" pins = PINS") print("Admin:") print(" Login = Login menu = show Menu") print(" Logout = Logout. sites = my Sites") print(" exit = Exit") print(Style.RESET_ALL) # DEFINE 10C POST INTERACTIONS: # LOTS OF DUPLICATION HERE! # Define the 'blurb' (social post) subroutine:
def blurb(): # Input some text: posttext = input(Fore.YELLOW + Style.DIM + "Write some text: " + Style.RESET_ALL) # Saves the input text to 'posttext.txt': file = open("posttext.txt", "w") file.write(posttext) file.close() # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/content' data = {'content': posttext} response = requests.post(url, headers=headers, data=data) # Displays the server's response: responsestatus = response.status_code showapiresponse(responsestatus) # Define the 'post' (blog post) subroutine:
def post(): # Input blog post data: posttitle = input(Fore.YELLOW + Style.DIM + "Write a blog post title: " + Style.RESET_ALL) print(Fore.YELLOW + Style.DIM + "Write a blog post:") print("(Press [ctrl-d] to 'save' when you finish writing.)" + Style.RESET_ALL) posttext = sys.stdin.read() # Adds a post date & time, currently set as 'now': postdatetime = strftime("%Y-%m-%d %H:%M:%S") # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/content' # IMPORTANT: @bazbt3's channel_id = 6. SUBSTITUTE WITH YOUR CHANNEL_ID in global definitions! data = {'title': posttitle, 'content': posttext, 'channel_id': channelid, 'send_blurb': 'Y', 'pubdts': postdatetime} response = requests.post(url, headers=headers, data=data) # Displays the server's response: responsestatus = response.status_code showapiresponse(responsestatus) # Define the 'reply' subroutine:
def reply(): # INEFFICIENT: NEED TO MERGE MOST OF THE CODE FROM THIS AND THE REPLYINLINE SUBROUTINE: # Input a reply-to post number: replytoid = input(Fore.YELLOW + Style.DIM + "Post number to reply to: " + Style.RESET_ALL) # Input some text: posttext = input(Fore.YELLOW + Style.DIM + "Write some text (add usernames!): " + Style.RESET_ALL) # Saves the input text to 'posttext.txt': file = open("posttext.txt", "w") file.write(posttext) file.close() # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/content' data = {'reply_to': replytoid, 'content': posttext} response = requests.post(url, headers=headers, data=data) # Displays the server's response: responsestatus = response.status_code showapiresponse(responsestatus) # Define the 'replyinline' subroutine: # INEFFICIENT: SEE THE REPLY SUBROUTINE:
def replyinline(postidreply, poster): # Use the to-be-replied-to post id: replytoid = postidreply replytoposter = poster # Input some text: posttext = input(Fore.YELLOW + Style.DIM + "Write some text: " + Style.RESET_ALL) # Add '@', username to posttext: posttext = ("@" + replytoposter + " " + posttext) # Saves the input text to 'posttext.txt': file = open("posttext.txt", "w") file.write(posttext) file.close() # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/content' data = {'reply_to': replytoid, 'content': posttext} response = requests.post(url, headers=headers, data=data) # Displays the server's response: responsestatus = response.status_code showapiresponse(responsestatus) # Define the 'repostinline' subroutine:
def test_flat(): assert p('<b>1</b>') == p('<bold>1</bold>') == S.BRIGHT + '1' + S.RESET_ALL assert p('<d>1</d>') == p('<dim>1</dim>') == S.DIM + '1' + S.RESET_ALL assert p('<b>1</b><d>2</d>') == S.BRIGHT + '1' + S.RESET_ALL + S.DIM + '2' + S.RESET_ALL assert p('<b>1</b>2<d>3</d>') == S.BRIGHT + '1' + S.RESET_ALL + '2' + S.DIM + '3' + S.RESET_ALL
def test_nested(): assert p('0<b>1<d>2</d>3</b>4') == '0' + S.BRIGHT + '1' + S.DIM + '2' + S.RESET_ALL + S.BRIGHT + '3' + S.RESET_ALL + '4' assert p('<d>0<b>1<d>2</d>3</b>4</d>') == S.DIM + '0' + S.BRIGHT + '1' + S.DIM + '2' + S.RESET_ALL + S.DIM + S.BRIGHT + '3' + S.RESET_ALL + S.DIM +'4' + S.RESET_ALL
def m_info(self, m): m = '[*] ' + m if COLORAMA: print Style.DIM + m else: print m
def banner(): banner = Style.DIM + """ __ ___ __ ___ | |_ __ _ __ ___ \ \/ / '_ ` _ \| | '__| '_ \ / __|____ > <| | | | | | | | | |_) | (_|_____| /_/\_\_| |_| |_|_|_| | .__/ \___| |_| _ _ __ | |__ _ __ _ _| |_ ___ / _| ___ _ __ ___ ___ _ __ | '_ \| '__| | | | __/ _ \ |_ / _ \| '__/ __/ _ \ '__| | |_) | | | |_| | || __/ _| (_) | | | (_| __/ | |_.__/|_| \__,_|\__\___|_| \___/|_| \___\___|_| """ + Style.RESET_ALL print("{}".format(banner))
def ssh_exec(ssh, cmd, timeout=None, check_retcode=True, get_pty=False): LOG.debug(u"{}Calling SSH: '{}'{}".format(Style.DIM, cmd, Style.RESET_ALL)) try: _, out, err = ssh.exec_command(cmd, timeout=timeout, get_pty=get_pty) ret_code = out.channel.recv_exit_status() out, err = out.read().strip(), err.read().strip() except Exception: LOG.debug("Something went wrong in 'ssh_exec':\n{}".format( format_exception(sys.exc_info()) )) raise _proceed_exec_result(out, err, ret_code, check_retcode) return ret_code, out, err
def _run_ssh_pexpect(cmd, password, using_bashc=False): """ Run a given command using pexpect. """ logger.debug(u'{}SSH Command: {}{}'.format(Style.DIM, cmd, Style.RESET_ALL)) if using_bashc: ssh_cli = pexpect.spawn('/bin/bash', ['-c', cmd]) else: ssh_cli = pexpect.spawn(cmd) i = ssh_cli.expect(['[Pp]assword: ', '\(yes/no\)\? ']) if i == 1: ssh_cli.sendline('yes') ssh_cli.expect('[Pp]assword: ') ssh_cli.sendline(password) time.sleep(1) ssh_cli.expect(['Connection to [0-9\.a-z]+ is closed.', pexpect.EOF, pexpect.TIMEOUT], timeout=5) # Expected behavior is to get pexpect.EOF or closed connection, but due to # a bug in docker we have to send an additional new line or Ctrl^C out = str(ssh_cli.before) + str(ssh_cli.after) logger.debug(u'Output:\n {}{}{}'.format(Fore.YELLOW, out, Style.RESET_ALL)) if ssh_cli.isalive(): ssh_cli.close() return out
def _run_scp_command(cmd, user, host, password): """ Emulate user command line interation using SCP protocol :param cmd: command to be executed :param user: remote host user :param host: remote host IP/hostname :param password: passwrod for remote user on host :returns None: """ logger.debug(u'{}Running SCP: {}{}'.format( Style.DIM, cmd, Style.RESET_ALL)) scp = pexpect.spawn(cmd) i = scp.expect(['\(yes/no\)\? ', '[Pp]assword: ']) if i == 0: scp.sendline('yes') scp.expect('[Pp]assword: ') scp.sendline(password) time.sleep(1) try: while True: i = scp.expect([pexpect.EOF, '[0-9][0-9]:[0-9][0-9] '], timeout=5) if i == 0: logger.debug(u'{}{}{}'.format(Fore.YELLOW, scp.before, Style.RESET_ALL)) break logger.debug(u'{}{}{}{}'.format(Fore.YELLOW, scp.before, scp.after, Style.RESET_ALL)) time.sleep(.1) except pexpect.TIMEOUT: # A docker bug expecting an extra new line in the end. Ideally we # will exit the loop getting pexpect.EOF, i.e. i==0 logger.debug(u'{}{}{}'.format(Fore.YELLOW, scp.before, Style.RESET_ALL)) finally: if scp.isalive(): scp.close()
def _compare_files(local_path, remote_path, ssh): """ Compare hashes of two files, one on local and another one on remote server :param local_path: path to file on the local server :param remote_path: path to file on the remote server :param ssh: SSHClient instance to communicate with remote server :returns: True/False on success/fail :rtype: bool """ logger.debug(u'{}Comparing files. host: {} and container: {}{}'.format( Style.DIM, local_path, remote_path, Style.RESET_ALL)) # Sometimes ssh_exec exits with a 0 code, but no stdout can be read, # so we moved the file hash call inside a function. def _remote_md5sum(ssh, path): remote_cmd = 'md5sum {}'.format(path) ret_code, out, err = ssh_exec(ssh=ssh, cmd=remote_cmd, get_pty=True) _remote_file_hash = out.strip().split()[0].strip() return _remote_file_hash # Get hash of the remote file. remote_file_hash = retry(_remote_md5sum, ssh=ssh, path=remote_path, tries=3, interval=1) # Get hash of the local file try: with open(local_path) as f: local_file_hash = hashlib.md5(f.read()).hexdigest() except (OSError, IOError) as e: raise FileTransferValidationFailed(str(e)) # Compare hashes if local_file_hash != remote_file_hash: message = 'Hashes not equal. Host: {} != container: {}'.format( local_file_hash, remote_file_hash) logger.debug(u'{}host: {} container: {}{}'.format( Fore.RED, local_path, remote_path, Style.RESET_ALL)) raise FileTransferValidationFailed(message) logger.debug(u'{}Validation: OK{}'.format(Fore.GREEN, Style.RESET_ALL)) return True
def test_SNAT_rules(cluster): container_ids, container_ips, pods, specs = setup_pods(cluster) # --------- Test that SNAT rules are applied correctly -------- jenkins_ip = _get_jenkins_ip(cluster) LOG.debug('{}Test that SNAT rules work properly{}'.format( Fore.CYAN, Style.RESET_ALL)) LOG_MSG = "Check SNAT rules for pod '{}' public IP: '{}' host node: '{}'" BIND_IP = '0.0.0.0' POD_TCP_CMD = 'nc -z -v {} {}'.format(jenkins_ip, JENKINS_TCP_SERVER_PORT) POD_UDP_CMD = 'nc -u -z -v {} {}'.format(jenkins_ip, JENKINS_UDP_SERVER_PORT) for name, pod in pods.items(): msg = LOG_MSG.format(name, pod.public_ip, specs[name]['host']) # Check if pod can ping jenkins ping(pod, container_ids[name], jenkins_ip) LOG.debug('{}TCP check {}{}'.format(Style.DIM, msg, Style.RESET_ALL)) # Check if SNAT rules work properly for TCP connections with jenkins_accept_connections( SocketServer.TCPServer, MyRequestHandler, BIND_IP, JENKINS_TCP_SERVER_PORT) as connection_list: pod.docker_exec(container_ids[name], POD_TCP_CMD) _check_visible_ip(pod, specs, connection_list) LOG.debug('{}UDP check {}{}'.format(Style.DIM, msg, Style.RESET_ALL)) # Check if SNAT rules work properly for UDP connections with jenkins_accept_connections( SocketServer.UDPServer, MyRequestHandler, BIND_IP, JENKINS_UDP_SERVER_PORT) as connection_list: pod.docker_exec(container_ids[name], POD_UDP_CMD) _check_visible_ip(pod, specs, connection_list)
def say(message): print(prefix + Style.DIM + message + Style.RESET_ALL)
def log_debug(message): print(Style.DIM + Fore.MAGENTA + message + Fore.RESET + Style.RESET_ALL)
def winrm(self, script, winrm_kwargs=dict(), quiet=False, **kwargs): """ Executes a remote windows powershell script :param script: A string with the powershell script :param winrm_kwargs: The pywinrm Protocol class kwargs :param quiet: Whether to hide the stdout/stderr output or not :return: A tuple with the status code, the stdout and the stderr :raise: WinRmError: If the command fails """ if self._vm_object: self._wait_for_winrm_service( kwargs['vcdriver_vm_winrm_username'], kwargs['vcdriver_vm_winrm_password'], **winrm_kwargs ) winrm_session = self._open_winrm_session( kwargs['vcdriver_vm_winrm_username'], kwargs['vcdriver_vm_winrm_password'], winrm_kwargs ) if not quiet: print('Executing remotely on {} ...'.format(self.ip())) styled_print(Style.DIM)(script) status, stdout, stderr = self._run_winrm_ps(winrm_session, script) if not quiet: styled_print(Style.BRIGHT)('CODE: {}'.format(status)) styled_print(Fore.GREEN)(stdout) if status != 0: if not quiet: styled_print(Fore.RED)(stderr) raise WinRmError(script, status, stdout, stderr) else: return status, stdout, stderr
def TOR_CHECK_PRNT(data): if (data["IsTor"]==True): print ("[" + Fore.GREEN + Style.BRIGHT + "+" + Style.RESET_ALL + "]" + Fore.GREEN + Style.BRIGHT + " TorStat's network traffic was routed through tor, onto the next stage...\n" + Style.RESET_ALL) return True elif (data["IsTor"]==False): print ("[" + Fore.YELLOW + Style.BRIGHT + "!" + Style.RESET_ALL + "]" + Fore.YELLOW + Style.BRIGHT +" TorStat cannot perform any more recon on the tor servers because TorStat's network traffic was not routed through tor.\n\t " + Style.RESET_ALL + Fore.WHITE + Style.DIM + "=> try : proxychains python TorStat.py\n" + Style.RESET_ALL) return False
def parseLine(line): severity = getSeverity(line) # Add color based on severity if 'severity' not in locals(): severity = 3 if severity == 0: color = Style.DIM + Fore.WHITE elif severity == 1: color = Style.NORMAL + Fore.BLUE elif severity == 2: color = Style.NORMAL + Fore.CYAN elif severity == 3: color = Style.NORMAL + Fore.WHITE elif severity == 4: color = Style.NORMAL + Fore.RED elif severity == 5: color = Style.NORMAL + Fore.BLACK + Back.RED else: color = Style.NORMAL + Fore.BLACK + Back.YELLOW # Replace html tab entity with actual tabs line = clearTags(line) line = line.replace('	', "\t") return color + line + Style.RESET_ALL
def chitchat(self, msg, eol=None): if msg: print(Style.DIM + msg + Style.RESET_ALL, end=eol) sys.stdout.flush() # show warning message
def errmsg(self, msg, info=""): info = str(info).strip() if info: # monkeypatch to make python error message less ugly info = item(re.findall('Errno -?\d+\] (.*)', info), '') or info.splitlines()[-1] info = Style.RESET_ALL + Style.DIM + " (" + info.strip('<>') + ")" + Style.RESET_ALL if msg: print(Back.RED + msg + info) # show printer and status
def psfind(self, name): vol = Style.DIM + Fore.YELLOW + item(re.findall("^(%.*%)", name)) + Style.RESET_ALL name = Fore.YELLOW + const.SEP + re.sub("^(%.*%)", '', name) + Style.RESET_ALL print("%s %s" % (vol, name)) # show directory listing
def psdir(self, isdir, size, mtime, name, otime): otime = Style.DIM + "(created " + otime + ")" + Style.RESET_ALL vol = Style.DIM + Fore.YELLOW + item(re.findall("^(%.*%)", name)) + Style.RESET_ALL name = re.sub("^(%.*%)", '', name) # remove volume information from filename name = Style.BRIGHT + Fore.BLUE + name + Style.RESET_ALL if isdir else name if isdir: print("d %8s %s %s %s %s" % (size, mtime, otime, vol, name)) else: print("- %8s %s %s %s %s" % (size, mtime, otime, vol, name)) # show directory listing
def pcldir(self, size, mtime, id, name): id = Style.DIM + "(macro id: " + id + ")" + Style.RESET_ALL print("- %8s %s %s %s" % (size, mtime, id, name)) # show output from df
def debug(self, indent, actor, deb_type, msg): """ deb_type -> 0:info, 1:succeed, 2:failed """ list_type = [Fore.YELLOW, Fore.GREEN, Fore.RED] deb_style = Style.DIM deb_color = list_type[deb_type] spaces = ' ' * indent print (deb_color + deb_style + spaces + '[' + actor + '] ' + msg)
def show_options(self): indent = ' ' header = indent + 'Available Options for creating profile:\n' info = 'name -> ' + self.dict_profile['NAME'] + '\n' info += 'ssid -> ' + self.dict_profile['SSID'] + '\n' info += 'type -> ' + self.dict_profile['TYPE'] + '\n' info += 'passphrase -> ' + self.dict_profile['PASSPHRASE'] print (Fore.YELLOW + Style.DIM + header + info)
def show_profile(self): indent = ' ' header = indent + 'List of profiles saved by system:\n' table = [['Profile name', 'Connection type', 'SSID', 'Passphrase']] list_profile = os.listdir(self.profile_path) for fileconf in list_profile: of_file = open(self.profile_path + fileconf, 'r') of = of_file.read().split('\n') if of[0][1::] == 'Open': table.append([fileconf[:-5:], 'Open', of[2][7:-1:], '-']) elif of[0][1::] == 'WPA': table.append([fileconf[:-5:], 'WPA', of[2][7:-1:], of[3][7:-1:]]) of_file.close() print (Fore.YELLOW + Style.DIM + header + AsciiTable(table).table)
def print_ifaces_localhost_table(self): indent = ' ' * 1 header = indent + 'Localhost interfaces information:\n' self.localhost_table = [['Interface', 'Status', 'IP Address', 'Mask']] for data in self.ifaces_localhost: self.localhost_table.append([ data, self.dict_status[data], self.dict_ipaddr[data], self.dict_mask[data] ]) table = AsciiTable(self.localhost_table) print (Fore.YELLOW + Style.DIM + header + table.table)
def dprint(request): print(Style.DIM + time.strftime('<%Y/%m/%d %H:%M:%S> ') + Style.BRIGHT + request)
def get_text_style(self, event): if event.name == 'RunningTask': return Style.BRIGHT elif event.name == 'SkippingTask': return Style.DIM elif event.name == 'RunningCommand': return Style.BRIGHT elif event.kind is EventKind.output: return Style.DIM elif event.kind is EventKind.error: return Style.BRIGHT + Fore.RED
def _wrap_color(code_string): """Wrap key parts in styling and resets. Stying for each key part from, (col_offset, fromlineno) to (end_col_offset, end_lineno). Note: use this to set color back to default (on mac, and others?): Style.RESET_ALL + Style.DIM """ ret = Style.BRIGHT + Fore.WHITE + Back.BLACK ret += code_string ret += Style.RESET_ALL + Style.DIM + Fore.RESET + Back.RESET return ret
def mentions(): # How many mentions posts to retrieve? postcount = input(Fore.YELLOW + Style.DIM + "How many posts? " + Style.RESET_ALL) postcount = str(postcount) # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/content/blurbs/mentions?count=' + postcount data = {'count': postcount} response = requests.get(url, headers=headers, data=data) # Pass the API response to 'timelinebase': timelinebase(response, postcount) # Define the 'hometimeline' subroutine:
def hometimeline(): # How many home timeline posts to retrieve? postcount = input(Fore.YELLOW + Style.DIM + "How many posts? " + Style.RESET_ALL) postcount = str(postcount) # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/content/blurbs/home?count=' + postcount data = {'count': postcount} response = requests.get(url, headers=headers, data=data) # Pass the API response to 'timelinebase': timelinebase(response, postcount) # Define the 'ownblurbtimeline' subroutine: # BUGGY AS-OF v0.2.4 - SEE THE COMMENTED-OUT CODE ON THE 'url' ASSIGNMENT LINE:
def ownblurbtimeline(): # How many own posts to retrieve? postcount = input(Fore.YELLOW + Style.DIM + "How many posts? " + Style.RESET_ALL) postcount = str(postcount) # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/users/blurbs/' + accountid #+ '?count=' + postcount data = {'count': postcount} response = requests.get(url, headers=headers, data=data) # Pass the API response to 'timelinebase': timelinebase(response, postcount) # Define the 'ownpinstimeline' subroutine:
def ownpinstimeline(): # How many own pinned posts to retrieve? postcount = input(Fore.YELLOW + Style.DIM + "How many posts? " + Style.RESET_ALL) postcount = str(postcount) # Uses the global header & creates the data to be passed to the url: url = 'https://api.10centuries.org/content/blurbs/pins?count=' + postcount data = {'count': postcount} response = requests.get(url, headers=headers, data=data) # Pass the API response to 'timelinebase': timelinebase(response, postcount) # DEFINE 10C ADMIN SUBROUTINES: # Define 'authorise' subroutine:
def login(): # Input account name: my_acctname = input(Fore.YELLOW + Style.DIM + "10C Username (account email address): " + Fore.WHITE) # Input account password: my_acctpass = getpass.getpass(Fore.YELLOW + Style.DIM + "10C Password (is not shown onscreen): " + Style.RESET_ALL) # Construct the login URL & data passed to the API: url = 'https://api.10centuries.org/auth/login' loginheaders = {'Content-Type': 'application/x-www-form-urlencoded'} data = {'client_guid': my_client_guid, 'acctname': my_acctname, 'acctpass': my_acctpass} response = requests.post(url, headers=loginheaders, data=data) # Saves the server's JSON response to 'loginresponse.txt': file = open("loginresponse.txt", "w") file.write(response.text) file.close() # Extracts the auth token from the JSON file: json_data = open("loginresponse.txt", "r") decoded = json.load(json_data) temptoken = decoded['data']['token'] # Saves just the auth token to authtoken.txt: file = open("authtoken.txt", "w") file.write(temptoken) file.close() # Let the user know the application is authorised: print("") print(Fore.BLACK + Back.GREEN + "Authorised" + Style.RESET_ALL + Fore.YELLOW + Style.DIM + " (but check for a 'Connected' indicator!)" + Style.RESET_ALL) print("") setloginstatus("In") authorise() # Define the 'logout' subroutine:
def _format_line(line: str, n: int, padding: int) -> str: """Format single line of code.""" return ' {dim}{n}{reset}: {line}'.format(dim=Style.DIM, n=str(n + 1).zfill(padding), line=line, reset=Style.RESET_ALL)
def _get_lines_with_highlighted_error(e: CompilationError) -> Iterable[str]: """Format code with highlighted syntax error.""" error_line = e.lineno - 1 lines = e.code.split('\n') padding = len(str(len(lines))) from_line = error_line - const.SYNTAX_ERROR_OFFSET if from_line < 0: from_line = 0 if from_line < error_line: for n in range(from_line, error_line): yield _format_line(lines[n], n, padding) yield ' {dim}{n}{reset}: {bright}{line}{reset}'.format( dim=Style.DIM, n=str(error_line + 1).zfill(padding), line=lines[error_line], reset=Style.RESET_ALL, bright=Style.BRIGHT) yield ' {padding}{bright}^{reset}'.format( padding=' ' * (padding + e.offset + 1), bright=Style.BRIGHT, reset=Style.RESET_ALL) to_line = error_line + const.SYNTAX_ERROR_OFFSET if to_line > len(lines): to_line = len(lines) for n in range(error_line + 1, to_line): yield _format_line(lines[n], n, padding)