The following 50 code examples, extracted from open-source Python projects, illustrate how to use requests.session().
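Before the project-specific examples, here is a minimal, self-contained sketch of the core idea (the URL is just a placeholder endpoint): requests.session() returns a Session object that persists cookies, default headers, and pooled connections across every request made through it.

import requests

# A Session keeps cookies, default headers, and pooled TCP connections
# alive across requests, unlike one-off requests.get() calls.
session = requests.session()
session.headers.update({'User-Agent': 'example-client/1.0'})  # sent with every request

# placeholder endpoint that sets a cookie; any cookie-setting URL behaves the same
resp = session.get('https://httpbin.org/cookies/set?token=abc123', timeout=10)
print(resp.status_code)
print(session.cookies.get_dict())  # the cookie now persists on the session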
def get_captcha():
    import time
    t = str(int(time.time() * 1000))
    captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
    r = session.get(captcha_url, headers=header)
    with open("captcha.jpg", "wb") as f:
        f.write(r.content)
    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
        pass
    captcha = input("please input the captcha\n>")
    return captcha
def new_session(account):
    if account.get('session', None) is None:
        session = requests.session()
        session.verify = True
        session.headers.update({'User-Agent': 'Niantic App'})
        # session.headers.update({'User-Agent': 'niantic'})
        if account['proxy'] is not None:
            session.proxies.update(account['proxy'])
        account['session'] = session
    else:
        account['session'].close()
        account['session'].cookies.clear()
    account['session_time'] = get_time()
    account['session_hash'] = os.urandom(32)
    account['api_url'] = API_URL
    account['auth_ticket'] = None
def get_upload_url(self, session):
    """Fetch the appspot upload URL for the given authenticated session.

    Args:
        session: authenticated requests session

    Returns:
        str: the upload URL
    """
    r = session.get(DataManagement.__APPSPOT_URL)
    if r.text.startswith('\n<!DOCTYPE html>'):
        self.logger.debug('Incorrect credentials. Probably. If you are sure the credentials are OK, '
                          'refresh the authentication token. If it did not work report a problem. '
                          'They might have changed something in the Matrix.')
        sys.exit(1)
    elif r.text.startswith('<HTML>'):
        self.logger.debug('Redirecting to upload URL')
        r = session.get(DataManagement.__APPSPOT_URL)
    d = ast.literal_eval(r.text)
    return d['url']
def __init__(self, cache: bool = False, future: bool = True):
    if cache:
        redis_conn = redis.StrictRedis(host='redis')
        self.session = requests_cache.core.CachedSession(
            cache_name='api_cache',
            backend='redis',
            expire_after=60 * 60 * 24 * 30,
            allowable_codes=(200,),
            allowable_methods=('GET',),
            old_data_on_error=False,
            connection=redis_conn,
        )
    else:
        self.session = session()
    if future:
        self.future_session = FuturesSession(max_workers=10, session=self.session)
    self.url = self.url_template.format(resource='', token=self.token)
def get_token(self, symbols, wlist):
    ip = Helper.get_ip()
    url = 'https://current.sina.com.cn/auth/api/jsonp.php/' + \
          'var%20KKE_auth_{}=/'.format(Helper.random_string(9)) + \
          'AuthSign_Service.getSignCode?' + \
          'query=hq_pjb&ip={}&list={}&kick=1'.format(ip, wlist)
    resp = self.session.get(url)
    pat = re.compile(r'result:"([^"]+)",timeout:(\d+)')
    m = pat.search(resp.text)
    if m:
        token, timeout = m.groups()
        timeout = int(timeout)
        return token
    else:
        log.error('token error: {}'.format(resp.text))
        return self.get_token(symbols, wlist)
def login(self, name, password):
    """Authenticates user.

    :param str name: account name
    :param str password: account password

    .. note::
        Throws :class:`exceptions.RequestFailed` (20, 'badCreadentials')
        if bad credentials are passed.
    """
    self.session = requests.session()
    remember = True
    data = self._request('login', [name, password, int(remember)], hmethod='POST')
    return LoggedUser(fw=self, uid=data[3], name=data[0],
                      img=common.img_path_to_relative(data[1]),
                      sex=data[4], birth_date=data[5])

# This method is one big TODO.
def first_get(self):
    global _session
    _session = requests.session()
    main_url = 'https://www.instagram.com'
    _session.get(main_url, proxies=self.use_proxy, verify=True)
    self.save_cookies()
    if os.path.exists('cookiefile'):
        # print('have cookies')
        self.csrf = self.read_cookies()
        self.data = self.create_ajax()
        print(self.data)
        self.ins()
        time.sleep(5)  # wait for 5 seconds
        self.my_selfie = get_pic.get_pic()
        self.my_selfie.get_selfie()  # download a random selfie picture to a local folder
        self.upload()  # upload the selfie
    else:
        pass
def __init__(self, username, password, timeout=120, ref_id=0):
    self._username = username
    self._password = password
    self._ref_id = '{}'.format(ref_id)  # save as str
    self._timeout = timeout
    self._session = session()  # init a new session
    self._normal_captcha = None  # save last solved captcha
    self._recaptcha = None
    self._error = None  # keep track of last error
    self._headers = {  # use this user agent
        'User-Agent': USER_AGENT
    }

# solve normal captcha
def _get_log_schema(self):
    """
    Get the log schema for this SMC version.

    :return: dict
    """
    if self.session and self.session_id:
        schema = '{}/{}/monitoring/log/schemas'.format(self.url, self.api_version)
        response = self.session.get(
            url=schema,
            headers={'cookie': self.session_id, 'content-type': 'application/json'})
        if response.status_code in (200, 201):
            return response.json()
def available_api_versions(base_url, timeout=10, verify=True):
    """
    Get all available API versions for this SMC

    :return: version numbers
    :rtype: list
    """
    try:
        r = requests.get('%s/api' % base_url, timeout=timeout, verify=verify)  # no session required
        if r.status_code == 200:
            j = json.loads(r.text)
            versions = []
            for version in j['version']:
                versions.append(version['rel'])
            # versions = [float(i) for i in versions]
            return versions
        raise SMCConnectionError(
            'Invalid status received while getting entry points from SMC. '
            'Status code received %s. Reason: %s' % (r.status_code, r.reason))
    except requests.exceptions.RequestException as e:
        raise SMCConnectionError(e)
def getListProxies(self):
    session = requests.session()
    self.getRandomUserAgent()
    page = session.get("http://www.xicidaili.com/nn", headers=self.headers)
    soup = BeautifulSoup(page.text, 'lxml')
    proxyList = []
    taglist = soup.find_all('tr', attrs={'class': re.compile("(odd)|()")})
    for trtag in taglist:
        tdlist = trtag.find_all('td')
        proxy = {
            'https': tdlist[1].string + ':' + tdlist[2].string
        }
        proxyList.append(proxy)
    return proxyList
def __init__(self, key=None, code=None, user_agent='', cache=None):
    """
    This class is the base for accessing the XML API. Attributes can be
    called on it to build a path to an endpoint.

    Args:
        key (str) - optional authentication keyID
        code (str) - optional authentication vCode
        user_agent (str) - optional (recommended) User-Agent to use when making web requests
        cache (preston.xmlapi.cache.Cache) - page cache

    Returns:
        None
    """
    self.cache = cache or Cache()
    self.user_agent = user_agent or 'Preston (github.com/Celeo/Preston)'
    self.session = requests.session()
    self.session.headers.update({'User-Agent': self.user_agent})
    self.key = key
    self.code = code
def __init__(self, api_key, api_secret, redirect_uri, token=None):
    # const define
    self.site = 'https://api.weibo.com/'
    self.authorization_url = self.site + 'oauth2/authorize'
    self.token_url = self.site + 'oauth2/access_token'
    self.api_url = self.site + '2/'
    # init basic info
    self.client_id = api_key
    self.client_secret = api_secret
    self.redirect_uri = redirect_uri
    self.session = requests.session()
    # activate client directly if given token
    if token:
        self.set_token(token)
def get(self, uri, **kwargs):
    """
    Request resource by get method.
    """
    # 500 test url
    # self.api_url = 'https://httpbin.org/status/500'
    url = "{0}{1}.json".format(self.api_url, uri)
    res = self.session.get(url, params=kwargs)
    # Other error codes from the server are dealt with at a lower level:
    # 403 for an invalid access token or rate limiting,
    # 400 for information about an expired token.
    if res.status_code in [200, 400, 403]:
        self._assert_error(res.json())
    return res
def create_session_proxy(self, proxy_info):
    # build the proxy from the proxy info
    ip = proxy_info['ip']
    port = proxy_info['port']
    protocol = proxy_info['protocol'].lower()
    proxy = {protocol: ip + ':' + port}
    # create the session
    session = requests.session()
    session.headers = requestHeader
    session.cookies.update(self.account_manager.get_auth_token())
    session.proxies = proxy
    # put the session into the pool
    self.session_pool.put(session)
    self.created_session_num_change(1)
    self.available_session_num_change(1)
def fetch_proxy_data(self, page):
    # set the request headers
    self.session.headers = requestHeader
    # build the request URL
    url = requestUrl + str(page)
    # fetch with retries
    retry_time = 0
    while retry_time < NETWORK_RETRY_TIMES:
        try:
            response = self.session.get(url, timeout=CONNECT_TIMEOUT)
            return response.text
        except Exception:
            # retry on failure
            retry_time += 1
            time.sleep(NETWORK_RECONNECT_INTERVAL)
    # all retries failed, return None
    return None
def __init__(self, url, name=None, requests_session=None, timeout=30):
    IOBase.__init__(self)
    self.url = url
    self.sess = requests_session if requests_session is not None else requests.session()
    self._seekable = False
    self.timeout = timeout
    f = self.sess.head(url, headers={'Range': 'bytes=0-'}, timeout=timeout)
    if f.status_code == 206 and 'Content-Range' in f.headers:
        self._seekable = True
    self.len = int(f.headers["Content-Length"])
    if name is None:
        if "Content-Disposition" in f.headers:
            value, params = cgi.parse_header(f.headers["Content-Disposition"])
            if "filename" in params:
                self.name = params["filename"]
    else:
        self.name = name
    f.close()
    self._pos = 0
    self._r = None
def extract_news(news_url):
    # Fetch html
    session_requests = requests.session()
    response = session_requests.get(news_url, headers=getHeaders())
    news = {}
    try:
        # Parse html tree
        tree = html.fromstring(response.content)
        # Extract information
        news = tree.xpath(GET_CNN_NEWS_XPATH)
        news = ''.join(news)
    except Exception as e:
        print(e)
        return {}
    return news
def logout(self):
    '''
    Log out.

    :returns: True on success, False otherwise
    '''
    # passport_logout_response = self.get_response(logout_url)
    self.session.cookies = cookielib.CookieJar()
    response = self.get_response(logout_url)
    check_logout = re.findall('????', response)
    if len(check_logout) > 0:
        self.logined = False
        self.remove_cookies()
        return True
    else:
        return False
def save_cookies(self):
    '''
    Save cookies to a file.

    :returns: True on success, False otherwise
    '''
    try:
        if os.path.isfile('cookie.list'):
            os.remove('cookie.list')
        fd = open('cookie.list', "wb+")
        cookie_list = requests.utils.dict_from_cookiejar(self.session.cookies)
        fd.write(json.dumps(cookie_list))
        fd.close()
        return True
    except Exception:
        utils.show_msg(traceback.print_exc())
        utils.show_msg("Can't save cookie list file.")
        return False
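The dict round-trip in the example above (requests.utils.dict_from_cookiejar and its inverse cookiejar_from_dict) is the usual way to persist a session's cookies between runs; a minimal sketch, with cookies.json as a hypothetical path:

import json
import requests

session = requests.session()
# ... make some requests so session.cookies gets populated ...

# serialize: CookieJar -> plain dict -> JSON file
with open('cookies.json', 'w') as fd:
    json.dump(requests.utils.dict_from_cookiejar(session.cookies), fd)

# restore: JSON file -> dict -> CookieJar on a fresh session
restored = requests.session()
with open('cookies.json') as fd:
    restored.cookies = requests.utils.cookiejar_from_dict(json.load(fd))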
def get_all_pic_url(question_num, answer_offset, answer_limit):
    url = 'https://www.zhihu.com/api/v4/questions/{qnum}/answers?include=data%5B*%5D.is_normal%2Cis_collapsed%2Cannotation_action%' \
          '2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%' \
          '2Cmark_infos%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B*%5D.author.follower_count%2Cbadge%5B%3F(type%3Dbest_answerer)%5D.topics&' \
          'offset={offset}&limit={limit}&sort_by=default'
    response = session.get(url.format(qnum=question_num, offset=answer_offset, limit=answer_limit),
                           headers=headers, allow_redirects=False)
    print('json_response', response)
    json_response = response.json()
    answer = json_response['data']
    pattern = re.compile(r'data-original=\"https\:(.*?)\.(jpg|png)"')
    urls = []
    for i in range(0, len(answer)):
        per_answer_dict = answer[i]  # dict
        per_answer_content_str = per_answer_dict['content']
        match = pattern.findall(per_answer_content_str)
        urls.extend(["https:" + m[0] + ".jpg" for m in match[1::2]])
    return urls
def login(self, username, password, require_ownership=False):
    resp = self.session.post(
        self.login_url,
        params=dict(require_game_ownership=int(require_ownership)),
        data=dict(username=username, password=password)
    )
    try:
        json = resp.json()
    except:
        json = None
    try:
        resp.raise_for_status()
        return json[0]
    except requests.HTTPError:
        if isinstance(json, dict) and 'message' in json:
            if json['message'] == 'Insufficient membership':
                raise OwnershipError(json['message'])
            else:
                raise AuthError(json['message'])
        else:
            raise
def login(self, username, password):
    self.username = username
    self.password = password
    data = "password=%s&username=%s" % (self.password, self.username)
    res = self.session.post(self.endpoints['login'], data=data)
    res = res.json()
    try:
        self.auth_token = res['token']
    except KeyError:
        return False
    self.headers['Authorization'] = 'Token ' + self.auth_token
    return True

##############################
# GET DATA
##############################
def run(self):
    try:
        url = 'http://%s/Home/Save' % (self.url)
        data = {
            'u': self.username,
            'p': self.password
        }
        print(url)
        print(data)
        s = requests.session()
        r = s.post(url, data, verify=False)
        print(r.status_code, r.content)
        print('-----------------------------------------------------')
    except Exception as e:
        print(e)
        pass
    # self.parent and self.parent.on_thread_finished(self, 'done')

# top-level domains
def __init__(self, constants, util, settings):
    """
    Injects instances, sets session file & loads initial session

    :param constants: Constants instance
    :type constants: resources.lib.Constants
    :param util: Utils instance
    :type util: resources.lib.Utils
    :param settings: Settings instance
    :type settings: resources.lib.Settings
    """
    self.constants = constants
    self.utils = util
    self.settings = settings
    addon = self.utils.get_addon()
    verify_ssl = True if addon.getSetting('verifyssl') == 'True' else False
    self.session_file = self.utils.get_addon_data().get('cookie_path')
    self.verify_ssl = verify_ssl
    self._session = self.load_session()
def load_session(self):
    """
    Builds the session object, loading & deserializing the cookie file if it exists

    :returns: requests.session -- Session object
    """
    _session = session()
    _session.headers.update({
        'User-Agent': self.utils.get_user_agent(),
        'Accept-Encoding': 'gzip'
    })
    if path.isfile(self.session_file):
        with open(self.session_file, 'r') as handle:
            try:
                _cookies = utils.cookiejar_from_dict(pickle.load(handle))
            except EOFError:
                _cookies = utils.cookiejar_from_dict({})
            _session.cookies = _cookies
    return _session
def getCoupon():
    sched_Timer = "2017-06-14 20:00"  # scheduled time to grab the coupon
    # coupon request URL, copied from the browser's Network panel
    couPonUrl = "https://api.m.jd.com/client.action?functionId=newBabelAwardCollection&body=%7B%22activityId%22%3A%223tPzkSJZdNRuhgmowhPn7917dcq1%22%2C%22scene%22%3A%221%22%2C%22args%22%3A%22key%3D898c3948b1a44f36b032c8619e2514eb%2CroleId%3D6983488%2Cto%3Dpro.m.jd.com%2Fmall%2Factive%2F3tPzkSJZdNRuhgmowhPn7917dcq1%2Findex.html%22%2C%22mitemAddrId%22%3A%22%22%2C%22geo%22%3A%7B%22lng%22%3A%22%22%2C%22lat%22%3A%22%22%7D%7D&client=wh5&clientVersion=1.0.0&sid=dce17971eb6cbfcc2275dded296bcb58&uuid=1506710045&area=&_=1497422307569&callback=jsonp5"
    # request Referer, copied from the browser's Network panel
    referer = "https://pro.m.jd.com/mall/active/3tPzkSJZdNRuhgmowhPn7917dcq1/index.html"
    while 1:
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
        if now == sched_Timer:
            cj = requests.utils.cookiejar_from_dict(get_cookie())
            session.cookies = cj
            resp = session.get(
                url=couPonUrl,
                headers={
                    'Referer': referer,
                }
            )
            logger.info(resp.text)
            break
def getCaptcha(self):
    captchaImgUrl = 'https://passport.lagou.com/vcode/create?from=register&refresh=%s' % time.time()
    # download the captcha image
    f = open(CaptchaImagePath, 'wb')
    f.write(self.session.get(captchaImgUrl, headers=HEADERS).content)
    f.close()
    # open the captcha image
    if sys.platform.find('darwin') >= 0:
        subprocess.call(['open', CaptchaImagePath])
    elif sys.platform.find('linux') >= 0:
        subprocess.call(['xdg-open', CaptchaImagePath])
    else:
        os.startfile(CaptchaImagePath)
    # prompt the user for the captcha
    captcha = input("Please enter the captcha shown in %s: " % CaptchaImagePath)
    print('Captcha entered: %s' % captcha)
    return captcha
def get_vdcode():
    t = str(int(time.time() * 1000))
    captcha_url = 'https://passport.bilibili.com/captcha.gif?r=' + t + "&type=login"
    r = session.get(captcha_url)
    with open('captcha.jpg', 'wb') as f:
        f.write(r.content)
    # open the captcha with PIL's Image;
    # if pillow is unavailable, print the path so the image can be opened manually
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
        print(u'please open %s to view captcha.jpg' % os.path.abspath('captcha.jpg'))
    captcha = input("please input the captcha\n>")
    return captcha
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
    """The constructor.

    Args:
        timeout (float): The default global timeout (seconds).
    """
    self.timeout = timeout
    self.session = requests.session()
    if max_retries and retry_interval:
        retries = Retry(total=max_retries, backoff_factor=retry_interval)
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))
    if cache:
        self.session = CacheControl(self.session)
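The Retry/HTTPAdapter pattern in the constructor above is the standard way to give a requests session automatic retries. A standalone sketch, assuming urllib3 is available (it ships as a dependency of requests) and using a placeholder URL:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# retry up to 3 times, with exponential backoff between attempts,
# on connection errors and on the listed HTTP status codes
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=(500, 502, 503, 504))
adapter = HTTPAdapter(max_retries=retries)

session = requests.session()
# mounting on both schemes applies the adapter to every request the session makes
session.mount('http://', adapter)
session.mount('https://', adapter)

resp = session.get('https://httpbin.org/status/200', timeout=10)  # placeholder URL
print(resp.status_code)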
def execute_as_string(self, request):
    """Execute a given HttpRequest to get a string response back

    Args:
        request (HttpRequest): The given HttpRequest to execute.

    Returns:
        HttpResponse: The response of the HttpRequest.
    """
    response = self.session.request(HttpMethodEnum.to_string(request.http_method),
                                    request.query_url,
                                    headers=request.headers,
                                    params=request.query_parameters,
                                    data=request.parameters,
                                    files=request.files,
                                    timeout=self.timeout)
    return self.convert_response(response, False)
def execute_as_binary(self, request):
    """Execute a given HttpRequest to get a binary response back

    Args:
        request (HttpRequest): The given HttpRequest to execute.

    Returns:
        HttpResponse: The response of the HttpRequest.
    """
    response = self.session.request(HttpMethodEnum.to_string(request.http_method),
                                    request.query_url,
                                    headers=request.headers,
                                    params=request.query_parameters,
                                    data=request.parameters,
                                    files=request.files,
                                    timeout=self.timeout)
    return self.convert_response(response, True)
def is_login():
    # check login status by requesting a page that requires authentication,
    # without following redirects
    inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
    response = session.get(inbox_url, headers=header, allow_redirects=False)
    if response.status_code != 200:
        return False
    else:
        return True
def get_xsrf():
    # fetch the _xsrf code from the homepage
    response = session.get("https://www.zhihu.com", headers=header)
    response_text = response.text
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
    xsrf = ''
    if match_obj:
        xsrf = match_obj.group(1)
    return xsrf
def get_index():
    response = session.get("https://www.zhihu.com", headers=header)
    with open("index_page.html", "wb") as f:
        f.write(response.text.encode("utf-8"))
    print("ok")
def zhihu_login(account, password):
    # Zhihu login
    if re.match(r"^1\d{10}", account):
        print("logging in with a phone number")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha": get_captcha()
        }
    else:
        if "@" in account:
            # treat the account as an email address
            print("logging in with an email address")
            post_url = "https://www.zhihu.com/login/email"
            post_data = {
                "_xsrf": get_xsrf(),
                "email": account,
                "password": password
            }
    response_text = session.post(post_url, data=post_data, headers=header)
    session.cookies.save()

# get_index()
# is_login()
# get_captcha()
def get_http_client():
    if config['use_tor_proxy']:
        session = requesocks.session()
        session.proxies = {'http': 'socks5://127.0.0.1:%d' % config['tor_proxy_port'],
                           'https': 'socks5://127.0.0.1:%d' % config['tor_proxy_port']}
        return session
    else:
        return requests.session()
def __init__(self, username=None, password=None,
             auto_login=False, get_devices=False, get_automations=False):
    """Init Abode object."""
    self._username = username
    self._password = password
    self._session = None
    self._token = None
    self._panel = None
    self._user = None
    self._event_controller = AbodeEventController(self)
    self._default_alarm_mode = CONST.MODE_AWAY
    self._devices = None
    self._automations = None
    # Create a requests session to persist the cookies
    self._session = requests.session()
    if (self._username is not None and
            self._password is not None and
            auto_login):
        self.login()
    if get_devices:
        self.get_devices()
    if get_automations:
        self.get_automations()
def logout(self):
    """Explicit Abode logout."""
    if self._token:
        header_data = {
            'ABODE-API-KEY': self._token
        }
        self._session = requests.session()
        self._token = None
        self._panel = None
        self._user = None
        self._devices = None
        self._automations = None
        try:
            response = self._session.post(
                CONST.LOGOUT_URL, headers=header_data)
            response_object = json.loads(response.text)
        except OSError as exc:
            _LOGGER.warning("Caught exception during logout: %s", str(exc))
            return False
        if response.status_code != 200:
            raise AbodeAuthenticationException(
                (response.status_code, response_object['message']))
        _LOGGER.debug("Logout Response: %s", response.text)
    _LOGGER.info("Logout successful")
    return True
def pinned_session(pool_maxsize=8):
    http_adapter = _SSLAdapter(pool_connections=4, pool_maxsize=pool_maxsize)
    _session = requests.session()
    _session.mount('https://', http_adapter)
    return _session
def zw_init(url='https://127.0.0.1/', user='test_user', pswd='test_password'):
    zwareGlobals.zwareSession = requests.session()
    zwareGlobals.zwareUrl = url
    zwareGlobals.zwareSession.headers.update(
        {'Content-Type': 'application/x-www-form-urlencoded'})  # apache requires this
    zw_api('register/login.php', 'usrname=' + user + '&passwd=' + pswd)
    zwareGlobals.zwareUrl += 'cgi/zcgi/networks//'
    return zw_api('zw_version')
def main_craw_ptt(i, ptt_class_name, sql_name, bo):
    # ptt_class_name = 'Soft_Job'
    index_name = 'http://www.ptt.cc'
    index_class = '/bbs/' + ptt_class_name + '/index'  # i=4806, i=18
    index_url = index_name + index_class + str(i) + '.html'  # URL of index page i
    # res = requests.get(index_url, verify=False)
    res = requests.get(index_url, verify=True)  # fetch the html
    soup = BeautifulSoup(res.text, "lxml")  # parse the html
    temp = soup.find_all("", {'class': 'r-ent'})
    for i in range(len(temp)):  # i=0 len(temp)
        # print(i)
        temp2 = temp[i].find('a')
        if str(temp2) == 'None':
            # the article has been deleted, so there is no link
            print('error')
        elif str(temp2) != 'None':
            # print(i)
            article_url = temp[i].find('a')['href']  # link of the article
            article_url = index_name + article_url  # prepend the index domain
            title = temp[i].find('a').get_text()  # title of the article
            # article_url = 'https://www.ptt.cc/bbs/Soft_Job/M.1503652456.A.526.html'
            response = requests.session().get(article_url)  # 200 means success
            if response.status_code == 404:
                print(404)
            elif re.search('[??]', title):
                print('[??]')
            elif response.status_code == 200:
                if bo == 'new':
                    # crawling new data: bounded by the max date time already in the sql table
                    date_time = catch_ptt_history_date_time(ptt_class_name, sql_name)
                    max_date_time = date_time
                elif bo == 'his':
                    # crawling history: no lower bound on the date time
                    max_date_time = 0
                tem = craw_ptt_data_fun(article_url, temp, i, index_url, sql_name, max_date_time, bo)
            else:
                print('other')

# ---------------------------------------------------------------------------------
# fix data: if, say, page index=100 errored at entry 5, re-crawl that page
# starting from entry 6
def fix_data(i, ptt_class_name, sql_name, bo, j):
    # ptt_class_name = 'Soft_Job'
    index_name = 'http://www.ptt.cc'
    index_class = '/bbs/' + ptt_class_name + '/index'  # i=4806, i=18
    index_url = index_name + index_class + str(i) + '.html'
    # res = requests.get(index_url, verify=False)
    # index_url = 'http://www.ruten.com.tw/'
    res = requests.get(index_url, verify=True)
    soup = BeautifulSoup(res.text, "lxml")
    temp = soup.find_all("", {'class': 'r-ent'})
    for i in range(j, len(temp)):  # i=12 len(temp)
        # print(i)
        temp2 = temp[i].find('a')
        if str(temp2) == 'None':
            print('error')
        elif str(temp2) != 'None':
            # print(i)
            article_url = temp[i].find('a')['href']
            article_url = index_name + article_url
            title = temp[i].find('a').get_text()
            # article_url = 'https://www.ptt.cc/bbs/Soft_Job/M.1503652456.A.526.html'
            response = requests.session().get(article_url)
            if response.status_code == 404:
                print(404)
            elif re.search('[??]', title):
                print('[??]')
            elif response.status_code == 200:
                if bo == 'new':
                    date_time = catch_ptt_history_date_time(ptt_class_name, sql_name)
                    max_date_time = max(date_time)
                elif bo == 'his':
                    max_date_time = 0
                tem = craw_ptt_data_fun(article_url, temp, i, index_url, sql_name, max_date_time, bo)
            else:
                print('other')

# ---------------------------------------------------------------------------------
def __init__(self, instance, server_login, dedi_code, path, pack_mask, server_version, server_build, game='TM2'):
    """
    Initiate dedi api.

    :param instance: ControllerInstance
    :param server_login: .
    :param dedi_code: .
    :param path: .
    :param pack_mask: .
    :param server_version: .
    :param server_build: .
    :param game: Game info
    :type instance: pyplanet.core.instance.Instance
    """
    self.instance = instance
    self.loop = instance.loop
    self.client = requests.session()
    self.headers = {
        'User-Agent': 'PyPlanet/{}'.format(version),
        'Accept': 'text/xml',
        'Accept-Encoding': 'gzip',
        'Content-Type': 'text/xml; charset=UTF-8',
        'Content-Encoding': 'gzip',
        'Keep-Alive': 'timeout=600, max=2000',
        'Connection': 'Keep-Alive',
    }
    self.server_login = server_login
    self.dedimania_code = dedi_code
    self.path = path
    self.pack_mask = pack_mask
    self.server_version = server_version
    self.server_build = server_build
    self.game = game
    self.update_task = None
    self.session_id = None
    self.retries = 0
def create_google_session(self):
    """Log in to the Google account and return the authenticated session.

    Returns:
        requests.Session: a session holding the Google authentication cookies
    """
    session = requests.session()
    login_html = session.get(DataManagement.__GOOGLE_ACCOUNT_URL)
    # Check cookies returned because there is an issue with the authentication.
    # GAPS, GALX, NID - these cookies are used to identify the user when using
    # Google+ functionality. GAPS is still provided.
    self.logger.debug(session.cookies.get_dict().keys())
    try:
        galx = session.cookies['GALX']
    except KeyError:
        self.logger.error('No cookie GALX')
    soup_login = BeautifulSoup(login_html.content, 'html.parser').find('form').find_all('input')
    payload = {}
    for u in soup_login:
        if u.has_attr('value'):
            payload[u['name']] = u['value']
    payload['Email'] = self.__username
    payload['Passwd'] = self.__password
    auto = login_html.headers.get('X-Auto-Login')
    follow_up = unquote(unquote(auto)).split('continue=')[-1]
    # Commented as suggested in https://github.com/tracek/gee_asset_manager/issues/36
    # galx = login_html.cookies['GALX']
    payload['continue'] = follow_up
    # Commented as suggested in https://github.com/tracek/gee_asset_manager/issues/36
    # payload['GALX'] = galx
    session.post(DataManagement.__AUTHENTICATION_URL, data=payload)
    return session
def __upload_image(self, file_path, session, upload_url, image_name, properties, nodata):
    """Upload a single image and start its ingestion into GEE.

    Args:
        file_path: path of the image file to upload
        session: authenticated requests session
        upload_url: URL to post the file to
        image_name: destination asset id of the image
        properties: asset properties
        nodata: value to treat as missing data
    """
    with open(file_path, 'rb') as f:
        files = {'file': f}
        resp = session.post(upload_url, files=files)
        gsid = resp.json()[0]
        asset_data = {
            "id": image_name,
            "tilesets": [
                {"sources": [
                    {"primaryPath": gsid,
                     "additionalPaths": []}
                ]}
            ],
            "bands": [],
            "properties": properties,
            "missingData": {"value": nodata},
        }
        task_id = ee.data.newTaskId(1)[0]
        _ = ee.data.startIngestion(task_id, asset_data)
def data_management(self, session, upload_url, assets_names, file_path, properties, nodata):
    """Upload the file, replacing any asset of the same name that already exists.

    Args:
        session: authenticated requests session
        upload_url: URL to post files to
        assets_names: names of the assets already in the collection
        file_path: path of the image file to upload
        properties: asset properties
        nodata: value to treat as missing data
    """
    file_root = file_path.split("/")[-1].split(".")[0]
    image_name = self.asset_path + '/%s' % file_root
    already_uploaded = False
    if file_root in assets_names:
        self.logger.error("%s already in collection" % file_root)
        already_uploaded = True
    # if a file of that name is already in the asset, the upload throws an error
    if os.path.exists(file_path) and not already_uploaded:
        self.__upload_image(file_path, session, upload_url, image_name, properties, nodata)
    else:
        self.logger.debug('%s already uploaded in GEE - Deleting old file' % file_root)
        self.__delete_image(image_name)
        self.__upload_image(file_path, session, upload_url, image_name, properties, nodata)
def __init__(self):
    self.inventory = defaultdict(list)  # A list of groups and the hosts in that group
    self.cache = dict()   # Details about hosts in the inventory
    self.params = dict()  # Params of each host
    self.facts = dict()   # Facts of each host
    self.hostgroups = dict()  # Host groups
    self.session = None   # Requests session
    self.config_paths = [
        "/etc/ansible/foreman.ini",
        os.path.dirname(os.path.realpath(__file__)) + '/foreman.ini',
    ]
    env_value = os.environ.get('FOREMAN_INI_PATH')
    if env_value is not None:
        self.config_paths.append(os.path.expanduser(os.path.expandvars(env_value)))