我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.ConnectionError()。
def fetch_data():
    """Download the MTGJSON zip archive and return its decoded JSON content.

    ``MTG_JSON_URL`` is requested first; if that raises
    ``requests.ConnectionError`` the same request is made against
    ``FALLBACK_MTG_JSON_URL``.

    :return: the object produced by ``json.loads`` for the archived file.
    :raises RuntimeError: if the archive does not hold exactly one file.
    """
    try:
        response = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        response = requests.get(FALLBACK_MTG_JSON_URL)

    with closing(response), zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        raw_bytes = archive.read(members[0])
    return json.loads(raw_bytes.decode('utf-8'))
def _do_healthcheck(self, containers, config):
    """Issue an HTTP GET against every container and fail on the first bad one.

    The address of each container is read from etcd under
    ``/deis/services/<self>/<container.job_id>``.

    :param containers: iterable of objects exposing ``job_id``.
    :param config: mapping read for ``HEALTHCHECK_URL`` (default ``'/'``) and
        ``HEALTHCHECK_TIMEOUT`` (default ``1``).
    :raises exceptions.HealthcheckException: when no etcd client is set, when
        a container answers with a non-OK status, or when the lookup/request
        raises ``requests.Timeout``, ``requests.ConnectionError`` or
        ``KeyError``.
    """
    check_path = config.get('HEALTHCHECK_URL', '/')
    check_timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
    if not _etcd_client:
        raise exceptions.HealthcheckException('no etcd client available')

    for container in containers:
        try:
            etcd_key = "/deis/services/{self}/{container.job_id}".format(
                self=self, container=container)
            address = _etcd_client.get(etcd_key).value
            reply = requests.get("http://{}{}".format(address, check_path),
                                 timeout=check_timeout)
            if reply.status_code != requests.codes.OK:
                raise exceptions.HealthcheckException(
                    "app failed health check (got '{}', expected: '200')".format(
                        reply.status_code))
        except (requests.Timeout, requests.ConnectionError, KeyError) as e:
            raise exceptions.HealthcheckException(
                'failed to connect to container ({})'.format(e))
def _make_request(self, path, cni_envs, expected_status=None):
    """POST ``cni_envs`` as JSON to the CNI daemon and return the response.

    :param path: path appended to ``http://<bind_address>/``.
    :param cni_envs: JSON-serializable request body.
    :param expected_status: when truthy, the status code the daemon must
        answer with.
    :raises requests.ConnectionError: re-raised after logging when the daemon
        cannot be reached.
    :raises k_exc.CNIError: when ``expected_status`` is given and differs from
        the returned status code.
    """
    daemon_address = config.CONF.cni_daemon.bind_address
    endpoint = 'http://%s/%s' % (daemon_address, path)
    try:
        LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n'
                  '%(body)s',
                  {'method': 'POST', 'path': endpoint, 'body': cni_envs})
        resp = requests.post(endpoint, json=cni_envs,
                             headers={'Connection': 'close'})
    except requests.ConnectionError:
        LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon '
                      'running?', daemon_address)
        raise

    outcome = {'status': resp.status_code, 'reason': resp.reason}
    LOG.debug('CNI Daemon returned "%(status)d %(reason)s".', outcome)
    if expected_status and resp.status_code != expected_status:
        LOG.error('CNI daemon returned error "%(status)d %(reason)s".', outcome)
        raise k_exc.CNIError('Got invalid status code from CNI daemon.')
    return resp
def session(request, api_url):
    """Yield a ``GirderSession`` carrying a ``Girder-Token`` header.

    The credentials come from ``request.param`` as a ``(username, password)``
    pair and are sent to ``user/authentication``.

    :raises Exception: when the server cannot be reached or the reply does
        not contain ``authToken.token``.
    """
    username, password = request.param
    with GirderSession(base_url=api_url) as s:
        try:
            auth_reply = s.get('user/authentication', auth=(username, password))
        except requests.ConnectionError:
            raise Exception('Unable to connect to %s.' % api_url)

        try:
            s.headers['Girder-Token'] = auth_reply.json()['authToken']['token']
        except KeyError:
            raise Exception(
                'Unable to login with user "%s", password "%s"' % (username, password))

        yield s


# TODO combine with session in some way?
def make_request(session, url, url_list, bulk_query, num_retries=5, reuse_url=False):
    """Send ``bulk_query`` to ``<url>/_msearch``, retrying on failure.

    :param session: object whose ``get`` performs the request.
    :param url: first base url to try.
    :param url_list: remaining base urls; one is popped per failed attempt
        unless ``reuse_url`` is set.
    :param bulk_query: request body.
    :param num_retries: number of failed attempts after which the last error
        is raised.
    :param reuse_url: keep using ``url`` instead of consuming ``url_list``.
    :return: ``(url, result)`` for the first response accepted by
        ``_bulk_success``.
    :raises requests.ConnectionError, RuntimeError: the last recorded failure
        once retries or urls are exhausted.
    """
    failed_attempts = 0
    while True:
        try:
            response = session.get(url + '/_msearch', data=bulk_query)
            if _bulk_success(response):
                return url, response
            last_error = RuntimeError('Too many failures or no urls left')
        except requests.ConnectionError as e:
            last_error = e

        failed_attempts += 1
        if failed_attempts >= num_retries:
            raise last_error
        if reuse_url:
            continue
        if len(url_list) == 0:
            raise last_error
        # TODO: This is only desirable if url_list is a list of actual
        # servers. If the url_list is a loadbalancer like LVS then we
        # want to keep using the existing url.
        url = url_list.pop()
