我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 requests.head()。
def qiniu_upload_img(img_url):
    """Mirror a remote image into the Qiniu bucket and return its Qiniu URL.

    The image is downloaded, keyed by ``calc_md5`` of its bytes, and uploaded
    only when a HEAD request shows that key is not already being served.

    Args:
        img_url (string): URL of the source image.

    Returns:
        string: ``http://<QINIU_HOSTNAME>/<md5>``.

    Raises:
        requests.HTTPError: if the source image cannot be downloaded.
    """
    response = requests.get(img_url)
    # Refuse to hash and upload an error page as if it were the image.
    response.raise_for_status()
    image = response.content
    md5 = calc_md5(image)
    qiniu_url = 'http://{}/{}'.format(QINIU_HOSTNAME, md5)

    # Content-addressed key: if it is already served there is nothing to do.
    if requests.head(qiniu_url).ok:
        return qiniu_url

    q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
    token = q.upload_token(QINIU_BUCKET, md5, 10)
    put_data(token, md5, image, mime_type='image/jpeg')
    return qiniu_url
def register(self):
    u"""Register this resource in CKAN unless its URL is already listed.

    Stores the Content-Length reported by a HEAD request on ``self.url`` in
    ``self.size``, then creates the resource when a CKAN ``resource_search``
    for that URL reports a count of zero.
    """
    import ckanapi

    ckansite = ckanapi.RemoteCKAN(self.ckan_url, apikey=self.apikey)

    # The HEAD response supplies the size recorded for the resource.
    head_response = requests.head(self.url)
    self.size = int(head_response.headers["content-length"])

    # Skip creation when CKAN already knows a resource with this URL.
    search = ckansite.action.resource_search(query=u"url:%s" % self.url)
    if search[u"count"] != 0:
        return

    new_resource = dict(
        package_id=self.package_id,
        url=self.url,
        name=self.name,
        description=self.description,
        format=self.format,
        mimetype=self.mimetype,
        size=self.size,
    )
    ckansite.action.resource_create(**new_resource)
def _is_update_needed(self):
    """Decide whether the local database has to be refreshed.

    Issues a HEAD request to ``MAXMIND_FREE_DB_URL`` and compares the
    ``X-Database-MD5`` response header with ``self._database_checksum``.
    When an update is needed the received checksum is remembered in
    ``self._response_checksum``.

    :return: False when the header is present and equals the stored checksum,
        True otherwise
    :raises requests.HTTPError: when the HEAD request returns an error status
    """
    head = requests.head(self.MAXMIND_FREE_DB_URL)
    logger.debug(head.headers)
    head.raise_for_status()

    remote_md5 = head.headers.get('X-Database-MD5')
    logger.info("Database MD5 received in response headers: " + str(remote_md5))

    # A missing header counts as "unknown", which forces an update.
    up_to_date = remote_md5 is not None and remote_md5 == self._database_checksum
    if up_to_date:
        return False

    self._response_checksum = remote_md5
    return True
def head_account_metadata(self):
    """Return the account's metadata as a dict built from a HEAD request.

    Well-known account headers are renamed (``container_count``,
    ``object_count``, ``used_bytes``); every other response header is copied
    under its own name. Header names are matched case-insensitively because
    servers and proxies may change their casing.

    :return: account metadata dict
    """
    response = requests.head(self.url, headers=self.base_headers)

    # Headers that are copied verbatim under a friendlier key.
    renamed = {
        'x-account-container-count': 'container_count',
        'x-account-object-count': 'object_count',
    }

    result = dict()
    for key, value in response.headers.items():
        lowered = key.lower()
        if lowered in renamed:
            result[renamed[lowered]] = value
        elif lowered == 'x-account-bytes-used':
            result['used_bytes'] = replace_bytes_to_readable(value)
        else:
            result[key] = value
    return result
def head_container_metadata(self, container_name):
    """Return a container's metadata as a dict built from a HEAD request.

    ``X-Container-Object-Count`` is exposed as ``object_count`` and
    ``X-Container-Bytes-Used`` as ``used_bytes`` (passed through
    ``replace_bytes_to_readable``); every other response header is copied
    under its own name. Header names are matched case-insensitively because
    servers and proxies may change their casing.

    :param container_name: target container name
    :return: container's metadata dict
    """
    url = self.url + '/' + container_name
    response = requests.head(url, headers=self.base_headers)

    result = dict()
    for key, value in response.headers.items():
        lowered = key.lower()
        if lowered == 'x-container-object-count':
            result['object_count'] = value
        elif lowered == 'x-container-bytes-used':
            result['used_bytes'] = replace_bytes_to_readable(value)
        else:
            result[key] = value
    return result
def get_layer_size(self, layer_hash):
    """Look up the size of a layer blob and cache it in ``self.layer_sizes``.

    Sends a redirect-following HEAD request for the blob (with a bearer token
    when ``self.token`` is set) and stores the integer Content-Length, or
    None when the header is absent.

    :raises requests.HTTPError: when the registry answers with an error status
    """
    blob_url = "{base_url}/v2/{name}/blobs/{layer_hash}".format(
        base_url=self.base_url,
        name=self.repository_name,
        layer_hash=layer_hash)

    if self.token is not None:
        auth_headers = {"Authorization": "Bearer %s" % self.token}
    else:
        auth_headers = {}

    # (connect timeout, read timeout)
    reply = requests.head(blob_url, headers=auth_headers,
                          allow_redirects=True, timeout=(3.05, 5))
    reply.raise_for_status()

    reported = reply.headers.get("content-length")
    self.layer_sizes[layer_hash] = None if reported is None else int(reported)
    return self.layer_sizes[layer_hash]
