The following 50 code examples, extracted from open source Python projects, illustrate how to use requests.get().
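Before the extracted examples, here is a minimal sketch of a typical requests.get() call (the URL, query parameter, and timeout below are illustrative, not taken from any of the projects):

import requests

# Illustrative endpoint; any HTTP URL works the same way.
response = requests.get("https://api.example.com/items", params={"page": 1}, timeout=10)
response.raise_for_status()   # raise requests.HTTPError for 4xx/5xx responses
data = response.json()        # decode a JSON body into Python objects
print(data)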
def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(archive.infolist()[0])
        decoded_data = data.decode('utf-8')
        sets_data = json.loads(decoded_data)
        return sets_data
def main():
    for url in url_list:
        try:
            r = requests.get(url)
        except:
            continue
        tree = html.fromstring(r.text)
        script = tree.xpath('//script[@language="javascript"]/text()')[0]
        json_string = regex.findall(script)[0]
        json_data = json.loads(json_string)
        next_page_url = tree.xpath('//footer/a/@href')
        links = [domain + x['nodeRef'] for x in json_data]
        for link in links:
            extract(link)
def sendRequest(ip, port, route, data=None, protocol="http"):
    url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)
    if data is not None:
        try:
            resp = requests.post(url, data=data)
        except requests.HTTPError as e:
            raise PipelineServiceError("{reason}".format(reason=e))
    else:
        try:
            resp = requests.get(url)
        except requests.HTTPError as e:
            raise PipelineServiceError("{reason}".format(reason=e))
    return resp
def run(self):
    Analyzer.run(self)
    if self.data_type == 'domain' or self.data_type == 'url':
        try:
            pattern = re.compile("(?:Category: )([\w\s]+)")
            baseurl = 'http://www.fortiguard.com/webfilter?q='
            url = baseurl + self.getData()
            req = requests.get(url)
            category_match = re.search(pattern, req.content, flags=0)
            self.report({
                'category': category_match.group(1)
            })
        except ValueError as e:
            self.unexpectedError(e)
    else:
        self.notSupported()
def judge_ip(self, ip, port):
    # check whether this proxy ip/port is usable
    http_url = "http://www.baidu.com"
    proxy_url = "http://{0}:{1}".format(ip, port)
    try:
        proxy_dict = {
            "http": proxy_url,
        }
        response = requests.get(http_url, proxies=proxy_dict)
    except Exception as e:
        print("invalid ip and port")
        self.delete_ip(ip)
        return False
    else:
        code = response.status_code
        if code >= 200 and code < 300:
            print("effective ip")
            return True
        else:
            print("invalid ip and port")
            self.delete_ip(ip)
            return False
def watchJob(jobId, exchangeName):
    queue = PipelineQueue('PIPELINE_JOB_{j}'.format(j=jobId))
    queue.bindToExchange(exchangeName, jobId)
    while True:
        body, method = queue.get()
        if method:
            body = json.loads(body)
            if body["current_status"] == "SUCCEEDED":
                return jobId
            else:
                raise PipelineServiceError("Job {j} has current status {s}!".format(j=jobId, s=body["current_status"]))
        else:
            pass
def _verify(self):
    try:
        self.read(self.path)
    except IOError as e:
        print "Couldn't open {path}: {reason}".format(path=self.path, reason=e)
        exit(-1)
    else:
        d = {}
        for name, attrs in self._configParams.iteritems():
            if attrs["required"]:
                if not self.has_section(attrs["section"]):
                    raise LookupError("missing required section {s} in the configuration!\nRUN `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"]))
                if not self.has_option(attrs["section"], name):
                    raise LookupError("missing required option {o} in section {s}!\nRun `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"], o=name))
            try:
                d[name] = self.get(attrs["section"], name)
            except NoOptionError:
                pass
            except NoSectionError:
                pass
        return d
def get_best(url):
    url = 'http://www.infoarena.ro' + url
    source_code = requests.get(url)
    plain_text = source_code.text
    soup = BeautifulSoup(plain_text, "html.parser")
    name = soup.find('span', {'class': 'username'}).find('a')['href'][35:]
    tests = soup.find_all('td', {'class': 'number'})
    max_ms = -1
    for test in tests:
        test = test.string
        if test.endswith('ms'):
            time = int(test.strip('ms'))
            max_ms = max(max_ms, time)
    if name not in d or max_ms < d[name][0]:
        d[name] = (max_ms, url)
        print(max_ms, name, url)
def list(package_name):
    # lists all of the packages for a user, or all of the implementations for a package
    # <username> / <package> , <implementation>
    # determine if there's a user and package, or just a user
    p = split_package_name(package_name)
    if p['username'] != None:
        # get all of the packages and print their names in a pretty print
        if p['package'] != None:
            # get all implementations and print their names in a pretty print
            if p['implementation'] != None:
                print('Cannot list one specific implementation. Use "print".')
                return
            return
        return
    print('Error parsing arguments. Got {}. Specify in format such that: <username>/<package> with <package> being optional.'.format(p))


# @cli.command()
# @click.argument('payload')
# def print(payload):
#     pass
def __get_api_conf(self, sfile, conf_name):
    full_path = Fun.get_file_in_directory_full_path(sfile)
    print full_path
    if not os.path.exists(full_path):
        print("Error: Cannot get config file")
        sys.exit(-1)
    sfile = full_path
    conf = ConfigParser.ConfigParser()
    conf.read(sfile)
    print conf.sections()
    try:
        self.url = conf.get(conf_name, "url")
        self.access_token = conf.get(conf_name, "access_token")
        self.api_token = conf.get(conf_name, "api_token")
    except Exception, e:
        print("Error: " + str(e))
        sys.exit(-1)
def pull_user_data(session):
    print 'pulling users'
    user_data = requests.get(u"{}{}".format(config.prod_url, 'export/users'))
    loaded_data = json.loads(user_data.text)
    for user_dict in loaded_data:
        user = User(
            id=user_dict['id'],
            name=user_dict['name'],
            email=user_dict['email'],
            admin=user_dict['admin'],
            avatar=user_dict['avatar'],
            active=user_dict['active'],
            created_at=user_dict['created_at'],
            elo=user_dict['elo'],
            wins=user_dict['wins'],
            losses=user_dict['losses']
        )
        session.add(user)
    session.commit()
    print 'done pulling users'
def pull_game_data(session):
    print 'pulling games'
    game_data = requests.get(u"{}{}".format(config.prod_url, 'export/games'))
    loaded_data = json.loads(game_data.text)
    for game_dict in loaded_data:
        game = Game(
            id=game_dict['id'],
            created_at=game_dict['created_at'],
            deleted_at=game_dict['deleted_at'],
            winner_id=game_dict['winner_id'],
            winner_elo_score=game_dict['winner_elo_score'],
            loser_id=game_dict['loser_id'],
            loser_elo_score=game_dict['loser_elo_score'],
            submitted_by_id=game_dict['submitted_by_id']
        )
        session.add(game)
    session.commit()
    print 'done pulling games'
def format_data(cls, data):
    """Re-format the response data for the front-end.

    Arguments:
      data (:py:class:`dict`): The JSON data from the response.

    Returns:
      :py:class:`dict`: The re-formatted data.

    """
    builds = [cls.format_build(build) for build in data.get('builds', [])]
    estimate_time(builds)
    return dict(
        builds=builds[:4],
        health=health_summary(builds),
        name=data.get('repository_name'),
    )
def format_build(cls, build):
    """Re-format the build data for the front-end.

    Arguments:
      build (:py:class:`dict`): The JSON data from the response.

    Returns:
      :py:class:`dict`: The re-formatted data.

    """
    start, finish, elapsed = elapsed_time(
        build.get('started_at'),
        build.get('finished_at'),
    )
    return super().format_build(dict(
        author=build.get('github_username'),
        duration=(
            None if start is None or finish is None else finish - start
        ),
        elapsed=elapsed,
        message=build.get('message'),
        outcome=build.get('status'),
        started_at=start,
    ))
def format_data(cls, name, data):
    """Re-format the response data for the front-end.

    Arguments:
      data (:py:class:`list`): The JSON data from the response.
      name (:py:class:`str`): The name of the repository.

    Returns:
      :py:class:`dict`: The re-formatted data.

    """
    return dict(
        commits=[cls.format_commit(commit.get('commit', {}))
                 for commit in data[:5] or []],
        name=name,
    )
def half_life(issues):
    """Calculate the half life of the service's issues.

    Args:
      issues (:py:class:`list`): The service's issue data.

    Returns:
      :py:class:`datetime.timedelta`: The half life of the issues.

    """
    lives = []
    for issue in issues:
        start = safe_parse(issue.get('created_at'))
        end = safe_parse(issue.get('closed_at'))
        if start and end:
            lives.append(end - start)
    if lives:
        lives.sort()
        size = len(lives)
        return lives[((size + (size % 2)) // 2) - 1]
def details(self, iteration):
    """Update the project data with more details.

    Arguments:
      iteration (:py:class:`int`): The current iteration number.

    Returns:
      :py:class:`dict`: Additional detail on the current iteration.

    """
    url = self.url_builder(
        '/projects/{id}/iterations/{number}',
        params={'number': iteration, 'id': self.project_id},
        url_params={'fields': ':default,velocity,stories'},
    )
    response = requests.get(url, headers=self.headers)
    if response.status_code == 200:
        update = response.json()
        return dict(
            stories=self.story_summary(update.get('stories', [])),
            velocity=update.get('velocity', 'unknown'),
        )
    else:
        logger.error('failed to update project iteration details')
        return {}
def format_data(self, name, data):
    """Re-format the response data for the front-end.

    Arguments:
      data (:py:class:`dict`): The JSON data from the response.
      name (:py:class:`str`): The name of the repository.

    Returns:
      :py:class:`dict`: The re-formatted data.

    """
    builds = [self.format_build(build) for build in data.get('builds', [])]
    return dict(
        builds=builds[:4],
        health=self.health(builds[0] if builds else None),
        name=name,
    )
def format_build(cls, build):
    """Re-format the build data for the front-end.

    Arguments:
      build (:py:class:`dict`): The JSON data from the response.

    Returns:
      :py:class:`dict`: The re-formatted data.

    """
    coverage = build.get('covered_percent')
    message = build.get('commit_message')
    return dict(
        author=build.get('committer_name') or '<no author>',
        committed=occurred(build.get('created_at')),
        coverage=None if coverage is None else '{:.1f}%'.format(coverage),
        message_text=remove_tags(message) if message else None,
        raw_coverage=coverage,
    )
def format_data(self, data):
    """Re-format the response data for the front-end.

    Arguments:
      data (:py:class:`dict`): The JSON data from the response.

    Returns:
      :py:class:`dict`: The re-formatted data.

    """
    commits = {commit['id']: commit for commit in data.get('commits', [])}
    builds = [
        self.format_build(build, commits.get(build.get('commit_id'), {}))
        for build in data.get('builds', [])
    ]
    estimate_time(builds)
    return dict(
        builds=builds[:4],
        health=health_summary(builds),
        name=self.repo,
    )
def format_build(cls, build, commit):  # pylint: disable=arguments-differ
    """Re-format the build and commit data for the front-end.

    Arguments:
      build (:py:class:`dict`): The build data from the response.
      commit (:py:class:`dict`): The commit data from the response.

    Returns:
      :py:class:`dict`: The re-formatted data.

    """
    start, finish, elapsed = elapsed_time(
        build.get('started_at'),
        build.get('finished_at'),
    )
    return super().format_build(dict(
        author=commit.get('author_name'),
        duration=(
            None if start is None or finish is None else finish - start
        ),
        elapsed=elapsed,
        message=commit.get('message'),
        outcome=build.get('state'),
        started_at=start,
    ))
def watch(self, path):
    params = {'watch': 'true'}
    url = self._base_url + path
    header = {}
    if self.token:
        header.update({'Authorization': 'Bearer %s' % self.token})
    # TODO(ivc): handle connection errors and retry on failure
    while True:
        with contextlib.closing(
                requests.get(url, params=params, stream=True,
                             cert=self.cert, verify=self.verify_server,
                             headers=header)) as response:
            if not response.ok:
                raise exc.K8sClientException(response.text)
            for line in response.iter_lines(delimiter='\n'):
                line = line.strip()
                if line:
                    yield jsonutils.loads(line)
def download_current_dataset(self, dest_path='.', unzip=True):
    now = datetime.now().strftime('%Y%m%d')
    file_name = 'numerai_dataset_{0}.zip'.format(now)
    dest_file_path = '{0}/{1}'.format(dest_path, file_name)

    r = requests.get(self._dataset_url)
    if r.status_code != 200:
        return r.status_code

    with open(dest_file_path, "wb") as fp:
        for byte in r.content:
            fp.write(byte)
    if unzip:
        with zipfile.ZipFile(dest_file_path, "r") as z:
            z.extractall(dest_path)
    return r.status_code
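Writing r.content out one byte at a time, as in the example above, works but is slow for large archives. A common alternative pattern, shown here as a sketch with placeholder names (dataset_url, dest_file_path), is to stream the response and write it in chunks:

# Sketch: streamed download with requests.get(stream=True); names are placeholders.
r = requests.get(dataset_url, stream=True)
r.raise_for_status()
with open(dest_file_path, "wb") as fp:
    for chunk in r.iter_content(chunk_size=8192):  # read the body in 8 KB chunks
        fp.write(chunk)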
def get_file_report(self, this_hash):
    """ Get the scan results for a file.

    You can also specify a CSV list made up of a combination of hashes and scan_ids (up to 4 items with the
    standard request rate), this allows you to perform a batch request with one single call.
    i.e. {'resource': '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d'}.

    :param this_hash: The md5/sha1/sha256/scan_ids hash of the file whose dynamic behavioural report you want to
        retrieve or scan_ids from a previous call to scan_file.
    :return:
    """
    params = {'apikey': self.api_key, 'resource': this_hash}

    try:
        response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_url_report(self, this_url, scan='0'):
    """ Get the scan results for a URL. (can do batch searches like get_file_report)

    :param this_url: a URL will retrieve the most recent report on the given URL. You may also specify a scan_id
        (sha256-timestamp as returned by the URL submission API) to access a specific report. At the same time, you
        can specify a CSV list made up of a combination of hashes and scan_ids so as to perform a batch request with
        one single call (up to 4 resources per call with the standard request rate). When sending multiples, the
        scan_ids or URLs must be separated by a new line character.
    :param scan: (optional): this is an optional parameter that when set to "1" will automatically submit the URL
        for analysis if no report is found for it in VirusTotal's database. In this case the result will contain a
        scan_id field that can be used to query the analysis report later on.
    :return: JSON response
    """
    params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan}

    try:
        response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_upload_url(self):
    """ Get a special URL for submitted files bigger than 32MB.

    In order to submit files bigger than 32MB you need to obtain a special upload URL to which you can POST files
    up to 200MB in size. This API generates such a URL.

    :return: JSON special upload URL to which you can POST files up to 200MB in size.
    """
    params = {'apikey': self.api_key}

    try:
        response = requests.get(self.base + 'file/scan/upload_url', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    if response.status_code == requests.codes.ok:
        return response.json()['upload_url']
    else:
        return dict(response_code=response.status_code)
def get_file_behaviour(self, this_hash):
    """ Get a report about the behaviour of the file in a sandboxed environment.

    VirusTotal runs a distributed setup of Cuckoo sandbox machines that execute the files we receive. Execution is
    attempted only once, upon first submission to VirusTotal, and only Portable Executables under 10MB in size are
    run. The execution of files is a best effort process, hence, there are no guarantees about a report being
    generated for a given file in our dataset.

    If a file did indeed produce a behavioural report, a summary of it can be obtained by using the file scan
    lookup call providing the additional HTTP POST parameter allinfo=1. The summary will appear under the
    behaviour-v1 property of the additional_info field in the JSON report.

    :param this_hash: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
    :return: full JSON report of the file's execution as returned by the Cuckoo JSON report encoder.
    """
    params = {'apikey': self.api_key, 'hash': this_hash}

    try:
        response = requests.get(self.base + 'file/behaviour', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_file_distribution(self, before='', after='', reports='false', limit='1000'):
    """ Get a live feed with the latest files submitted to VirusTotal.

    Allows you to retrieve a live feed of absolutely all uploaded files to VirusTotal, and download them for
    further scrutiny. This API requires you to stay synced with the live submissions as only a backlog of 6 hours
    is provided at any given point in time.

    :param before: (optional) Retrieve files received before the given timestamp, in timestamp descending order.
    :param after: (optional) Retrieve files received after the given timestamp, in timestamp ascending order.
    :param reports: (optional) Include the files' antivirus results in the response. Possible values are 'true' or
        'false' (default value is 'false').
    :param limit: (optional) Retrieve limit file items at most (default: 1000).
    :return: JSON response: please see https://www.virustotal.com/en/documentation/private-api/#file-distribution
    """
    params = {'apikey': self.api_key, 'before': before, 'after': after, 'reports': reports, 'limit': limit}

    try:
        response = requests.get(self.base + 'file/distribution', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_url_report(self, this_url, scan='0', allinfo=1):
    """ Get the scan results for a URL.

    :param this_url: A URL for which you want to retrieve the most recent report. You may also specify a scan_id
        (sha256-timestamp as returned by the URL submission API) to access a specific report. At the same time, you
        can specify a CSV list made up of a combination of urls and scan_ids (up to 25 items) so as to perform a
        batch request with one single call. The CSV list must be separated by new line characters.
    :param scan: (optional) This is an optional parameter that when set to "1" will automatically submit the URL
        for analysis if no report is found for it in VirusTotal's database. In this case the result will contain a
        scan_id field that can be used to query the analysis report later on.
    :param allinfo: (optional) If this parameter is specified and set to "1" additional info regarding the URL
        (other than the URL scanning engine results) will also be returned. This additional info includes VirusTotal
        related metadata (first seen date, last seen date, files downloaded from the given URL, etc.) and the output
        of other tools and datasets when fed with the URL.
    :return: JSON response
    """
    params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan, 'allinfo': allinfo}

    try:
        response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_url_distribution(self, after=None, reports='true', limit=1000):
    """ Get a live feed with the latest URLs submitted to VirusTotal.

    Allows you to retrieve a live feed of URLs submitted to VirusTotal, along with their scan reports. This call
    enables you to stay synced with VirusTotal URL submissions and replicate our dataset.

    :param after: (optional) Retrieve URLs received after the given timestamp, in timestamp ascending order.
    :param reports: (optional) When set to "true" each item retrieved will include the results for each particular
        URL scan (in exactly the same format as the URL scan retrieving API). If the parameter is not specified,
        each item returned will only contain the scanned URL and its detection ratio.
    :param limit: (optional) Retrieve limit file items at most (default: 1000).
    :return: JSON response
    """
    params = {'apikey': self.api_key, 'after': after, 'reports': reports, 'limit': limit}

    try:
        response = requests.get(self.base + 'url/distribution', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_ip_report(self, this_ip):
    """ Get information about a given IP address.

    Retrieves a report on a given IP address (including the information recorded by VirusTotal's Passive DNS
    infrastructure).

    :param this_ip: A valid IPv4 address in dotted quad notation, for the time being only IPv4 addresses are
        supported.
    :return: JSON response
    """
    params = {'apikey': self.api_key, 'ip': this_ip}

    try:
        response = requests.get(self.base + 'ip-address/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_domain_report(self, this_domain):
    """ Get information about a given domain.

    Retrieves a report on a given domain (including the information recorded by VirusTotal's passive DNS
    infrastructure).

    :param this_domain: A domain name.
    :return: JSON response
    """
    params = {'apikey': self.api_key, 'domain': this_domain}

    try:
        response = requests.get(self.base + 'domain/report', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def get_comments(self, resource, before=None):
    """ Get comments for a file or URL.

    Retrieve a list of VirusTotal Community comments for a given file or URL. VirusTotal Community comments are
    user submitted reviews on a given item, these comments may contain anything from the in-the-wild locations of
    files up to fully-featured reverse engineering reports on a given sample.

    :param resource: Either an md5/sha1/sha256 hash of the file or the URL itself you want to retrieve.
    :param before: (optional) A datetime token that allows you to iterate over all comments on a specific item
        whenever it has been commented on more than 25 times.
    :return: JSON response - The application answers with the comments sorted in descending order according to
        their date.
    """
    params = dict(apikey=self.api_key, resource=resource, before=before)

    try:
        response = requests.get(self.base + 'comments/get', params=params, proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def WaitForFileServerToStart(port):
    """
    Wait for the Flask file server to start up. Test it by trying the PyUpdater
    update URL, e.g. http://127.0.0.1:12345. If we receive a ConnectionError, we
    continue waiting, but if we receive an HTTP response code (404), we return True.

    For a frozen app, e.g. a Mac .app bundle, the location of the updates must be
    supplied by an environment variable, whereas when running from the source repo,
    the location of the updates is likely to be ./pyu-data/deploy/
    """
    url = 'http://%s:%s/fileserver-is-ready' % (LOCALHOST, port)
    attempts = 0
    while True:
        try:
            attempts += 1
            requests.get(url, timeout=1)
            return True
        except requests.exceptions.ConnectionError:
            time.sleep(0.25)
            if attempts > 10:
                logger.warning("WaitForFileServerToStart: timeout")
                return
def checkFactorDB(n):
    """See if the modulus is already factored on factordb.com, and if so get the factors"""
    # Factordb gives id's of numbers, which act as links for full number
    # follow the id's and get the actual numbers
    r = requests.get('http://www.factordb.com/index.php?query=%s' % str(n))
    regex = re.compile("index\.php\?id\=([0-9]+)", re.IGNORECASE)
    ids = regex.findall(r.text)
    # These give you ID's to the actual number
    num = len(ids) - 2
    print(ids)
    print(num)
    if num < 2:
        return 0
    else:
        return num * (num - 1) / 2


#print solve('1ELuX8Do1NDSMy4eV8H82dfFtTvKaqYyhg')
def get_audit_actions(self, date_modified, offset=0, page_length=100):
    """
    Get all actions created after a specified date. If the number of actions found is more than 100,
    this function will page until it has collected all actions.

    :param date_modified:  ISO formatted date/time string. Only actions created after this date are returned.
    :param offset:         The index to start retrieving actions from
    :param page_length:    How many actions to fetch for each page of action results
    :return:               Array of action objects
    """
    logger = logging.getLogger('sp_logger')
    actions_url = self.api_url + 'actions/search'
    response = self.authenticated_request_post(
        actions_url,
        data=json.dumps({
            "modified_at": {"from": str(date_modified)},
            "offset": offset,
            "status": [0, 10, 50, 60]
        })
    )
    result = self.parse_json(response.content) if response.status_code == requests.codes.ok else None
    self.log_http_status(response.status_code, 'GET actions')
    if result is None or None in [result.get('count'), result.get('offset'), result.get('total'), result.get('actions')]:
        return None
    return self.get_page_of_actions(logger, date_modified, result, offset, page_length)
def recursive_scrape(url, count=0):
    # The API seems to link the images in a loop, so we can stop once we see an
    # image we have already seen.
    if url in seen_photos:
        return
    seen_photos[url] = True

    page = requests.get(url)
    photo_json = page.json()
    print photo_json
    yield photo_json

    next_url = 'https://earthview.withgoogle.com' + photo_json['nextApi']

    # Yielding from recursive functions is a bit funky
    for photo_json in recursive_scrape(next_url, count + 1):
        yield photo_json
def facebook_id_to_username(self, facebook_id):
    """
    Converts a Facebook ID to a username.

    Args:
        facebook_id: A string representing a Facebook ID.

    Returns:
        A string representing the username corresponding to facebook_id.
    """
    # Store username in self.id_to_username if we have not seen the user ID yet
    if facebook_id not in self.id_to_username:
        graph_api_request = 'https://graph.facebook.com/' + facebook_id + '?fields=name&access_token=' + self.ACCESS_TOKEN
        response_dict = requests.get(graph_api_request).json()
        try:
            username = response_dict['name']
        except KeyError:
            self.id_to_username[facebook_id] = facebook_id + '@facebook.com'
        else:
            self.id_to_username[facebook_id] = username

    return self.id_to_username[facebook_id]
def slack(text: hug.types.text):
    """Returns JSON containing an attachment with an image url for the Slack integration"""
    title = text
    if text == 'top250':
        top250_res = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
        top250_page = html.fromstring(top250_res.text)
        candidates = top250_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
        title = random.choice(candidates).text
    return dict(
        response_type='in_channel',
        attachments=[
            dict(image_url=GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}')
        ]
    )
def saveFile(self, url, page, idx):
    # pad the index to two digits
    user_define_name = self.now_date() + '_p_' + str(page) + '_' + string.zfill(idx, 2)
    # file extension
    file_ext = self.file_extension(url)
    save_file_name = user_define_name + "_" + file_ext
    # urllib.urlretrieve(item[0], self.save_path + save_file_name)
    # make sure the url is valid (starts with http:) before downloading
    url = self.CheckUrlValidate(url)
    try:
        pic = requests.get(url, timeout=30)
        f = open(self.store_dir + os.sep + save_file_name, 'wb')
        f.write(pic.content)
        f.close()
        print '\ndone save file ' + save_file_name
    except ReadTimeout:
        print 'save file %s failed. cause by timeout(30)' % (save_file_name)
    except Exception, e:
        print 'this python version does not support https.'
        print e
def GetTotalPage(self, html):
    # create the BeautifulSoup
    some_soup = BeautifulSoup(html)
    # get the page div
    ele_a = some_soup.find('div', attrs={'class': 'page'})
    # get the last <a> element in the div (the link to the last page)
    last_a = ele_a.findAll('a')[-1]
    # strip the trailing ".html" to get the page number
    pagenum = last_a.get('href')[:-5]
    print 'pagenum :', pagenum
    # print type(last_a)
    # store the max page number to totalpage.ini
    self.SaveTotalPageToFile(pagenum)


# new_page_num: new max page num
def new_session(account):
    if account.get('session', None) is None:
        session = requests.session()
        session.verify = True
        session.headers.update({'User-Agent': 'Niantic App'})
        # session.headers.update({'User-Agent': 'niantic'})
        if not account['proxy'] is None:
            session.proxies.update(account['proxy'])
        account['session'] = session
    else:
        account['session'].close()
        account['session'].cookies.clear()
    account['session_time'] = get_time()
    account['session_hash'] = os.urandom(32)
    account['api_url'] = API_URL
    account['auth_ticket'] = None
def get_watchlist_id_by_name(watchlistsdict):
    """
    For each watchlist name specified in the config file, find the associated watchlist ID.
    NOTE: We trigger on watchlist IDs, and not on watchlist names
    """
    global cbtoken
    global cbserver

    headers = {'X-AUTH-TOKEN': cbtoken}

    r = requests.get("https://%s/api/v1/watchlist" % (cbserver), headers=headers, verify=False)
    parsed_json = json.loads(r.text)

    for watchlist in parsed_json:
        for key, value in watchlistsdict.iteritems():
            if watchlist['name'].lower() == key.lower():
                watchlistsdict[key] = watchlist['id']
def get_or_create(self, model, field, value, **kwargs):
    """
    Retrieves object of class `model` with lookup key `value` from the cache. If not found,
    creates the object based on `field=value` and any other `kwargs`.

    Returns a tuple of `(object, created)`, where `created` is a boolean specifying whether an
    `object` was created.
    """
    result = self[model].get(value)
    created = False
    if not result:
        kwargs[field] = value
        result = model.objects.create(**kwargs)
        self[model][value] = result
        created = True
    return result, created
def tags(self):
    tags = self._fixed_tags
    if self._type == 'reply':  # NOTE replies don't ever get put in public directly
        out_tags = []
        for tag in tags:
            if tag.startswith('RRID:'):
                continue  # we deal with the RRID itself in def rrid(self)
            elif tag == self.INCOR_TAG and self.rrid:
                continue
            else:
                out_tags.append(tag)
        if self.corrected:
            out_tags.append(self.CORR_TAG)
        return sorted(out_tags)
    else:
        return [t for t in tags if not t.startswith('RRID:')]  # let self.rrid handle the rrid tags
def clean_dupes(get_annos, repr_issues=False):
    annos = get_annos()
    seen = set()
    dupes = [a.id for a in annos if a.id in seen or seen.add(a.id)]
    preunduped = [a for a in annos if a.id in dupes]
    for id_ in dupes:
        print('=====================')
        anns = sorted((a for a in annos if a.id == id_), key=lambda a: a.updated)
        if not repr_issues:
            [print(a.updated, HypothesisHelper(a, annos)) for a in anns]
        for a in anns[:-1]:  # all but latest
            annos.remove(a)
    deduped = [a for a in annos if a.id in dupes]
    assert len(preunduped) // len(dupes) == 2, 'Somehow you have managed to get more than 1 duplicate!'
    # get_annos.memoize_annos(annos)
    embed()
def get_balance(address):
    """
    Retrieves the balance from etherscan.io.
    The balance is returned in ETH rounded to the second decimal.
    """
    address = PyWalib.address_hex(address)
    url = 'https://api.etherscan.io/api'
    url += '?module=account&action=balance'
    url += '&address=%s' % address
    url += '&tag=latest'
    if ETHERSCAN_API_KEY:
        url += '&apikey=%s' % ETHERSCAN_API_KEY
    # TODO: handle 504 timeout, 403 and other errors from etherscan
    response = requests.get(url)
    response_json = response.json()
    PyWalib.handle_etherscan_error(response_json)
    balance_wei = int(response_json["result"])
    balance_eth = balance_wei / float(pow(10, 18))
    balance_eth = round(balance_eth, ROUND_DIGITS)
    return balance_eth
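The example above assembles the query string by hand. requests.get() can also build it from a dict via the params argument; the following sketch mirrors the same Etherscan query (the key names are taken from the example above, not from additional API documentation):

# Sketch: the same query built with params=; requests handles the URL encoding.
params = {
    'module': 'account',
    'action': 'balance',
    'address': address,
    'tag': 'latest',
}
if ETHERSCAN_API_KEY:
    params['apikey'] = ETHERSCAN_API_KEY
response = requests.get('https://api.etherscan.io/api', params=params)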
def get_cids(self, cas):
    """
    Use the PubChem API to get the CID
    :param cas: string - CAS identifier
    :return: list of CIDs
    """
    uri = "http://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/%s/cids/json" \
          "?email=%s"
    try:
        response = get((uri % (cas, app.config['ADMIN_EMAIL']))).json()
        try:
            cids = response['IdentifierList']['CID']
            return cids
        except KeyError:
            return None
    except (exceptions.ConnectionError, TimeoutError, exceptions.Timeout,
            exceptions.ConnectTimeout, exceptions.ReadTimeout) as e:
        # Error: report the error and the CAS number that this error occurred on
        sys.stderr.write("Error: %s. Occurred on CAS: %s" % (e, cas))
        sys.stderr.flush()
        sys.stdout.flush()
def getRosiItem():
    start = time.time()
    index = 1
    while True:
        url = "http://www.mmxyz.net/category/rosi/page/{}/".format(index)
        res = requests.get(url, timeout=10)
        if res.status_code == 404:
            print("+ Time: {:.2f} S           +".format(time.time() - start))
            print("+ Total Pages: {}          +".format(index - 1))
            print("+ Total Numbers: {}        +".format(len(RosiItems)))
            print("+-------------------------+\r\n\r\n")
            return
        soup = BeautifulSoup(res.content, "html.parser")
        rosiList = soup.find_all("a", class_="inimg")
        for rosi in rosiList:
            RosiItems.append(rosi['href'])
        index += 1
def getRosiItem():
    start = time.time()
    index = 1
    while True:
        url = "http://www.mmxyz.net/category/disi/page/{}/".format(index)
        res = requests.get(url, timeout=10)
        if res.status_code == 404:
            print("+ Time: {:.2f} S           +".format(time.time() - start))
            print("+ Total Pages: {}          +".format(index - 1))
            print("+ Total Numbers: {}        +".format(len(RosiItems)))
            print("+-------------------------+\r\n\r\n")
            return
        soup = BeautifulSoup(res.content, "html.parser")
        rosiList = soup.find_all("a", class_="inimg")
        for rosi in rosiList:
            RosiItems.append(rosi['href'])
        index += 1