def get_response(self):
    """
    Perform the initial GET request.

    ``self.current_url`` is used when truthy, otherwise ``self.base_url``.
    ``self.status_code`` is always recorded; ``self.content`` only on 200.

    :return: the response object on status 200; otherwise ``None`` after
        delegating the status code to ``handle_response_codes``.
    :raises ConnectionError: when the request raises ``ConnectionError``.
    """
    target = self.current_url or self.base_url
    try:
        r = requests.get(target)
    except ConnectionError:
        raise ConnectionError('No internet connection determined.')

    self.status_code = r.status_code
    if self.status_code != 200:
        handle_response_codes(self.status_code)
        return None
    self.content = r.content
    return r
def request(self, method, params, secure=False):
    """
    Call a service endpoint and return its serialized, status-checked data.

    :param method: The endpoint to be requested.
    :param params: Params to be used in request.
    :param secure: Whether the method belongs to the secure or readonly service.
    :return: the result of ``serialize_object`` on the response, after
        ``check_status_code`` has been applied to it.
    :raises APIError: for ``ConnectionError`` and any other exception raised
        while resolving or calling the endpoint.
    """
    try:
        # Client lookup stays inside the try so lookup failures are wrapped too.
        service_client = (self.client.secure_client if secure
                          else self.client.readonly_client)
        response = service_client.service[method](params)
    except ConnectionError:
        raise APIError(None, method, params, 'ConnectionError')
    except Exception as e:
        raise APIError(None, method, params, e)

    data = serialize_object(response)
    check_status_code(data)
    return data