def getDLsize(self):
    """Fill column 2 of every tree item with the size of its download URL.

    For each item that carries a URL in its ``dataURL`` data, a HEAD request
    is made and the Content-Length is shown in MiB. HTTP error statuses are
    shown as the status code; any other request failure is shown as "Error".
    """
    debug_log("getDLsize called")
    it = QTreeWidgetItemIterator(self.tw)
    while it.value():
        item = it.value()
        url_test = item.data(0, dataURL)
        if url_test is not None:
            try:
                r = requests.head(url_test)
                r.raise_for_status()
                try:
                    size = (int(r.headers['Content-Length']) / 1024) / 1024
                except (KeyError, ValueError):
                    # Header missing or not numeric: treat the size as unknown.
                    size = 0
                if size > 0:
                    item.setText(2, "{} MiB".format(round(size, 2)))
            except requests.exceptions.HTTPError:
                # Raised by raise_for_status(), so ``r`` is always bound here.
                debug_log("Error {} getting DL size: {}".format(r.status_code, r.headers))
                # setText() needs a string, not the integer status code.
                item.setText(2, str(r.status_code))
            except requests.exceptions.RequestException as e:
                item.setText(2, self.tr("Error"))
                debug_log(e, logging.ERROR)
        it += 1
def head(self):
    """Perform a HEAD request on ``self.url`` and return the response headers.

    The requests response object is kept on ``self.res``. Returns False when
    the connection cannot be established; any other exception is re-raised
    (after a note on stderr when ``DEBUG_FLAG`` is set).
    """
    try:
        response = requests.head(self.url, timeout=self.request_timeout)
        # Keep the full response available to callers through the instance.
        self.res = response
    except requests.exceptions.ConnectionError:
        return False
    except Exception as e:
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: Unable to perform a HEAD request with the head() method (Naked.toolshed.network.py).")
        raise e
    else:
        return response.headers

#------------------------------------------------------------------------------
# [ post method ] (string)
#   HTTP POST request for text
#   returns text from the URL as a string
#------------------------------------------------------------------------------
def _get_wp_api_url(self, url):
    """
    Private function for finding the WP-API URL.

    Sends a HEAD request to ``url`` and reads the ``Link`` response header
    entry whose rel is ``https://api.w.org/``.

    Arguments
    ---------

    url : str
        WordPress instance URL.

    Returns
    -------

    str
        The URL advertised by the rel="https://api.w.org/" link.

    Raises
    ------

    Exception
        If the response carries no such link.
    """
    resp = requests.head(url)

    # Search the Links for rel="https://api.w.org/".
    wp_api_rel = resp.links.get('https://api.w.org/')
    if wp_api_rel:
        return wp_api_rel['url']

    # TODO: raise a dedicated exception type when the rel doesn't exist.
    raise Exception(
        'No Link header with rel="https://api.w.org/" found at %s' % url)
def already_cached(self, asset_url):
    '''Check whether an item is already cached.

    The decision is based on the HEAD response headers: a response that
    carries a ``Content-Type`` header is treated as "not in cache", one
    without it as "already cached".

    Returns False on any request error, since re-downloading an item that is
    already present is harmless.
    '''
    try:
        req = requests.head(asset_url, headers={'user-agent': self.user_agent})  # NOQA
        if req.headers.get('Content-Type') is not None:
            # Item is not in cache
            self.log.debug('Not in cache: %s' % asset_url)
            return False
        # Item is already cached
        self.log.info('Already in cache: %s' % asset_url)
        return True
    except Exception:
        # Best effort, but do not swallow KeyboardInterrupt/SystemExit the
        # way a bare ``except:`` would.
        return False
def get_normal_title(self, url):
    """Build a one-line link preview for ``url``.

    Returns "File too big for link preview" when the HEAD response reports
    more than 5e6 bytes. Otherwise the page is fetched: HTML pages yield
    their ``<title>``, anything else yields its content type and size.
    Returns an empty string when the fetch times out, the status is not 200,
    or an HTML page has no usable title.
    """
    return_message = ""
    head = requests.head(url)
    max_size = 5e6
    if 'content-length' in head.headers and int(head.headers['content-length']) > max_size:
        return "File too big for link preview\r\n"

    # eventlet.Timeout(60, False) exits the block silently on timeout, so
    # ``response`` must have a value even when the request never finished.
    response = None
    with eventlet.Timeout(60, False):
        response = requests.get(url, timeout=30)
    if response is None or response.status_code != 200:
        return return_message

    # Either header may be absent; never index the headers directly.
    content_type = response.headers.get('content-type', '')
    if 'text/html' in content_type:
        soup = BeautifulSoup(response.content, "lxml")
        # title.string is None for an empty or nested <title>.
        if soup.title is not None and soup.title.string:
            return_message += soup.title.string + "\r\n"
    else:
        size = response.headers.get('content-length', 'unknown')
        return_message += content_type + " Size: " + size + "\r\n"
    return return_message
def find_decl_doc(self, name):
    """Raise ``IncludeError`` for ``name``.

    NOTE(review): the unconditional ``raise`` on the first line makes every
    statement after it unreachable. The remaining code, which would probe
    ``METATAB_ASSETS_URL + name + '.csv'`` with a HEAD request, looks like a
    lookup that was switched off on purpose - confirm before removing either.
    """
    raise IncludeError(name)
    import requests
    from requests.exceptions import InvalidSchema
    url = METATAB_ASSETS_URL + name + '.csv'
    try:
        # See if it exists online in the official repo
        r = requests.head(url, allow_redirects=False)
        if r.status_code == requests.codes.ok:
            return url
    except InvalidSchema:
        pass  # It's probably FTP
