我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用urllib.request.add_header()。
def _try_send(self, ip, port, body, header, data): try: request = urllib.request.Request('http://%s:%s/upnp/control/basicevent1' % (ip, port)) request.add_header('Content-type', 'text/xml; charset="utf-8"') request.add_header('SOAPACTION', header) request_body = '<?xml version="1.0" encoding="utf-8"?>' request_body += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">' request_body += '<s:Body>%s</s:Body></s:Envelope>' % body request.data = request_body.encode() result = urllib.request.urlopen(request, timeout=3) return self._extract(result.read().decode(), data) except Exception as e: # except: # raise print(str(e)) return None
def getWebPage(url, headers, cookies, postData=None): try: if (postData): params = urllib.parse.urlencode(postData) params = params.encode('utf-8') request = urllib.request.Request(url, data=params, headers=headers) else: print('Fetching '+url) request = urllib.request.Request(url, None, headers) request.add_header('Cookie', cookies) if (postData): response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request) else: response = urllib.request.urlopen(request) if response.info().get('Content-Encoding') == 'gzip': buf = BytesIO(response.read()) f = gzip.GzipFile(fileobj=buf) r = f.read() else: r = response.read() return r except Exception as e: print("Error processing webpage: "+str(e)) return None ## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
def play(self, txt): encText = urllib.parse.quote(txt) data = "speaker=" + self.speaker + "&speed=" + self.speed + "&text=" + encText; request = urllib.request.Request(url) request.add_header("X-Naver-Client-Id",client_id) request.add_header("X-Naver-Client-Secret",client_secret) response = urllib.request.urlopen(request, data=data.encode('utf-8')) rescode = response.getcode() if(rescode==200): response_body = response.read() with open(tmpPlayPath, 'wb') as f: f.write(response_body) #?? ???? ?? vlc os.system('cvlc ' + tmpPlayPath + ' --play-and-exit') #?????? #os.system('omxplayer ' + tmpPlayPath) else: print("Error Code:" + rescode)
def authenticate_with_apikey(self, api_key, scope=None): """perform authentication by api key and store result for execute_request method api_key -- secret api key from account settings scope -- optional scope of authentication request. If None full list of API scopes will be used. """ scope = "auto" if scope is None else scope data = { "grant_type": "client_credentials", "scope": scope } encoded_data = urllib.parse.urlencode(data).encode() request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST") request.add_header("ContentType", "application/x-www-form-urlencoded") request.add_header("Authorization", 'Basic ' + base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode()) response = urllib.request.urlopen(request) self._token = WaApiClient._parse_response(response) self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None): """perform authentication by contact credentials and store result for execute_request method username -- typically a contact email password -- contact password scope -- optional scope of authentication request. If None full list of API scopes will be used. """ scope = "auto" if scope is None else scope data = { "grant_type": "password", "username": username, "password": password, "scope": scope } encoded_data = urllib.parse.urlencode(data).encode() request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST") request.add_header("ContentType", "application/x-www-form-urlencoded") auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode() request.add_header("Authorization", 'Basic ' + auth_header) response = urllib.request.urlopen(request) self._token = WaApiClient._parse_response(response) self._token.retrieved_at = datetime.datetime.now()
def downloadUrls(self, urls): url_data = {} for u in urls: url = self.base_url + u request = urllib.request.Request(url) # the .htaccess file checks for the header, and if it exists returns unprocessed data. request.add_header('User-agent', 'our-web-crawler') try: response = urllib.request.urlopen(request) data = response.read() except urllib.request.HTTPError: log (url) raise except urllib.request.URLError: log (url) raise yield (u,data)
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'): print('Downloading:', url) request = urllib.request.Request(url) request.add_header('User-agent', user_agent) try: resp = urllib.request.urlopen(request) cs = resp.headers.get_content_charset() if not cs: cs = charset html = resp.read().decode(cs) except (URLError, HTTPError, ContentTooShortError) as e: print('Download error:', e.reason) html = None if num_retries > 0: if hasattr(e, 'code') and 500 <= e.code < 600: # recursively retry 5xx HTTP errors return download(url, num_retries - 1) return html
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'): print('Downloading:', url) request = urllib.request.Request(url) request.add_header('User-agent', user_agent) try: resp = urllib.request.urlopen(request) cs = resp.headers.get_content_charset() if not cs: cs = charset html = resp.read().decode(cs) except (URLError, HTTPError, ContentTooShortError) as e: print('Download error:', e.reason) html = None if num_retries > 0: if hasattr(e, 'code') and 500 <= e.code < 600: # recursively retry 5xx HTTP errors return download(url, num_retries - 1) return html
def put(self, location, params=None): """Dispatch a PUT request to a SeaMicro chassis. The seamicro box has order-dependent HTTP parameters, so we build our own get URL, and use a list vs. a dict for data, as the order is implicit. """ opener = urllib.request.build_opener(urllib.request.HTTPHandler) url = self.build_url(location, params) request = urllib.request.Request(url) request.get_method = lambda: 'PUT' request.add_header('content-type', 'text/json') response = opener.open(request) json_data = self.parse_response(url, response) return json_data['result']
def send(url, headers, timeout): request = urllib.request.Request(url) data = "" # Add password key request.add_header(_gs["smplshll_main_password_var"], _gs["smplshll_main_password"]) for header in headers.keys(): request.add_header(header, Utility.crypt(_gs["smplshll_input_password"], headers[header])) if timeout > 0: data = urllib.request.urlopen(request, timeout = timeout).read() else: data = urllib.request.urlopen(request).read() return Utility.crypt(_gs["smplshll_input_password"], data, False) if _gs["smplshll_response_encryption"] else data
def build_http_request_obj(self, request_body): request = urllib.request.Request(self.url) request.add_header("Content-Type", "application/json") request.add_header("User-Agent", "gemstone-client") request.data = json.dumps(request_body).encode() request.method = "POST" return request
def post(self, path, headers, body): uri = "%s/%s" % (self.url.geturl(), path) if self.debug: print("post: uri=%s, headers=%s" % (uri, headers)) request = urllib2.Request(uri) for header in headers: request.add_header(header, headers[header]) request.add_data(body) resp = urllib2.urlopen(request, cafile=self.cafile) if self.debug: print("resp: %s" % resp.info()) return resp
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers): request = urllib.request.Request(url) request.add_header('User-Agent', agent) if etag: request.add_header('If-None-Match', etag) if isinstance(modified, str): modified = _parse_date(modified) elif isinstance(modified, datetime.datetime): modified = modified.utctimetuple() if modified: # format into an RFC 1123-compliant timestamp. We can't use # time.strftime() since the %a and %b directives can be affected # by the current locale, but RFC 2616 states that dates must be # in English. short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5])) if referrer: request.add_header('Referer', referrer) if gzip and zlib: request.add_header('Accept-encoding', 'gzip, deflate') elif gzip: request.add_header('Accept-encoding', 'gzip') elif zlib: request.add_header('Accept-encoding', 'deflate') else: request.add_header('Accept-encoding', '') if auth: request.add_header('Authorization', 'Basic %s' % auth) if ACCEPT_HEADER: request.add_header('Accept', ACCEPT_HEADER) # use this for whatever -- cookies, special headers, etc # [('Cookie','Something'),('x-special-header','Another Value')] for header_name, header_value in list(request_headers.items()): request.add_header(header_name, header_value) request.add_header('A-IM', 'feed') # RFC 3229 support return request
def open_url(url): request = urllib.request.Request(url) request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.3357.400 QQBrowser/9.6.11858.400') response = urllib.request.urlopen(request) return response.read() # download image
def fetch_page(query): url = REQUEST_URL.format(BASE_URL, query) request = urllib.request.Request(url) request.add_header('User-agent', _random_user_agent()) request.add_header('connection', 'keep-alive') request.add_header('Accept-Encoding', 'gzip, deflate, sdch, br') request.add_header('referer', REQUEST_URL.format(BASE_URL, "")) print(url) response = urllib.request.urlopen(request) data = response.read() print(type(data)) return gzip.decompress(data)
def upload_image(self, image_uri, sync, username, userid, channel_name): token = self.apikey logger.info('downloading %s', image_uri) filename = os.path.basename(image_uri) request = urllib.request.Request(image_uri) request.add_header("Authorization", "Bearer %s" % token) image_response = urllib.request.urlopen(request) content_type = image_response.info().get_content_type() filename_extension = mimetypes.guess_extension(content_type).lower() # returns with "." physical_extension = "." + filename.rsplit(".", 1).pop().lower() if physical_extension == filename_extension: pass elif filename_extension == ".jpe" and physical_extension in [ ".jpg", ".jpeg", ".jpe", ".jif", ".jfif" ]: # account for mimetypes idiosyncrancy to return jpe for valid jpeg pass else: logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension)) filename += filename_extension logger.info('uploading as %s', filename) image_id = yield from self.bot._client.upload_image(image_response, filename=filename) logger.info('sending HO message, image_id: %s', image_id) yield from sync._bridgeinstance._send_to_internal_chat( sync.hangoutid, "shared media from slack", { "sync": sync, "source_user": username, "source_uid": userid, "source_title": channel_name }, image_id=image_id )
def test_custom_headers(self): url = "http://www.example.com" with support.transient_internet(url): opener = urllib.request.build_opener() request = urllib.request.Request(url) self.assertFalse(request.header_items()) opener.open(request) self.assertTrue(request.header_items()) self.assertTrue(request.has_header('User-agent')) request.add_header('User-Agent','Test-Agent') opener.open(request) self.assertEqual(request.get_header('User-agent'),'Test-Agent')
def _get(self, url: object, api: object = None, timeout: object = None) -> object: request = urllib.request.Request(url=url) request.add_header('Referer', 'https://wx.qq.com/') if api == 'webwxgetvoice': request.add_header('Range', 'bytes=0-') if api == 'webwxgetvideo': request.add_header('Range', 'bytes=0-') try: response = urllib.request.urlopen(request, timeout=timeout) if timeout else urllib.request.urlopen(request) if api == 'webwxgetvoice' or api == 'webwxgetvideo': data = response.read() else: data = response.read().decode('utf-8') logging.debug(url) return data except urllib.error.HTTPError as e: logging.error('HTTPError = ' + str(e.code)) except urllib.error.URLError as e: logging.error('URLError = ' + str(e.reason)) except http.client.HTTPException as e: logging.error('HTTPException') except timeout_error as e: pass except ssl.CertificateError as e: pass except Exception: import traceback logging.error('generic exception: ' + traceback.format_exc()) return ''
def _post(self, url: object, params: object, jsonfmt: object = True) -> object: if jsonfmt: data = (json.dumps(params)).encode() request = urllib.request.Request(url=url, data=data) request.add_header( 'ContentType', 'application/json; charset=UTF-8') else: request = urllib.request.Request(url=url, data=urllib.parse.urlencode(params).encode(encoding='utf-8')) try: response = urllib.request.urlopen(request) data = response.read() if jsonfmt: return json.loads(data.decode('utf-8') )#object_hook=_decode_dict) return data except urllib.error.HTTPError as e: logging.error('HTTPError = ' + str(e.code)) except urllib.error.URLError as e: logging.error('URLError = ' + str(e.reason)) except http.client.HTTPException as e: logging.error('HTTPException') except Exception: import traceback logging.error('generic exception: ' + traceback.format_exc()) return ''
def volgendeZonondergang(self): # sunrise from domoticz... But I don't know how to retrieve it.... try: domoticzurl = 'https://127.0.0.1:8443/json.htm?type=command¶m=getSunRiseSet' encoding = 'utf-8' inlog = '%s:%s' % (self.domoticzusername, self.domoticzpassword) base64string = base64.b64encode(inlog.encode(encoding)).decode(encoding) request = urllib.request.Request(domoticzurl) request.add_header("Authorization", "Basic %s" % base64string) response = urllib.request.urlopen(request) data = response.read() JSON_object = json.loads(data.decode(encoding)) time = JSON_object['Sunset'].split(':') now = datetime.now() ret = datetime(now.year, now.month, now.day, int(time[0]), int(time[1]), 0) # when started after sunset use 'now' now = now + timedelta(minutes = int(Parameters["Mode4"])) if (now > ret): ret = ret + timedelta(days = 1) return ret except Exception as e: self.LogError("Error retrieving Sunset: "+ str(e)) now = datetime.now() return datetime(now.year, now.month, now.day, 22, 0, 0)
def domoticzrequest (url): request = urllib.request.Request(url) request.add_header("Authorization", "Basic %s" % base64string) response = urllib.request.urlopen(request) return response.read().decode('utf-8')
def execute_request(self, api_url, api_request_object=None, method=None): """ perform api request and return result as an instance of ApiObject or list of ApiObjects api_url -- absolute or relative api resource url api_request_object -- any json serializable object to send to API method -- HTTP method of api request. Default: GET if api_request_object is None else POST """ if self._token is None: raise ApiException("Access token is not abtained. " "Call authenticate_with_apikey or authenticate_with_contact_credentials first.") if not api_url.startswith("http"): api_url = self.api_endpoint + api_url if method is None: if api_request_object is None: method = "GET" else: method = "POST" request = urllib.request.Request(api_url, method=method) if api_request_object is not None: request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode() request.add_header("Content-Type", "application/json") request.add_header("Accept", "application/json") request.add_header("Authorization", "Bearer " + self._get_access_token()) try: response = urllib.request.urlopen(request) return WaApiClient._parse_response(response) except urllib.error.HTTPError as httpErr: if httpErr.code == 400: raise ApiException(httpErr.read()) else: raise
def _refresh_auth_token(self): data = { "grant_type": "refresh_token", "refresh_token": self._token.refresh_token } encoded_data = urllib.parse.urlencode(data).encode() request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST") request.add_header("ContentType", "application/x-www-form-urlencoded") auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode() request.add_header("Authorization", 'Basic ' + auth_header) response = urllib.request.urlopen(request) self._token = WaApiClient._parse_response(response) self._token.retrieved_at = datetime.datetime.now()
def get_raw(self, url): # print("Raw request:", url) request = urllib.request.Request(url) # setup private request headers if appropriate if self._engine.token != None: if self._engine.name == "gitlab": request.add_header('PRIVATE-TOKEN',self._engine.token) else: if self._verbose: print("Tokens not setup for engine yet") # run the request try: result = urllib.request.urlopen(request) except urllib.error.HTTPError as e: self._error = "HTTP error" self._error_msg = str(e.code) self._update_ready = None except urllib.error.URLError as e: self._error = "URL error, check internet connection" self._error_msg = str(e.reason) self._update_ready = None return None else: result_string = result.read() result.close() return result_string.decode() # result of all api calls, decoded into json format
def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None): """ Download a given URL and return the page content args: url (str): URL kwargs: user_agent (str): user agent (default: wswp) charset (str): charset if website does not include one in headers proxy (str): proxy url, ex 'http://IP' (default: None) num_retries (int): number of retries if a 5xx error is seen (default: 2) """ print('Downloading:', url) request = urllib.request.Request(url) request.add_header('User-agent', user_agent) try: if proxy: proxy_support = urllib.request.ProxyHandler({'http': proxy}) opener = urllib.request.build_opener(proxy_support) urllib.request.install_opener(opener) resp = urllib.request.urlopen(request) cs = resp.headers.get_content_charset() if not cs: cs = charset html = resp.read().decode(cs) except (URLError, HTTPError, ContentTooShortError) as e: print('Download error:', e) html = None if num_retries > 0: if hasattr(e, 'code') and 500 <= e.code < 600: # recursively retry 5xx HTTP errors return download(url, num_retries - 1) return html
def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None): """ Download a given URL and return the page content args: url (str): URL kwargs: user_agent (str): user agent (default: wswp) charset (str): charset if website does not include one in headers proxy (str): proxy url, ex 'http://IP' (default: None) num_retries (int): number of retries if a 5xx error is seen (default: 2) """ print('Downloading:', url) request = urllib.request.Request(url) request.add_header('User-agent', user_agent) try: if proxy: proxy_support = urllib.request.ProxyHandler({'http': proxy}) opener = urllib.request.build_opener(proxy_support) urllib.request.install_opener(opener) resp = urllib.request.urlopen(request) cs = resp.headers.get_content_charset() if not cs: cs = charset html = resp.read().decode(cs) except (URLError, HTTPError, ContentTooShortError) as e: print('Download error:', e.reason) html = None if num_retries > 0: if hasattr(e, 'code') and 500 <= e.code < 600: # recursively retry 5xx HTTP errors return download(url, num_retries - 1) return html
def get_html(url): request = urllib.request.Request(url) request.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.99 Safari/537.36") html = urllib.request.urlopen(request) return html.read()
def _build_urllib2_request(url, agent, accept_header, etag, modified, referrer, auth, request_headers): request = urllib.request.Request(url) request.add_header('User-Agent', agent) if etag: request.add_header('If-None-Match', etag) if isinstance(modified, basestring): modified = _parse_date(modified) elif isinstance(modified, datetime.datetime): modified = modified.utctimetuple() if modified: # format into an RFC 1123-compliant timestamp. We can't use # time.strftime() since the %a and %b directives can be affected # by the current locale, but RFC 2616 states that dates must be # in English. short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5])) if referrer: request.add_header('Referer', referrer) if gzip and zlib: request.add_header('Accept-encoding', 'gzip, deflate') elif gzip: request.add_header('Accept-encoding', 'gzip') elif zlib: request.add_header('Accept-encoding', 'deflate') else: request.add_header('Accept-encoding', '') if auth: request.add_header('Authorization', 'Basic %s' % auth) if accept_header: request.add_header('Accept', accept_header) # use this for whatever -- cookies, special headers, etc # [('Cookie','Something'),('x-special-header','Another Value')] for header_name, header_value in request_headers.items(): request.add_header(header_name, header_value) request.add_header('A-IM', 'feed') # RFC 3229 support return request
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None): """ Download a given URL and return the page content args: url (str): URL kwargs: user_agent (str): user agent (default: wswp) charset (str): charset if website does not include one in headers proxy (str): proxy url, ex 'http://IP' (default: None) num_retries (int): number of retries if a 5xx error is seen (default: 2) """ print('Downloading:', url) request = urllib.request.Request(url) request.add_header('User-agent', user_agent) try: if proxy: proxy_support = urllib.request.ProxyHandler({'http': proxy}) opener = urllib.request.build_opener(proxy_support) urllib.request.install_opener(opener) resp = urllib.request.urlopen(request) cs = resp.headers.get_content_charset() if not cs: cs = charset html = resp.read().decode(cs) except (URLError, HTTPError, ContentTooShortError) as e: print('Download error:', e) html = None if num_retries > 0: if hasattr(e, 'code') and 500 <= e.code < 600: # recursively retry 5xx HTTP errors return download(url, num_retries - 1) return html
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None): """ Download a given URL and return the page content args: url (str): URL kwargs: user_agent (str): user agent (default: wswp) charset (str): charset if website does not include one in headers proxy (str): proxy url, ex 'http://IP' (default: None) num_retries (int): number of retries if a 5xx error is seen (default: 2) """ print('Downloading:', url) request = urllib.request.Request(url) request.add_header('User-agent', user_agent) try: if proxy: proxy_support = urllib.request.ProxyHandler({'http': proxy}) opener = urllib.request.build_opener(proxy_support) urllib.request.install_opener(opener) resp = urllib.request.urlopen(request) cs = resp.headers.get_content_charset() if not cs: cs = charset html = resp.read().decode(cs) except (URLError, HTTPError, ContentTooShortError) as e: print('Download error:', e.reason) html = None if num_retries > 0: if hasattr(e, 'code') and 500 <= e.code < 600: # recursively retry 5xx HTTP errors return download(url, num_retries - 1) return html
def __send_request(self, method, uri, data): url = self.__url + uri request = urllib.request.Request(url) if (method == 'POST'): request.data = bytes(json.dumps(data), 'utf-8') auth = str( base64.b64encode( bytes('%s:%s' % (self.user, self.password), 'utf-8') ), 'ascii' ).strip() request.add_header('Authorization', 'Basic %s' % auth) request.add_header('Content-Type', 'application/json') e = None try: response = urllib.request.urlopen(request).read() except urllib.error.HTTPError as ex: response = ex.read() e = ex if response: result = json.loads(response.decode()) else: result = {} if e != None: if result and 'error' in result: error = '"' + result['error'] + '"' else: error = 'No additional error message received' raise APIError('TestRail API returned HTTP %s (%s)' % (e.code, error)) return result
def __call(self, url=API_URL, params={}, data=None, headers={}): """Common method for API call. url: API URL params: query string parameters data: POST data headers: additional request headers Return: parsed JSON structure or raise GooglError. """ params.update(key=self.key) if self.userip is not None: params.update(userip=self.userip) full_url = "%s?%s" % (url % self.api, urllib.parse.urlencode(params)) request = urllib.request.Request(full_url, data=bytes(data, encoding="UTF-8"), headers=headers) if self.referer is not None: request.add_header("Referer", self.referer) if self.client_login is not None: request.add_header("Authorization", "GoogleLogin auth=%s" % self.client_login) try: response = urllib.request.urlopen(request) return json.loads(str(response.read(), encoding="UTF-8")) except urllib.error.HTTPError as e: error = json.loads(e.fp.read()) raise GooglError(error["error"]["code"], error["error"]["message"])
def _getMealsURL(): """Download meals information from XML feed""" request = urllib.request.Request(mealsURL) request.add_header("Authorization", "Basic %s" % mealsURL_authorization) result = urllib.request.urlopen(request, timeout=__timeoutSeconds) return result, 0
def post_api(self, api_path): options = json.dumps(self.options) options = options if isinstance(options, bytes) else options.encode('utf8') request = urllib.request.Request(self.arachni_url + api_path, options) request.add_header('Content-Type', 'application/json') return urllib.request.urlopen(request).read().decode('utf8')
def stage_repository(self, url): local = os.path.join(self._updater_path,"update_staging") error = None # make/clear the staging folder # ensure the folder is always "clean" if self._verbose: print("Preparing staging folder for download:\n",local) if os.path.isdir(local) == True: try: shutil.rmtree(local) os.makedirs(local) except: error = "failed to remove existing staging directory" else: try: os.makedirs(local) except: error = "failed to create staging directory" if error != None: if self._verbose: print("Error: Aborting update, "+error) self._error = "Update aborted, staging path error" self._error_msg = "Error: {}".format(error) return False if self._backup_current==True: self.create_backup() if self._verbose: print("Now retrieving the new source zip") self._source_zip = os.path.join(local,"source.zip") if self._verbose: print("Starting download update zip") try: request = urllib.request.Request(url) # setup private token if appropriate if self._engine.token != None: if self._engine.name == "gitlab": request.add_header('PRIVATE-TOKEN',self._engine.token) else: if self._verbose: print("Tokens not setup for selected engine yet") self.urlretrieve(urllib.request.urlopen(request), self._source_zip) # add additional checks on file size being non-zero if self._verbose: print("Successfully downloaded update zip") return True except Exception as e: self._error = "Error retrieving download, bad link?" self._error_msg = "Error: {}".format(e) if self._verbose: print("Error retrieving download, bad link?") print("Error: {}".format(e)) return False