def handle_result(f):
    """Decorator that reports errors raised by ``f`` on the console.

    The wrapped callable returns whatever ``f`` returns. On failure it
    returns ``None`` after:

    * ``UnauthorizedError`` - printing a login hint and running
      ``scm login <url>`` with the ``url`` keyword argument;
    * ``requests.ConnectionError`` - printing a connection message in red;
    * any other ``Exception`` - printing ``str(e)`` in red.

    The original wrapped the call in ``while True`` and did not leave the
    loop in the generic handler, so a call that kept failing was retried
    (and its error printed) forever. Every failure path now ends the call.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except UnauthorizedError:
            url = kwargs.get('url')
            click.echo('Please login')
            subprocess.call(['scm', 'login', url])
        except requests.ConnectionError:
            click.secho('Can not connect to content manager!', fg='red')
        except Exception as e:
            click.secho(str(e), fg='red')
        return None
    return wrapper
def agent_add():
    """Register every agent listed under ``agents`` in the JSON request body.

    Each agent is pinged before being stored with ``Agent.add``. The request
    is aborted with 406 when the ping status is non-zero and with 500 on a
    connection error or an SQLAlchemy error.

    :return: ``jsonify(status=0)`` once all agents were added.
    """
    body = request.get_json()
    for entry in body['agents']:
        host = ip(entry['host'])
        port = entry['port']

        try:
            pong = remote.ping(host, port)
            if pong['status'] != 0:
                abort(406, '{}:{} ?? agent ??'.format(host, port))
        except requests.ConnectionError:
            abort(500, '{}:{} Connection error'.format(host, port))

        try:
            Agent.add(host, port)
        except sqlalchemy.exc.SQLAlchemyError:
            abort(500, '{}:{} ????'.format(host, port))

    return jsonify(status=0)
def agent_list():
    """Return one page of agents, each annotated with liveness and version.

    Query parameters: ``host`` (passed to ``Agent.search``) and ``page``
    (falls back to 1 when it is not an integer). Every item starts as
    ``alive=False, version='unknown'`` and is upgraded only when its ping
    returns status 0.
    """
    host = request.args.get('host')
    try:
        page = int(request.args.get('page', 1))
    except ValueError:
        page = 1

    found = Agent.search((page - 1) * PAGE_SIZE, host=host)
    for item in found['items']:
        item.update({'alive': False, 'version': 'unknown'})
        try:
            pong = remote.ping(item['host'], item['port'])
            if pong['status'] == 0:
                item.update({'alive': True,
                             'version': pong.get('version', 'unknown')})
        except requests.ConnectionError:
            # Unreachable agents keep the pessimistic defaults set above.
            continue

    return jsonify(pages=found['pages'], page=page,
                   count=found['count'], items=found['items'])
def test_requests_connecterror():
    """A ``requests.ConnectionError`` during the call surfaces as ``EaterConnectError``."""
    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    def connect(*args, **kwargs):  # pylint: disable=unused-argument
        raise requests.ConnectionError()

    api = GetPersonAPI()

    with requests_mock.Mocker() as mock:
        # The text callback raises, which simulates a failed connection.
        mock.get('http://example.com/', text=connect)
        with pytest.raises(EaterConnectError):
            api()
def update_json():
    """Refresh the module-level ``data`` from the station endpoint.

    The first element of the JSON reply is stored in ``data`` and
    ``update_departures`` is called. On ``requests.HTTPError`` or
    ``requests.ConnectionError`` an error line is printed and ``quit_all``
    is called.
    """
    global data
    try:
        # Alternative station: http://sickpi:3000/station/9100013 (Spittelmarkt)
        reply = requests.get('http://sickpi:3000/station/9160523')  # Gotlindestr.
        first_entry = reply.json()[0]
        # Round-trip through the json module, exactly as the original did.
        data = json.loads(json.dumps(first_entry, indent=4))
        print('\nUPDATE JSON')
        update_departures()
    except (requests.HTTPError, requests.ConnectionError):
        print('\nERROR UPDATE JSON')
        quit_all()
def get_folder(client, folder_id):
    """
    Fetch a folder, retrying up to 15 times on connection-type errors.

    :param client: object whose ``folder(folder_id=...).get()`` is called.
    :param folder_id: identifier passed to ``client.folder``.
    :return: the fetched folder, or ``None`` if every attempt failed.
    """
    num_retry = 15
    for attempt in range(1, num_retry + 1):
        try:
            return client.folder(folder_id=folder_id).get()
        except (ConnectionError, BrokenPipeError, ProtocolError, ConnectionResetError, BoxAPIException):
            crate_logger.debug(traceback.format_exc())
            if attempt >= num_retry:
                crate_logger.debug('Failed for the last time to get the folder: {}'.format(folder_id))
    return None
def do_check(self, s):
    """Checks RAM stats, the time, the forecast or the weather, depending on s."""
    # Keywords are matched by substring and tried in this fixed order.
    if "ram" in s:
        system("free -lm")
        return
    if "time" in s:
        timeIn.main(self, s)
        return
    if "forecast" in s:
        forecast.main(self, s)
        return
    if "weather" in s:
        try:
            weatherIn.main(self, s)
        except ConnectionError:
            print(CONNECTION_ERROR_MSG)
def download(type_, book_id, msg):
    """Fetch a book file, falling back to the onion mirror on an HTML reply.

    :param type_: ``'fb2'``, ``'epub'`` or ``'mobi'`` select that format; any
        other value requests the generic ``download`` path.
    :param book_id: book identifier placed in the url.
    :param msg: message object the bot replies to when the fallback fails.
    :return: the response holding the file, or ``None`` on any failure.
    """
    url_tail = type_ if type_ in ['fb2', 'epub', 'mobi'] else 'download'

    try:
        r = requests.get(f"http://flibusta.is/b/{book_id}/{url_tail}")
    except requests.exceptions.ConnectionError as err:
        telebot.logger.exception(err)
        return None

    if 'text/html' not in r.headers['Content-Type']:
        return r

    # An HTML body means an error page was served: try the download via tor.
    try:
        r = requests.get(f"http://flibustahezeous3.onion/b/{book_id}/{url_tail}",
                         proxies=config.PROXIES)
    except requests.exceptions.ConnectionError as err:
        logger.debug(err)
        bot.reply_to(msg, "?????? ??????????? ? ???????! ?????????? ???????.").wait()
        return None

    if 'text/html' in r.headers['Content-Type']:
        # Still an HTML page: tell the user and give up.
        bot.reply_to(msg, "??????! ?????????? ????? ???? ????? :(").wait()
        return None
    return r
def echo_errors(f):
    """
    Decorator for subcommands: echo known client errors and exit with status 1.

    Handled, in this order: ``dprclient.ConfigError``,
    ``requests.ConnectionError``, ``dprclient.HTTPStatusError`` and
    ``dprclient.DpmException``. Anything else propagates unchanged.
    """
    def _abort(*messages):
        # Print every message, then terminate with a failure status.
        for text in messages:
            echo(text)
        sys.exit(1)

    @wraps(f)
    def wrapped(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except dprclient.ConfigError as e:
            _abort('[ERROR]: %s \n\n'
                   'To enter configuration options please run:\n'
                   ' dpmpy configure\n' % str(e))
        except requests.ConnectionError as e:
            _abort('[ERROR] %s\n' % repr(e),
                   'Network error. Please check your connection settings\n')
        except dprclient.HTTPStatusError as e:
            _abort('[ERROR] %s\n' % str(e.message))
        except dprclient.DpmException as e:
            _abort('[ERROR] %s\n' % str(e))
    return wrapped
def handle_trigger_event(self, sender, **data):
    """Send a Pushover notification describing a camera trigger event.

    :param sender: signal sender (unused).
    :param data: must contain ``source``, ``prediction`` and ``timestamp``
        (an object with ``timetuple()``).

    Failures are logged, never raised. Fixes over the original: the
    ``except X, e`` form (Python 2 only) became ``except X as e``, and the
    error logs now carry a ``%s`` placeholder - previously the exception was
    passed as a format argument to a message without one, which makes the
    logging module report a formatting error instead of the message.
    """
    # Pushover doesn't support images, so just send the event.
    notification_data = {
        "user": self.pushover_user,
        "token": self.pushover_token,
        "message": "Camera %s, event: %s" % (data['source'], data['prediction']),
        "timestamp": calendar.timegm(data['timestamp'].timetuple()),
    }

    # Optionally, set the device.
    if self.pushover_device:
        notification_data['device'] = self.pushover_device

    try:
        r = requests.post("https://api.pushover.net/1/messages.json",
                          data=notification_data)
        if r.status_code != 200:
            logger.error("Failed to send notification, (%d): %s",
                         r.status_code, r.text)
    except requests.ConnectionError as e:
        logger.error("Connection Error: %s", e)
    except requests.HTTPError as e:
        logger.error("HTTP Error: %s", e)
def check_online():
    """Return ``True`` if http://www.google.com answers 200 within two tries.

    A failed first try (non-200 status or ``requests.ConnectionError``) is
    followed by a 2 second pause and one more attempt.
    """
    for attempt in range(2):
        if attempt:
            # second attempt
            time.sleep(2)
        try:
            if requests.get("http://www.google.com").status_code == 200:
                return True
        except requests.ConnectionError:
            pass
    return False
def fetch_info(id, type):
    """
    Returns a dictionary with information about the media(id, type).
    Currently only fetches 'members'(popularity count).

    :return: the result of ``extract_info`` on the page content; ``None``
        after a connection error or any other failure; ``[]`` when the
        failure happened on a response with status 404.

    The generic handler used to read ``response.status_code``
    unconditionally, which raised ``UnboundLocalError`` whenever
    ``requests.get`` itself failed with something other than
    ``ConnectionError``. ``response`` is now pre-initialised and guarded.
    """
    validate_media(type)
    url = media_url(id, type)
    response = None
    try:
        response = requests.get(url)
        return extract_info(response.content)
    except requests.ConnectionError:
        print(f"Timed out on fetching {type}:{id} info")
        return None
    except Exception as err:
        status_code = response.status_code if response is not None else None
        print(id, type, '-', status_code, '-', err)
        if status_code == 404:
            return []
        return None
def _query(query_str): url = 'http://{host}:{port}/query' \ .format(host=settings.INFLUXDB_HOST, port=settings.INFLUXDB_PORT) params = { 'q': query_str, 'chunked': 'false', # 'epoch': 's', 'db': settings.INFLUXDB_DATABASE, 'u': settings.INFLUXDB_USER, 'p': settings.INFLUXDB_PASSWORD } try: r = requests.get( url=url, params=params ) return r.json() except requests.ConnectionError as e: raise InfluxDBConnectionError(e), None, sys.exc_info()[2] except ValueError as e: raise InfluxDBUnexpectedAnswer(e), None, sys.exc_info()[2]
def request_metrics_data(args, uri):
    """Query ``http://<host>:<port>/<uri>?json`` and cache the JSON reply.

    :param args: object with ``host`` and ``port``; also passed to
        ``get_tmp_file_name``.
    :param uri: path component of the metrics endpoint.
    :return: ``True`` when a 200 reply was written to the cache file,
        ``False`` on a connection error or any other status code.

    The cache file is now written through a ``with`` block so the handle is
    closed deterministically, and the reply is serialized before the file is
    opened so an unparsable body no longer truncates the existing cache.
    """
    # constructing URL to query metrics from
    url = 'http://{host}:{port}/{uri}?json'.format(host=args.host, port=args.port, uri=uri)

    # proceeding with query metrics
    try:
        r = requests.get(url, verify=False, allow_redirects=True)
    except requests.ConnectionError as e:
        print("Unable to connect to ", url, " error is ", e, file=sys.stderr)
        return False

    if r.status_code != 200:
        return False

    # got HTTP/200 for request - storing it in cache
    serialized = json.dumps(r.json())
    with open(get_tmp_file_name(args, uri), mode="w") as cache_file:
        cache_file.write(serialized)
    return True
def request(self, path, method='GET', **kwargs):
    """Call ``ENDPOINT_URL + path`` and return the decoded body.

    :param path: path appended to ``ENDPOINT_URL``.
    :param method: HTTP method name.
    :param kwargs: forwarded to ``requests.request``.
    :return: the parsed JSON when the content type mentions ``json``,
        otherwise the response text.
    :raises VoucherifyError: on ``requests.ConnectionError`` or when the
        decoded result is a dict with a truthy ``error`` entry.
    """
    try:
        response = requests.request(
            method=method,
            url=ENDPOINT_URL + path,
            headers=self.headers,
            timeout=self.timeout,
            **kwargs
        )
    except requests.HTTPError as e:
        # NOTE(review): this leaves a parsed JSON value in ``response`` while
        # the code below reads ``response.headers`` - confirm this branch
        # works as intended.
        response = json.loads(e.read())
    except requests.ConnectionError as e:
        raise VoucherifyError(e)

    content_type = response.headers.get('content-type')
    if content_type and 'json' in response.headers['content-type']:
        result = response.json()
    else:
        result = response.text

    if isinstance(result, dict) and result.get('error'):
        raise VoucherifyError(result)
    return result
def get_fees():
    """Return the fee schedule derived from the recommended ``halfHourFee``.

    Falls back to ``DEFAULT_FEE_PER_KB`` (and logs an error) on any
    ``requests.RequestException``, including a non-200 reply.

    :return: dict with ``per_kb``, ``per_input`` and ``per_output``.
    :raises exceptions.UnreasonableFeeError: when the fee per kB lies outside
        ``[0, 2 * DEFAULT_FEE_PER_KB]``.
    """
    try:
        response = requests.get(_fee_host + "v1/fees/recommended")
        if response.status_code != 200:
            raise requests.ConnectionError('Received status_code %d' % response.status_code)
        fee_per_kb = response.json()['halfHourFee'] * 1000
    except requests.RequestException as error:
        fee_per_kb = DEFAULT_FEE_PER_KB
        logger.error(
            "Error getting recommended fees from server: %s. Using defaults." % error)

    if not 0 <= fee_per_kb <= 2 * DEFAULT_FEE_PER_KB:
        raise exceptions.UnreasonableFeeError(
            'Unreasonable fee per kB: %s' % fee_per_kb)

    fees = {'per_kb': fee_per_kb}
    fees['per_input'] = int(DEFAULT_INPUT_SIZE_KB * fee_per_kb)
    fees['per_output'] = int(DEFAULT_OUTPUT_SIZE_KB * fee_per_kb)
    return fees
def push(self):
    """
    Push ``self.post_data`` to ``self.api`` as the JSON-encoded ``info`` field.

    :return: ``True`` when the JSON reply has a non-empty ``vul_pdf`` entry;
        ``False`` otherwise, including on network errors and replies that
        raise ``ValueError`` while being decoded.
    """
    try:
        form = {"info": json.dumps(self.post_data, ensure_ascii=False)}
        reply = requests.post(url=self.api, data=form)
        result = reply.json()
        if result.get("vul_pdf", "") == "":
            logger.warning('[PUSH API] Push result error: {0}'.format(reply.text))
            return False
        logger.info('[PUSH API] Push success!')
        return True
    except (requests.ConnectionError, requests.HTTPError) as error:
        logger.critical('[PUSH API] Network error: {0}'.format(str(error)))
        return False
    except ValueError as error:
        logger.critical('[PUSH API] Response error: {0}'.format(str(error)))
        return False
def _configure_login(self):
    """Attach a ``LoginManager`` to ``app`` and register its user loader.

    The loader resolves a token through ``AuthClient().token.get`` and
    returns a ``UserUI``. It returns ``None`` on ``HTTPError``, on
    ``requests.ConnectionError`` (logged as a warning), or when the reply
    carries no ``token``.
    """
    manager = LoginManager()
    manager.init_app(app)

    @manager.user_loader
    def load_token(token):
        try:
            response = AuthClient().token.get(token)
        except HTTPError:
            return None
        except requests.ConnectionError:
            logger.warning('Wazo authentication server connection error')
            return None

        resolved_token = response.get('token')
        if resolved_token:
            return UserUI(resolved_token, response.get('auth_id'))
        return None
def validate(self):
    """Authenticate the submitted credentials and set ``self.user``.

    A token with a 12 hour expiration is requested from the ``xivo_admin``
    backend.

    :return: ``True`` on success; ``False`` (after appending
        ``USERNAME_PASSWORD_ERROR`` to both fields) when ``unauthorized``
        accepts the HTTP error.
    :raises ValidationError: on any other HTTP error or a connection error.
    """
    super(LoginForm, self).validate()
    try:
        client = AuthClient(username=self.username.data,
                            password=self.password.data)
        response = client.token.new('xivo_admin', expiration=60*60*12)
    except HTTPError as e:
        if not unauthorized(e):
            raise ValidationError(l_('Error with Wazo authentication server: %(error)s', error=e.message))
        for field in (self.username, self.password):
            field.errors.append(USERNAME_PASSWORD_ERROR)
        return False
    except requests.ConnectionError:
        raise ValidationError(l_('Wazo authentication server connection error'))

    self.user = UserUI(response['token'], response['auth_id'])
    return True
def check_host_url(self, ssl_verify):
    """
    Check if host url is valid

    Args:
        ssl_verify (bool/str): Whether or not to verify SSL certificates,
            or a path to a CA file to use.

    Returns:
        bool: ``True`` when a GET on ``self.url`` answers with status 200.

    Raises:
        ValueError: on any other status code, or when the request raises
            ``ValueError`` or ``requests.ConnectionError``.
    """
    log.debug('Checking if host URL %s is correct', self.url)
    bad_url_message = ('Host URL is incorrectly configured'
                       ' in config file.')
    try:
        response = self.get_response(method='GET', url=self.url,
                                     ssl_verify=ssl_verify)
        if response.status_code != 200:
            raise ValueError(bad_url_message)
        return True
    except (ValueError, requests.ConnectionError):
        raise ValueError(bad_url_message)
def _read_ngrok_tunnels(self):
    """Poll the local ngrok API and store the HTTPS webhook url.

    Tunnels whose configured address ends with this server's port are
    collected by protocol. Polling is retried up to
    ``POLL_TUNNELS_MAX_TRIES`` times, one second apart, while the API is
    unreachable or reports no tunnels.

    :raises RuntimeError: when no ``https`` tunnel was found.
    """
    self._api_url = 'http://%s:%d/api' % self._find_ngrok_inspection_port()
    logging.info('Local ngrok API url: %s', self._api_url)

    webhook_urls = dict()
    port_suffix_owner = self.server
    for _ in range(0, self.POLL_TUNNELS_MAX_TRIES):
        try:
            data = requests.get(self._api_url + '/tunnels').json()
            if 'tunnels' not in data or len(data['tunnels']) == 0:
                raise ValueError('ngrok API did not return any tunnel.')
            for tunnel in data['tunnels']:
                if tunnel['config']['addr'].endswith(':' + str(port_suffix_owner.server_port)):
                    webhook_urls[tunnel['proto']] = tunnel['public_url']
            # A tunnel list was read successfully: stop polling.
            break
        except (requests.ConnectionError, ValueError) as e:
            logging.error('Error reading ngrok API: %s. Retry in 1sec.', e)
            time.sleep(1)

    if 'https' not in webhook_urls:
        raise RuntimeError('Did not receive any HTTPS tunnel from ngrok API.')
    self._webhook_url = webhook_urls['https'] + '/' + self.server.session_token
def api_delete(server_name, api, session_id):
    """Issue an HTTPS DELETE against ``server_name + API + api``.

    :param server_name: host part of the url.
    :param api: path appended after the module-level ``API`` prefix.
    :param session_id: value sent as the ``JSESSIONID`` cookie.
    :return: the ``requests`` response object.
    :raises TintriRequestsException: on any failure of the request.

    Fixes: the connection-error branch raised the misspelled name
    ``TintrRequestsiApiException`` (a ``NameError`` at runtime), and the
    catch-all branch concatenated a string with an exception class (a
    ``TypeError``). The catch-all is also narrowed from a bare ``except`` to
    ``Exception`` so ``KeyboardInterrupt``/``SystemExit`` pass through.
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error " + type(e).__name__ + " occurred.")

    return r