def get_minutes_url(config, metadata):
    """Return the URL of a meeting's minutes, or 'N/A' when none is known.

    An explicit ``metadata['minutes_url']`` wins. Otherwise, for the
    'vancouver' config only, the URL is derived from the meeting date and the
    meeting-type abbreviation. Either URL is verified with a HEAD request.

    :raises requests.HTTPError: when the candidate URL returns an error status
    """
    start_date = parse_timestamp_naively(metadata['start']).astimezone(get_tz(config))

    # First abbreviation whose key occurs in the configured metadata field.
    meeting_type = next(
        (abbr for key, abbr in config.get('minutes_abbrs', {}).items()
         if key in metadata[config['minutes_abbrs_for']]),
        None)

    minutes_url = metadata.get('minutes_url')
    if minutes_url:
        requests.head(minutes_url).raise_for_status()
        return minutes_url

    if config['id'] != 'vancouver':
        return 'N/A'
    if not meeting_type:
        return 'N/A'

    if metadata['title'] == 'Inaugural Council Meeting':
        meeting_type = 'inau'
    mins = 'http://council.vancouver.ca/{dt:%Y%m%d}/{type}{dt:%Y%m%d}ag.htm'.format(
        type=meeting_type, dt=start_date)
    requests.head(mins).raise_for_status()
    return mins
def check_pnda_mirror():
    """Verify that the configured PNDA mirror is defined and reachable.

    Logs the outcome to CONSOLE and exits the process with status 1 when the
    mirror is missing from ``PNDA_ENV``, cannot be contacted, or answers with
    an unexpected HTTP status.
    """
    def raise_error(reason):
        # Report the failure (with the active traceback, if any) and exit.
        CONSOLE.info('PNDA mirror...... ERROR')
        CONSOLE.error(reason)
        CONSOLE.error(traceback.format_exc())
        sys.exit(1)

    try:
        mirror = PNDA_ENV['mirrors']['PNDA_MIRROR']
    except KeyError:
        raise_error('PNDA mirror was not defined in pnda_env.yaml')

    # Only the request itself is guarded. The previous bare ``except:`` also
    # caught the SystemExit raised by raise_error() for a bad status code and
    # then reported it a second time as a connection failure.
    try:
        response = requests.head(mirror)
    except Exception:
        raise_error("Failed to connect to PNDA mirror. Verify connection "
                    "to %s, check mirror in pnda_env.yaml and try again." % mirror)

    # expect 200 (open mirror) 403 (no listing allowed)
    # or any redirect (in case of proxy/redirect)
    if response.status_code not in [200, 403, 301, 302, 303, 307, 308]:
        raise_error("PNDA mirror configured and present "
                    "but responded with unexpected status code (%s). " % response.status_code)
    CONSOLE.info('PNDA mirror...... OK')