# PUT
def api_put(server_name, api, payload, session_id):
    """Issue an HTTPS PUT with a JSON body against ``server_name + API + api``.

    :param server_name: host part of the url.
    :param api: path appended after the module-level ``API`` prefix.
    :param payload: object serialized with ``json.dumps`` as the body.
    :param session_id: value sent as the ``JSESSIONID`` cookie.
    :return: the ``requests`` response object.
    :raises TintriRequestsException: on any failure of the request.

    Fix: the catch-all branch concatenated a string with an exception class,
    which raised ``TypeError`` instead of the intended exception. It is also
    narrowed from a bare ``except`` to ``Exception``.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error " + type(e).__name__ + " occurred.")

    return r


# POST
def api_post(server_name, api, payload, session_id):
    """Issue an HTTPS POST with a JSON body against ``server_name + API + api``.

    :param server_name: host part of the url.
    :param api: path appended after the module-level ``API`` prefix.
    :param payload: object serialized with ``json.dumps`` as the body.
    :param session_id: value sent as the ``JSESSIONID`` cookie.
    :return: the ``requests`` response object.
    :raises TintriRequestsException: on any failure of the request.

    Fix: the catch-all branch concatenated a string with an exception class,
    which raised ``TypeError`` instead of the intended exception. It is also
    narrowed from a bare ``except`` to ``Exception``.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID='+session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error " + type(e).__name__ + " occurred.")

    return r


# Login.
def download_file(server_name, report_url, session_id, file_name):
    """Stream ``report_url`` into ``file_name`` in 4096 byte blocks.

    :param server_name: unused by this function.
    :param report_url: url to download.
    :param session_id: unused by this function.
    :param file_name: path of the file to write.
    :raises TintriRequestsException: on any failure. This includes a non-200
        reply: the ``TintriApiException`` raised for it is caught by the
        final handler and re-wrapped, which is kept for compatibility.

    Fix: the file is opened in binary mode. ``iter_content`` is called
    without decoding, so the blocks are bytes, and writing them to a file
    opened with ``'w'`` fails on Python 3.
    """
    headers = {'content-type': 'application/json'}

    try:
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + e.__str__())
def index(self, uid='', iss=''):
    """Start a login flow from an issuer or a user id.

    :param uid: user id, resolved through ``find_srv_discovery_url`` with
        the resource ``acct:<uid>``; only used when ``iss`` is empty.
    :param iss: issuer url, used directly when given.
    :return: the bytes of ``opbyuid.html`` when neither argument is given;
        ``None`` when the lookup produced an empty link.
    :raises cherrypy.HTTPRedirect: to the ``Location`` returned by
        ``self.rph.begin`` once a link is known.
    :raises cherrypy.HTTPError: when the lookup hits a connection error.

    Fix: the HTML page is read inside a ``with`` block; the original left
    the file handle open.
    """
    link = ''
    if iss:
        link = iss
    elif uid:
        try:
            link = self.rph.find_srv_discovery_url(
                resource="acct:{}".format(uid))
        except requests.ConnectionError:
            raise cherrypy.HTTPError(
                message="Webfinger lookup failed, connection error")
    else:
        fname = os.path.join(self.html_home, 'opbyuid.html')
        with open(fname, 'r') as page:
            return as_bytes(page.read())

    if link:
        resp_headers = self.rph.begin(link)
        raise cherrypy.HTTPRedirect(resp_headers['Location'])
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
    """``pod_status`` reports OFFLINE for connection, SSL and timeout errors.

    Fix: ``asset_called_once_with`` was a typo. On a mock that name is just
    an auto-created attribute, so the check silently passed without
    asserting anything. It is now ``assert_called_once_with``.
    """
    # Connection error
    mock_get.side_effect = requests.ConnectionError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    # NOTE(review): this assertion was never executed before; confirm the
    # expected arguments match how pod_status actually calls requests.get.
    mock_get.assert_called_once_with('url',
                                     auth=auth.HTTPBasicAuth('username',
                                                             'password'))

    # SSL Error
    mock_get.side_effect = requests.exceptions.SSLError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 2)

    # Timeout
    mock_get.side_effect = requests.Timeout
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 3)
def supervisor_controlled_processes_running(self):
    """Return ``True`` when every supervisor process reports ``RUNNING``.

    The state is read from ``/api/system_status/``. A connection error, or
    any process in another state, is logged as a warning and gives
    ``False``. The response is asserted to be successful.
    """
    # Use the api to verify the processes controlled by supervisor are all in a RUNNING state
    try:
        response = self.chroma_manager.get('/api/system_status/')
    except requests.ConnectionError:
        logger.warning("Connection error trying to connect to the manager")
        return False

    self.assertEqual(response.successful, True, response.text)
    system_status = response.json

    non_running_processes = [
        process for process in system_status['supervisor']
        if not process['statename'] == 'RUNNING'
    ]
    if not non_running_processes:
        return True
    logger.warning("Supervisor processes found not to be running: '%s'" % non_running_processes)
    return False