def _len(url):
    """Return the size in bytes of the resource at ``url`` (0 if unknown).

    First asks for a single byte with a ``Range`` header and reads the total
    from ``Content-Range``; when that yields nothing, falls back to the
    ``Content-Length`` of a HEAD request without the Range header.
    """
    headers = {
        # The scraped source showed "(KHTML\ , like Gecko)", the residue of a
        # backslash line continuation inside the literal.
        'User-Agent': ('Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'),
        'Range': 'bytes=0-0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    }

    length = 0
    # stream=True: only the headers are needed, so never pull the body (a
    # server that ignores Range would otherwise send the whole file).
    with requests.get(url, headers=headers, stream=True) as r:
        content_range = r.headers.get('Content-Range', '')
    if '0-0/' in content_range:
        total = content_range.split('0-0/')[-1]
        # "bytes 0-0/*" means the server does not know the total size.
        length = int(total) if total.isdigit() else 0

    if not length:
        del headers['Range']
        with requests.head(url, headers=headers) as r:
            content_length = r.headers.get('Content-Length')
        length = int(content_length) if content_length else 0
    return length
def _scan_target_normal(self, target):
    """Probe ``target`` and report it as a formatted hit, or '' for a miss.

    Returns ``target + '/;'`` when the HEAD response indicates a same-named
    directory, ``target + ';'`` when both HEAD and GET report an "exists"
    status and the body contains none of the not-exist markers, and ''
    otherwise (including on any exception).
    """
    try:
        probe = requests.head(target)

        # it maybe a directory
        if self._check_eponymous_dir(probe, target):
            return target + '/;'
        if not self._check_exist_code(probe.status_code):
            return ''

        # check content
        page = requests.get(target)
        if not self._check_exist_code(page.status_code):
            return ''
        if any(page.text.find(marker) != -1 for marker in self._not_exist_flag):
            return ''
        return target + ';'
    except Exception:
        return ''
def validate_media_url(self):
    """
    Validates the media URL

    The URL must be set, answer a HEAD request with a successful status, and
    report a Content-Type equal to ``self.allowed_content_type``.

    Raises:
        3PlayMediaUrlError: on invalid media url or content type
    """
    if not self.media_url:
        raise ThreePlayMediaUrlError('Invalid media URL "{media_url}".'.format(media_url=self.media_url))

    response = requests.head(url=self.media_url)
    if not response.ok:
        raise ThreePlayMediaUrlError('The URL "{media_url}" is not Accessible.'.format(media_url=self.media_url))

    # .get(): a response without Content-Type must fail validation with the
    # documented error instead of leaking a KeyError.
    content_type = response.headers.get('Content-Type')
    if content_type != self.allowed_content_type:
        raise ThreePlayMediaUrlError(
            'Media content-type should be "{allowed_type}". URL was "{media_url}", content-type was "{type}"'.format(  # pylint: disable=line-too-long
                allowed_type=self.allowed_content_type,
                media_url=self.media_url,
                type=content_type,
            )
        )
def _check_website(url, code):
    """Check a website with a HEAD request.

    :param url: URL to probe
    :param code: when truthy, return the HTTP status code (-1 on timeout or
        connection error); when falsy, return True only for a 200 response
        and False otherwise
    """
    try:
        # NOTE(review): 'user_agent' is not the standard 'User-Agent' header
        # name, and DEFAULT_HEADERS is defined elsewhere - confirm the intent.
        response = requests.head(url, headers={'user_agent': DEFAULT_HEADERS})
        website_status_code = response.status_code
        if code:
            return website_status_code
        return website_status_code == 200
    # A tuple is required here: ``except A, B:`` is Python 2 for
    # ``except A as B`` - it caught only Timeout and rebound the name.
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        if code:
            return -1
        return False
def sync(self, attempt_perceptual_hash=False):
    """Sync this image with its remote location and save it.

    A HEAD request is sent to ``self.url``:

    * On a 200 response, a perceptual hash is generated when the image has
      none yet and ``attempt_perceptual_hash`` is True (off by default
      because it is slow).
    * On any other status, the image is marked ``removed_from_source``.
    * In all cases ``last_synced_with_source`` is set to now and the model
      is saved.
    """
    head_response = requests.head(self.url)
    still_at_source = head_response.status_code == 200

    if not still_at_source:
        self.removed_from_source = True
    elif not self.perceptual_hash and attempt_perceptual_hash:
        self.perceptual_hash = self.generate_hash()

    self.last_synced_with_source = timezone.now()
    self.save()
def do_http_request(self, method, endpoint, data=None, files=None, timeout=100, only_headers=False, custom_header=None):
    """Send an HTTP request to ``endpoint`` and return the response.

    With ``only_headers=True`` a plain HEAD request is returned immediately.
    Otherwise ``method == 'post'`` sends ``data``/``files`` as a POST and any
    other method sends ``data`` as GET query parameters. The user-agent is
    ``custom_header`` when given, else ``str(Etiquette())``. When throttling
    is enabled, rate limits are updated from the response headers and the
    call sleeps for ``self.throttling_time``.
    """
    if only_headers is True:
        return requests.head(endpoint)

    user_agent = custom_header if custom_header else str(Etiquette())
    headers = {'user-agent': user_agent}

    if method == 'post':
        result = requests.post(endpoint, data=data, files=files,
                               timeout=timeout, headers=headers)
    else:
        result = requests.get(endpoint, params=data,
                              timeout=timeout, headers=headers)

    if self.throttle is True:
        self._update_rate_limits(result.headers)
        sleep(self.throttling_time)
    return result
def get_url_file_size(url, proxies=None):
    """Return the size of the remote file in KB (Content-Length / 1024).

    Redirects are followed by requests itself, which resolves relative
    ``Location`` headers and raises ``requests.TooManyRedirects`` rather
    than looping forever on a redirect cycle.

    :param url: URL of the file
    :param proxies: optional proxies mapping passed to requests
    :return: Content-Length of the final response divided by 1024
    :raises KeyError: when the final response has no Content-Length header
    """
    r = requests.head(url=url, proxies=proxies, timeout=3.0,
                      allow_redirects=True)
    return int(r.headers['Content-Length']) / 1024
def load(self, url):
    """Load script from URL.

    A redirect-following HEAD request determines the final base URL. When
    the response's file name ends in ``.py`` the file is fetched and
    returned; otherwise the URL is crawled and None is returned.
    """
    # Check for a redirect to get a final base_url where to start
    resp = requests.head(url, allow_redirects=True)
    if url != resp.url:
        # Followed a redirect
        url = resp.url

    extension = os.path.splitext(get_response_fname(resp))[1]
    if extension != ".py":
        self.crawl(url, url)
        return None
    return self.fetch_file(url, url)
def fuzzworth_contentl(self, head, strict = False):
    '''Decide from the content-length header whether a URL is worth fuzzing.

    Returns True when the content-length in ``head`` is below
    ``self.pagesize_limit`` and False when it is not. When ``head`` is empty
    or has no content-length, the answer is ``not strict``: strict mode
    rejects the URL, lenient mode (the default) accepts it.

    The point is to avoid downloading huge files, which would slow fuzzing.'''
    has_length = bool(head) and "content-length" in head
    if not has_length:
        return not strict
    return int(head["content-length"]) < self.pagesize_limit
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match = False):
    '''Check content-length strictly, falling back to content-type.

    If ``head`` has a content-length header, the result is that of
    ``fuzzworth_contentl`` in strict mode. Otherwise, if it has a
    content-type header, the result is that of ``fuzzworth_content_type``
    in strict mode for ``allowed_types_lst``. With neither header (or no
    head at all) the result is False.

    ``full_req_match`` is forwarded to ``fuzzworth_content_type``; it was
    previously ignored and always passed as False.'''
    if head and "content-length" in head:
        return self.fuzzworth_contentl(head, strict = True)

    # No content-length: decide on the content-type instead.
    if head and "content-type" in head:
        return self.fuzzworth_content_type(head, allowed_types_lst, full_req_match = full_req_match, strict = True)

    return False