def get(self,url):
    # Fetch ``url`` with the headers from ``headers.get_header()``; on
    # success emit a progress message and return the response. Both error
    # paths log the failure and fall through, so the method then returns
    # None. Python 2 code (print statement below).
    # NOTE(review): the non-ASCII string literals and comments in this block
    # were already mis-encoded ('?') in the source and are left untouched.
    try:
        resp = requests.get(url=url,headers=headers.get_header())
        self.notifylineEdit_4.emit(u"[+] ???? "+url)
        return resp
    except requests.ConnectionError:
        # self.log.error(traceback.print_exc())
        # print traceback.print_exc()
        self.log.error(u'??{0}??'.format(url))
        # pause for 3 seconds after a connection error
        time.sleep(3)
    except Exception as e:
        # self.log.error(traceback.print_exc())
        print traceback.print_exc()
        self.log.error(u'??{0}??????{1}'.format(url,e))
        # return


# ??????
# http://movie.douban.com/subject/10533913/photos?type=R
# ?? http://img3.douban.com/view/photo/thumb/public/p1812524514.jpg
# ???? http://img3.douban.com/view/photo/photo/public/p1812524514.jpg
# ???????images,??????
def create_external_export(username, id_string, export_id, query=None, token=None, meta=None):
    """Generate an external export and return the id of the result.

    On any exception the ``Export`` row is marked ``FAILED`` and saved, the
    failure is passed to ``report_exception``, and the exception is
    re-raised.

    :return: ``id`` of the object returned by ``generate_external_export``.
    """
    export = Export.objects.get(id=export_id)
    try:
        # though export is not available when the form has 0 submissions, we
        # catch this since it potentially stops celery
        generated = generate_external_export(
            Export.EXTERNAL_EXPORT, username, id_string, export_id,
            token, query, meta
        )
    except (Exception, NoRecordsFoundError, ConnectionError) as e:
        export.internal_status = Export.FAILED
        export.save()
        # mail admins
        details = {
            'export_id': export_id,
            'username': username,
            'id_string': id_string
        }
        subject = ("External Export Exception: Export ID - "
                   "%(export_id)s, /%(username)s/%(id_string)s" % details)
        report_exception(subject, e, sys.exc_info())
        raise
    return generated.id