def exce(indexOfResult,indexOfChar,queryASCII):
    """Return True when the request built from the payload is slow.

    Times a HEAD request to the target URL plus ``getPayload(...)`` and
    compares the elapsed time with ``error_time``.
    """
    # content-start
    url = "http://127.0.0.1/Less-9/?id="
    tempurl = url + getPayload(indexOfResult,indexOfChar,queryASCII)
    started = time.time()
    requests.head(tempurl)
    finished = time.time()
    # content-end
    elapsed = finished - started
    # judge-start
    # A response slower than the error_time threshold counts as a positive.
    return abs(elapsed) > error_time
    # judge-end
def validate_href(image_href):
    """Validate an HTTP image reference with a HEAD request.

    :param image_href: Image reference.
    :raises: exception.ImageRefValidationFailed if the HEAD request failed
        or returned a response code other than 200.
    :returns: Response to HEAD request.
    """
    try:
        response = requests.head(image_href)
        status = response.status_code
        if status != http_client.OK:
            failure = ("Got HTTP code %s instead of 200 in response to "
                       "HEAD request." % status)
            raise exception.ImageRefValidationFailed(image_href=image_href,
                                                     reason=failure)
    except requests.RequestException as e:
        raise exception.ImageRefValidationFailed(image_href=image_href,
                                                 reason=e)
    return response
def get_remote_source_length():
    """Return the Content-Length of the ``SP_REMOTE_SOURCE`` URL.

    Sends a redirect-following HEAD request to the URL named by the
    ``SP_REMOTE_SOURCE`` environment variable. Prints an error and exits
    with status 1 when the request fails or no numeric Content-Length is
    reported.
    """
    url = os.environ.get('SP_REMOTE_SOURCE')
    try:
        response = requests.head(url, allow_redirects=True, timeout=10)
    except requests.exceptions.RequestException as e:
        puts(colored.red('[HEAD] %s' % url))
        puts(colored.red('Failed to get remote installation size: %s' % e))
        sys.exit(1)

    reported = (response.headers.get('content-length')
                or response.headers.get('Content-Length'))
    if not reported or not reported.isdigit():
        puts(colored.red('Could not fetch the remote Content-Length.'))
        sys.exit(1)

    try:
        return int(reported)
    except ValueError:
        return reported
def _get_imageId(self, ImageName, tag="latest"):
    """Look up the imageId (registry v1) or digest (registry v2) of a tag.

    :param ImageName: repository name of the image
    :param tag: tag to resolve, "latest" by default
    :return: for version 1 the decoded JSON body of the tag endpoint; for
        version 2 the ``Docker-Content-Digest`` response header ("" when
        absent); False when the request itself fails
    """
    if self.version == 1:
        ReqUrl = self._baseUrl + "/repositories/{}/tags/{}".format(ImageName, tag)
    else:
        ReqUrl = self._baseUrl + "/{}/manifests/{}".format(ImageName, tag)
    logger.info("_get_imageId for url {}".format(ReqUrl))

    manifest_v2 = "application/vnd.docker.distribution.manifest.v2+json"
    try:
        if self.version == 1:
            r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
        else:
            # The registry selects the manifest schema (and so the digest)
            # from the Accept header. Content-Type is kept for backward
            # compatibility with what was sent before.
            r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify,
                              allow_redirects=True,
                              headers={"Content-Type": manifest_v2,
                                       "Accept": manifest_v2})
    except Exception as e:
        logger.error(e, exc_info=True)
        return False

    if self.version == 1:
        return r.json()
    return r.headers.get("Docker-Content-Digest", "")
def check_download_session(url, download_dir, cookies):
    """Download the file behind ``url`` into ``download_dir`` if it is missing.

    The file name is taken from the ``Content-Disposition`` header of a HEAD
    request made with the session ``cookies``.

    :return: True when the file already exists or was downloaded; False when
        the session is not valid for downloading (non-200 status, no usable
        Content-Disposition file name) or the download fails
    """
    r = requests.head(url, cookies=cookies)
    if r.status_code != 200 or 'Content-Disposition' not in r.headers:
        return False
    m = re.search('filename="(.+)"', r.headers['Content-Disposition'])
    if not m:
        return False

    # The name comes from the server: keep only its last path component so
    # that values such as "../../x" or "/etc/x" cannot escape download_dir.
    name = os.path.basename(m.group(1).replace('\\', '/'))
    if name in ('', '.', '..'):
        return False

    filename = os.path.join(download_dir, name)
    if os.path.isfile(filename):
        return True

    # get it
    print("Fetching %s" % name)
    r2 = requests.get(url, cookies=cookies)
    if r2.status_code != 200:
        return False
    with open(filename, 'wb') as out:
        out.write(r2.content)
    return True
def _scan(self):
    """Worker loop: probe ``self.target + sub`` for every queued ``sub``.

    Each candidate gets a HEAD request; responses whose status code is in
    ``self.status_code`` are logged. The loop ends when the queue is empty,
    after which ``self.thread_count`` is decremented.
    """
    while not self.queue.empty():
        try:
            sub = self.queue.get_nowait()
            self.queue.task_done()
            domain = self.target + sub
            r = requests.head(domain, headers=header, timeout=5, stream=True)
            code = r.status_code
            if code in self.status_code:
                logger.info('status code {} -> {}'.format(code, domain))
        except Exception:
            # Deliberately best-effort: a failed probe (or a queue drained by
            # another worker between empty() and get_nowait()) is skipped.
            pass
    self.thread_count -= 1