def setUp(self):
    """Patch ``logging`` and ``requests`` in ``sys.modules`` and load the config.

    ``logging.getLogger`` is replaced by a factory that stores a fresh mock
    on ``self.mock_logger``. ``requests.get`` is replaced by a stub
    answering ``ok=True`` with ``{'data': {'status': 1}}``. After
    ``Configuration('config.yml')`` is built, the requests exception names
    are rebound to the ones visible in this module.
    """
    def getLogger(name):
        self.mock_logger = mock.Mock()
        return self.mock_logger

    def get(url, headers):
        stub_response = mock.Mock()
        stub_response.ok = True
        stub_response.json = mock.Mock(return_value={'data': {'status': 1}})
        return stub_response

    sys.modules['logging'].getLogger = getLogger
    sys.modules['requests'].get = get

    self.env = EnvironmentVarGuard()
    self.env.set('CACHET_TOKEN', 'token2')

    self.configuration = Configuration('config.yml')

    requests_module = sys.modules['requests']
    requests_module.Timeout = Timeout
    requests_module.ConnectionError = ConnectionError
    requests_module.HTTPError = HTTPError
def url_to_img_array(url):
    """Download ``url`` and decode it into an image array with OpenCV.

    :param url: address of the image; must be a ``basestring``.
    :return: the array produced by ``cv2.imdecode(..., 1)``, or ``None``
        when ``url`` is not a string or the download/decoding fails.

    Changes over the original:

    * The "replace https with http" step was removed. It only ran when
      ``'https'`` was absent from the url, so its
      ``replace("https", "http")`` could never change anything.
    * The bare ``except`` became ``except Exception`` so that
      ``KeyboardInterrupt``/``SystemExit`` are no longer swallowed.
    """
    if not isinstance(url, basestring):
        logging.warning("input is neither an ndarray nor a string, so I don't know what to do")
        return None

    try:
        headers = {'User-Agent': USER_AGENT}
        response = requests.get(url, headers=headers)
        img_array = cv2.imdecode(np.asarray(bytearray(response.content)), 1)
    except requests.ConnectionError:
        logging.warning("connection error - check url or connection")
        return None
    except Exception:
        logging.warning(" error other than connection error - check something other than connection")
        return None

    return img_array
def post_event(url, tag_name, event_message):
    """POST a "Ceph Health" event and return the resulting status code.

    :param url: endpoint receiving the event.
    :param tag_name: value placed in the ``tags`` field.
    :param event_message: value placed in the ``data`` field.
    :return: the HTTP status code, or ``500`` when the endpoint could not be
        reached.
    """
    headers = {"Content-Type": "application/json"}
    body = ('{{"what":"Ceph Health",'
            '"tags":"{}",'
            '"data":"{}"}}'.format(tag_name, event_message))
    try:
        r = requests.post(url, headers=headers, data=body)
    except requests.ConnectionError:
        # if we hit this, the endpoint wasn't there (graphite web was not
        # accessible) so identify that issue as a server error (500)
        return 500
    return r.status_code
def post(self, printer_id):
    """Send a file from one printer to other printers, or print the given file.

    :return: ``("", 403)`` when the printer is not available to the user;
        ``("", 200)`` after the send loop or a successful print;
        ``("", 409)`` when the print call returns a falsy value.
    """
    args = deleteParser.parse_args()
    printer = g.user.get_printer_id(printer_id)
    if not printer:
        return "", 403

    if not args["send"]:
        # print file
        if OctoprintService.print(printer, args["origin"], args["name"]):
            return "", 200
        return "", 409

    # send file from one printer to defined printers
    printer_ids = sendParser.parse_args()
    targets = g.user.get_accessible_printers_id(printer_ids["printerId"])
    content = OctoprintService.get_file_contents(printer, args["origin"], args["name"])
    for dest_printer in targets:
        try:
            OctoprintService.send_file(dest_printer, args["name"], content, False)
        except (RuntimeError, requests.ConnectionError):
            # Best effort: a failing destination does not stop the others.
            pass
    return "", 200