def _from_image_tag_getId(self, ImageName, tag, url, version=1):
    """Look up the imageId (registry v1) or digest (registry v2) of a tag.

    :param ImageName: repository name of the image
    :param tag: tag to resolve
    :param url: base URL of the registry; a falsy value yields ""
    :param version: registry API version, 1 by default
    :return: for version 1 the decoded JSON body of the tag endpoint; for
        other versions the ``Docker-Content-Digest`` response header; ""
        when ``url`` is empty, the header is absent, or the request fails
    """
    if not url:
        return ""

    base = url.strip("/")
    if version == 1:
        ReqUrl = base + "/v1/repositories/{}/tags/{}".format(ImageName, tag)
    else:
        ReqUrl = base + "/v2/{}/manifests/{}".format(ImageName, tag)
    logger.info("_from_image_tag_getId for url {}".format(ReqUrl))

    manifest_v2 = "application/vnd.docker.distribution.manifest.v2+json"
    try:
        if version == 1:
            r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
        else:
            # The registry selects the manifest schema (and so the digest)
            # from the Accept header. Content-Type is kept for backward
            # compatibility with what was sent before.
            r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify,
                              allow_redirects=True,
                              headers={"Content-Type": manifest_v2,
                                       "Accept": manifest_v2})
    except Exception as e:
        logger.error(e, exc_info=True)
        return ""

    if version == 1:
        return r.json()
    return r.headers.get("Docker-Content-Digest", "")
def setURL(self,update=None):
    """Rebuild ``self.url`` from path, grid, variable, extension and year.

    When any component is None the URL is cleared and nothing else happens.
    Otherwise the URL is built and probed with a HEAD request on
    ``url + '.html'``; on an error status the first entry of
    ``self.yearList`` is dropped, the year is re-read from the list and the
    URL is rebuilt. With ``update == True`` ``self.NCARfile()`` is called.
    """
    def build_url():
        return self.path + '/' + self.grid + '/' + self.var + "." +\
            self.ext + self.year + ".nc"

    # if no valid path, do nothing:
    if None in (self.path, self.grid, self.var, self.ext, self.year):
        self.url = None
        return

    self.url = build_url()

    # Check if the file exists
    fexist = requests.head(self.url+'.html')
    if fexist.status_code >= 400:
        self.yearList = self.yearList[1:]
        self.year = self.yearList[self.currentYearIndex+1]
        self.url = build_url()

    # NOTE(review): the collapsed source does not show whether this check was
    # nested under the valid-URL branch; it is assumed to be - confirm.
    if update == True:
        self.NCARfile()
def run(self):
    """Worker loop: HEAD-probe every word of each queued wordlist piece.

    Pieces are newline-separated words; each word is wrapped as
    ``self.target + word + self.suffix``. URLs answering 200 are printed;
    request failures are reported and skipped.
    """
    while True:
        piece = self.queue.get()
        for word in piece.split("\n"):
            url = self.target + word + self.suffix
            try:
                r = requests.head(url)
                if r.status_code == 200:
                    print("[+] 200 - " + url)
            except Exception:
                # Not a bare ``except:`` - that also trapped
                # KeyboardInterrupt and made the worker impossible to stop.
                print("[*] Request Error: " + url)
        self.queue.task_done()

# http://stackoverflow.com/questions/519633/lazy-method-for-reading-big-file-in-python
def send_http_request_used_exec(self, url, method, request_body="", header="", cookie=None):
    """Send an HTTP request with the given method and return the response.

    The call is dispatched to the matching ``requests`` function directly.
    The earlier implementation assembled source text from the arguments and
    ran it through ``exec``, which was both a code-injection hole and not
    valid Python once assembled.

    :param url: target URL
    :param method: one of get, post, put, delete, head, options
    :param request_body: sent as the ``data`` argument
    :param header: request headers; the default "" sends no extra headers
    :param cookie: cookies to send, if any
    :raises Exception: when ``method`` is not supported
    :return: the ``requests`` response object
    """
    if method not in ["get", "post", "put", "delete", "head", "options"]:
        raise Exception("Not supported method: %s" % method)

    request_kwargs = {"data": request_body, "cookies": cookie}
    # Compare by value: ``is not ""`` tested object identity.
    if header != "":
        request_kwargs["headers"] = header

    return getattr(requests, method)(url, **request_kwargs)
def _is_var_ready(self, cycle, var):
    """
    Tell whether the remote file for variable ``var`` is newer than ``cycle``.

    The file's ``Last-Modified`` header (from a HEAD request) is parsed and
    compared with the cycle timestamp; a later file time means the variable
    is ready.

    :param cycle: which cycle are we working with (UTC)
    :param var: the variable identifier
    :return: true if the variable is ready
    :raises ValueError: if the remote file does not answer with status 200
    """
    remote_url = self._remote_var_url(cycle.hour, var)
    head_response = requests.head(remote_url)
    if head_response.status_code != 200:
        details = (var, cycle.hour, remote_url)
        raise ValueError('Cannot find variable %s for hour %d at url %s' % details)

    # find last-modified time of file in UTC timezone
    modified_at = self._parse_header_timestamp(head_response.headers['Last-Modified'])
    return modified_at > cycle