def test_run_socket_error(self, m_post):
    """A connection error on the POST makes the DEL run return 1."""
    m_post.side_effect = requests.ConnectionError

    exit_code = self._test_run('DEL', 'delNetwork', m_post)

    self.assertEqual(1, exit_code)
def get_wait_time_list():
    """Fetch the wait-time data, trying up to three times.

    Each attempt obtains a token with ``get_token`` and passes it to
    ``get_wait_time``. After ``requests.HTTPError`` or
    ``requests.ConnectionError`` the function sleeps one second and tries
    again.

    :return: the first successful result, or ``None`` after three failures.
    """
    for _attempt in xrange(3):
        try:
            return get_wait_time(get_token())
        except (requests.HTTPError, requests.ConnectionError):
            time.sleep(1)
    return None
def http_request(self, method, url, **kwargs):
    """Send a request through ``self.session`` and return the response.

    :param method: HTTP method name; upper-cased before use.
    :param url: path appended to ``self.server``.
    :param kwargs: ``verify``, ``proxies`` and ``headers`` are extracted
        (falling back to the instance settings); everything else is
        forwarded to ``session.request``.
    :raises TimeoutError: on ``requests.Timeout``.
    :raises ApiError: on ``requests.ConnectionError`` or any other exception
        raised while sending.
    :raises ObjectNotFoundError: on status 404.
    :raises UnauthorizedError: on status 401.
    :raises ServerError: on any other status >= 400.
    """
    method = method.upper()

    # NOTE(review): ``or`` means a falsy caller value such as verify=False
    # is replaced by the instance default - confirm that this is intended.
    verify_ssl = kwargs.pop('verify', None) or self.ssl_verify
    proxies = kwargs.pop('proxies', None) or self.proxies

    extra_headers = kwargs.pop('headers', None)
    headers = self.token_header
    if extra_headers:
        headers = self.token_header.copy()
        headers.update(extra_headers)

    uri = self.server + url

    try:
        raw_data = kwargs.get("data", None)
        if raw_data:
            log.debug("Sending HTTP {0} {1} with {2}".format(method, url, raw_data))
        r = self.session.request(method, uri, headers=headers, verify=verify_ssl,
                                 proxies=proxies, timeout=self._timeout, **kwargs)
        log.debug('HTTP {0:s} {1:s} took {2:.3f}s (response {3:d})'.format(
            method, url, calculate_elapsed_time(r.elapsed), r.status_code))
    except requests.Timeout as timeout_error:
        raise TimeoutError(uri=uri, original_exception=timeout_error)
    except requests.ConnectionError as connection_error:
        raise ApiError("Received a network connection error from {0:s}: {1:s}".format(self.server, str(connection_error)),
                       original_exception=connection_error)
    except Exception as e:
        raise ApiError("Unknown exception when connecting to server: {0:s}".format(str(e)),
                       original_exception=e)

    # Every handler above raises, so this point is only reached on success.
    if r.status_code == 404:
        raise ObjectNotFoundError(uri=uri, message=r.text)
    if r.status_code == 401:
        raise UnauthorizedError(uri=uri, action=method, message=r.text)
    if r.status_code >= 400:
        raise ServerError(error_code=r.status_code, message=r.text)
    return r
def _send_rest_request(self, address):
    """GET ``address`` and report whether the response is OK.

    :return: ``resp.ok`` for a completed request; ``False`` when the request
        raises (``ConnectionError`` is logged at debug level, anything else
        at warning level).
    """
    try:
        resp = requests.get(address)
    except ConnectionError as e:
        self.logger.debug('{} - {}'.format(address, e))
        return False
    except Exception as e:
        self.logger.warning('{} - {}'.format(address, e))
        return False

    self.logger.debug(resp)
    return resp.ok
def _make_request(self, url, protocol='https'):
    """GET ``<protocol>://<url>`` with a 5 second timeout and no TLS verification.

    :return: the response, or ``False`` on ``requests.Timeout`` or
        ``requests.ConnectionError`` (the latter is logged at debug level).
    """
    target = '{}://{}'.format(protocol, url)
    try:
        return self.session.get(target, timeout=5, verify=False)
    except requests.Timeout:
        return False
    except requests.ConnectionError as e:
        logging.debug('Connection Error: {}'.format(e))
        return False
def login(self):
    """
    Log in to the ICE Portal (wifionice).

    Posts the login form with the stored CSRF token. A connection error is
    only logged at debug level; nothing is returned.
    """
    logging.info('Trying to log in...')
    login_url = 'http://{}/de/?login'.format(self.api_host_ip)
    form = {'login': True, 'CSRFToken': self.csrf_token}
    try:
        self.session.post(login_url, data=form)
    except requests.exceptions.ConnectionError:
        logging.debug('Login Failed, probably bad wifi')
def __log(self, msg, level):
    """POST one log record to ``self.elastic_url``.

    :param msg: message text of the record.
    :param level: value stored as ``log_level``.

    On ``ConnectionError`` the failure and the original message are written
    to the local ``logging`` module instead.
    """
    record = {
        'app_name': str(self.app_name),
        'log_level': level,
        'message': msg,
    }
    try:
        self.session.post(url=self.elastic_url, data=json.dumps(record),
                          headers=self.headers, auth=self.auth)
    except ConnectionError as ce:
        logging.error(ce.message)
        logging.exception('Unable to connect to Elastic Search. Check the `elastic_url` and `auth`')
        logging.error(msg)