def setup_args():
    '''Build and return the command line argument parser.'''
    parser = argparse.ArgumentParser(
        description="Beatle request utility",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # (flag or positional name, add_argument keyword options)
    arguments = (
        ('--debug', dict(action='store_true', default=False,
                         help="enable debugging log")),
        ('--shard', dict(action='store',
                         help='shard name or list shards by "list" value')),
        ('request', dict(action='store',
                         help='request: post/get/head/delete/list')),
        ('filename', dict(action='store', help='file name')),
    )
    for name, options in arguments:
        parser.add_argument(name, **options)
    return parser
def sitesManager( media_url ):
    """Return an instance of the ``sitesBase`` subclass that handles a URL.

    URLs from known shorteners are first expanded by following redirects
    with a HEAD request. The first subclass whose ``regex`` matches the
    (case-insensitively searched) URL is instantiated with it; None is
    returned when no subclass matches.
    """
    # first resolve url shortener
    shorteners=['bit.ly','goo.gl','tinyurl.com']
    if any(shortener in media_url for shortener in shorteners):
        resolved = requests.head( media_url, timeout=REQUEST_TIMEOUT, allow_redirects=True )
        log(' short url(%s)=%s'%(media_url,repr(resolved.url)))
        media_url = resolved.url

    for handler_cls in sitesBase.__subclasses__():
        pattern = handler_cls.regex
        if not pattern:
            continue
        if re.compile( pattern , re.I).findall( media_url ):
            return handler_cls( media_url )
def check(url, timeout):
    """HEAD-probe ``url`` and print a colour-coded status line.

    200 responses are printed in green (with a hint when the body contains
    "404"), 404 and 405 are ignored, any other status is printed in blue.
    Request errors are printed. All output happens while holding
    ``screenLock`` so that lines from concurrent checks do not interleave.

    :param url: URL to probe
    :param timeout: request timeout in seconds; values <= 0 fall back to 20
    """
    try:
        if timeout <= 0:
            timeout = 20
        response = requests.head(url, timeout=timeout)
    except Exception as e:
        # The lock is not held yet here. The old layout could re-acquire it
        # in the handler while already holding it, which deadlocks.
        with screenLock:
            print(e)
        return

    code = response.status_code
    with screenLock:
        if code == 200:
            ColorPrinter.print_green_text("[ " + str(code) + " ]")
            print("Checking : " + url)
            if "404" in response.text:
                ColorPrinter.print_blue_text(url + "\tMaybe every page same!")
        elif code == 404 or code == 405:
            pass
        else:
            ColorPrinter.print_blue_text("[ " + str(code) + " ]")
            print("Checking : " + url)
def run(self):
    """Probe ``self.url`` for exposed Excel export endpoints.

    Each payload path gets a HEAD request; a response whose Content-Type
    mentions ``application/vnd.ms-excel`` is reported. The first request
    failure is reported and ends the scan.
    """
    headers = {
        "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
    }
    payloads = ["/excel/sso_user_export.php",
                "/excel/user_export.php",
                "/excel/server_export.php"]
    try:
        for payload in payloads:
            vulnurl = self.url + payload
            req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
            # .get(): a response without Content-Type is simply not a hit and
            # must not be reported through the failure branch below.
            if r"application/vnd.ms-excel" in req.headers.get("Content-Type", ""):
                cprint("[+]???????????????...(??)\tpayload: "+vulnurl, "yellow")
    except Exception:
        # Not a bare ``except:`` so Ctrl-C still interrupts the scan.
        cprint("[-] "+__file__+"====>????", "cyan")
def run(self):
    """Probe ``self.url`` for downloadable dkcms Access database files.

    Each payload path gets a HEAD request; a response whose Content-Type is
    exactly ``application/x-msaccess`` is reported. A request failure is
    reported and the scan moves on to the next payload.
    """
    headers = {
        "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
    }
    payloads = ["/data/dkcm_ssdfhwejkfs.mdb",
                "/_data/___dkcms_30_free.mdb",
                "/_data/I^(()UU()H.mdb"]
    for payload in payloads:
        vulnurl = self.url + payload
        try:
            req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
            # .get(): a response without Content-Type is simply not a hit and
            # must not be reported through the failure branch below.
            if req.headers.get("Content-Type") == "application/x-msaccess":
                cprint("[+]??dkcms???????...(??)\tpayload: "+vulnurl, "red")
        except Exception:
            # Not a bare ``except:`` so Ctrl-C still interrupts the scan.
            cprint("[-] "+__file__+"====>????", "cyan")
def pull_request_to_github(main_repo_user, repo_name, environment):
    """Open a pull request on the main repository through the GitHub API.

    The request is made by running ``curl`` with the credentials from
    ``environment`` and proposes ``<github_username>:master`` into
    ``master``.

    :param main_repo_user: owner of the repository receiving the request
    :param repo_name: name of that repository
    :param environment: mapping with ``github_username``,
        ``github_password`` and ``github_pull_request_title``
    :return: the curl command list that was executed
    :raises subprocess.CalledProcessError: when curl exits with an error
    """
    import json

    # This is the API URL to make a pull request
    pull_request_url = ('https://api.github.com/repos/{}/{}/pulls'.format(
        main_repo_user, repo_name))

    # Username and password from the environment file, used to log in for
    # API calls.
    # NOTE(review): credentials passed via argv are visible in the process
    # list and are part of the returned command - consider a curl config file.
    auth_string = ('{}:{}'.format(environment['github_username'],
                                  environment['github_password']))

    # Data posted for the pull request. json.dumps escapes quotes and
    # backslashes in the title, which plain string formatting did not.
    pull_request_data = json.dumps(OrderedDict([
        ("title", environment['github_pull_request_title']),
        ("head", "{}:master".format(environment['github_username'])),
        ("base", "master"),
    ])) if False else json.dumps({
        "title": environment['github_pull_request_title'],
        "head": "{}:master".format(environment['github_username']),
        "base": "master",
    })

    pull_request_command = ['curl', '--user', auth_string, pull_request_url,
                            '-d', pull_request_data]

    # Make the pull request to the main repository
    subprocess.check_output(pull_request_command)
    return pull_request_command
def check_valid_url(repo_zip_url):
    """Return True when ``repo_zip_url`` is an http(s) URL that responds.

    The URL must start with ``http://`` or ``https://`` and its HEAD
    response status must be a success (2xx) or redirect (3xx); otherwise a
    message is printed and False is returned.
    """
    # Check that it's a URL
    if not repo_zip_url.startswith(('http://', 'https://')):
        return False

    # Get just the head.
    request = requests.head(repo_zip_url)

    # It should be either success (200's) or redirect(300's).
    if 200 <= request.status_code < 400:
        return True

    # Otherwise, inform the user of failure.
    print('Could not reach URL provided.\n')
    print('Provided url was {} and resulted in status code {}'.format(
        repo_zip_url,str(request.status_code)))
    return False

#This sends a notification email based on information provided
#in the environment file
def check_project_exist(self, project_name):
    """Return True when a HEAD on the projects API answers 200 for the name.

    A 404 answer means the project does not exist; any other status is
    logged as an error and also yields False.
    """
    path = '%s://%s/api/projects?project_name=%s' % (
        self.protocol, self.host, project_name)
    response = requests.head(path,
                             cookies={'beegosessionID': self.session_id})

    status = response.status_code
    if status in (200, 404):
        result = status == 200
        logging.debug(
            "Successfully check project exist, result: {}".format(result))
        return result

    logging.error("Fail to check project exist")
    return False

# POST /projects
def _is_commit_ref_available(self, package, namespace):
    """
    Check if commit ref is available on git repository

    Sends a HEAD request to ``<repository>/patch/?id=<ref>``, where the
    repository defaults to the Fedora cgit URL for the namespace/package and
    the ref defaults to "HEAD". A status outside 2xx is reported through
    ``self.error``; success is logged.
    """
    if not package or not namespace:
        self.error("Missing parameter to check if commit is available")

    default_repository = "https://src.fedoraproject.org/cgit/%s/%s.git" % (namespace, package.name)
    repository = package.repository if package.repository else default_repository
    ref = package.ref if package.ref else "HEAD"

    url = repository + "/patch/?id=%s" % ref
    resp = requests.head(url)

    if not 200 <= resp.status_code < 300:
        self.error("Could not find ref '%s' on '%s'. returned exit status %d; output:\n%s" %
                   (ref, package.name, resp.status_code, resp.text))

    self.log.info("Found ref: %s for %s" % (ref, package.name))
def url_resp(url):
    """Receives a url, returns True in case of 200 or 301 response codes.

    Returns None in every other case: when the request fails, when the
    status is an error (4xx/5xx), or when it is any other non-error status
    (for which '#bad#' is printed).
    """
    headers = {
        # The scraped source had a stray backslash before "AppleWebKit"
        # (residue of a line continuation inside the literal).
        'User-Agent': ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) '
                       'AppleWebKit/537.36 (KHTML, like Gecko) '
                       'Chrome/39.0.2171.95 Safari/537.36')}
    try:
        res = requests.head(url, timeout=3, headers=headers)
    except Exception:
        # Best effort, but unlike a bare ``except:`` this lets
        # KeyboardInterrupt/SystemExit through.
        return None

    # A Response is falsy for 4xx/5xx statuses; those are silently skipped.
    if not res:
        return None

    code = res.status_code
    print('*' + str(code) + '*')
    if code == 200 or code == 301:
        print(url)
        print('#ok#')
        return True
    print('#bad#')
    return None
def back_online_notifier(the_url):
    """Announce that ``the_url`` is back online if it was recorded as down.

    Reads the previous statuses from ``status.p``; when the URL is listed
    there with status "down", a message with the downtime duration is sent
    to ``CHANNEL_ID``. Returns early when the status file cannot be loaded.
    """
    print("found " + the_url + " back online.")
    try:
        # NOTE: pickle must only be used on files this program wrote itself.
        # The ``with`` block closes the handle, which the previous
        # ``pickle.load(open(...))`` never did.
        with open("status.p", "rb") as status_file:
            old_status_file = pickle.load(status_file)
    except Exception as ex:
        print("The status.p file was not found. it will be recreated." + str(ex))
        return

    if (the_url in old_status_file) and (old_status_file[the_url]['status'] == "down"):
        it_was_down_time = old_status_file[the_url]['time']
        current_time = datetime.datetime.now().replace(microsecond=0)
        send_message(CHANNEL_ID, ":herb: " + the_url + " is back online. It was down for " + str(current_time - it_was_down_time))
    else:
        print("skipping notifying that the url is online")

# --- getting only the head ---
def total_blocks_and_bytes(self):
    """Return ``(total_blocks, total_bytes)`` over all ``self.input_urls``.

    Each URL's size comes from the Content-Length of a HEAD request; its
    block count is that size divided by ``self.blocksize``, rounded up.

    :raises ContentLengthNotSupportedException: when a URL's HEAD response
        has no Content-Length header
    """
    total_blocks = total_bytes = 0
    for u in self.input_urls:
        headers = requests.head(u).headers
        if 'Content-Length' not in headers:
            m = "The url: '{}' doesn't support the 'Content-Length' field.".format(u)
            raise ContentLengthNotSupportedException(m)

        remote_size = int(headers['Content-Length'])
        total_bytes += remote_size

        # A trailing partial block still counts as one block.
        full_blocks, remainder = divmod(remote_size, self.blocksize)
        total_blocks += full_blocks + (1 if remainder else 0)
    return total_blocks, total_bytes