我们从 Python 开源项目中,提取了以下 47 个代码示例,用于说明如何使用 apiclient.discovery.build()。
def _build_google_client(service, api_version, http_auth):
    """
    Build a Google API client for the requested service.

    :param service: service to build client for
    :type service: ``str``

    :param api_version: API version to use.
    :type api_version: ``str``

    :param http_auth: Initialized HTTP client to use.
    :type http_auth: ``object``

    :return: google-python-api client initialized to use 'service'
    :rtype: ``object``
    """
    return build(service, api_version, http=http_auth)
def main():
    """Print the ten most recent login events from the Admin SDK Reports API.

    Builds an authorized ``admin``/``reports_v1`` service, lists login
    activities for all users and prints time, actor email and first event
    name for each one.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('admin', 'reports_v1', http=authorized_http)

    print('Getting the last 10 login events')
    request = service.activities().list(
        userKey='all', applicationName='login', maxResults=10)
    activities = request.execute().get('items', [])

    if activities:
        print('Logins:')
        for entry in activities:
            print('{0}: {1} ({2})'.format(entry['id']['time'],
                                          entry['actor']['email'],
                                          entry['events'][0]['name']))
    else:
        print('No logins found.')
def execute(self):
    """
    Fetch GSuite report events for the configured app and time window.

    Authenticates, builds the Admin SDK Reports service, runs the
    activities query with the instance's user/app/time/max settings and
    stores the raw response on ``self.results``.

    :return: the ``items`` list of the response, or ``[]`` when absent.
    """
    logging.debug("Authenticating to GSuite")
    self.get_credentials()
    self.http = self.credentials.authorize(httplib2.Http())
    self.service = discovery.build('admin', 'reports_v1', http=self.http)

    logging.debug("Retrieving %s events from: %s to %s",
                  self.app, convert_time(self.s_time), convert_time(self.e_time))
    query = self.service.activities().list(
        userKey=self.user,
        applicationName=self.app,
        startTime=self.s_time,
        endTime=self.e_time,
        maxResults=self.max,
    )
    self.results = query.execute()
    return self.results.get('items', [])
def main():
    """List up to ten Drive files by name and ID.

    Builds an authorized Drive v3 service and prints ``name (id)`` for each
    file returned, or a notice when there are none.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('drive', 'v3', http=authorized_http)

    listing = service.files().list(
        pageSize=10, fields="nextPageToken, files(id, name)").execute()
    files = listing.get('files', [])

    if files:
        print('Files:')
        for entry in files:
            print('{0} ({1})'.format(entry['name'], entry['id']))
    else:
        print('No files found.')
def send(To, Subject, Body, Cc=None, Bcc=None, html=False, files=None):
    """Send an email through the Gmail API as the authorized user.

    :param To: iterable of recipient addresses (joined with ``', '``).
    :param Subject: subject line.
    :param Body: message body text.
    :param Cc: optional iterable of CC addresses.
    :param Bcc: optional iterable of BCC addresses.
    :param html: when true the body is attached as ``text/html`` instead of
        ``text/plain``.
    :param files: optional iterable of paths; each file is attached under
        its base name.
    """
    # ``None`` sentinels replace the shared mutable ``[]`` defaults.
    Cc = [] if Cc is None else Cc
    Bcc = [] if Bcc is None else Bcc
    files = [] if files is None else files

    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    message['Cc'] = ', '.join(Cc)
    message['Bcc'] = ', '.join(Bcc)
    message.attach(MIMEText(Body, subtype))

    for path in files:
        with open(path, "rb") as attachment:
            part = MIMEApplication(attachment.read(), Name=basename(path))
        part['Content-Disposition'] = 'attachment; filename="%s"' % basename(path)
        message.attach(part)

    # urlsafe_b64encode needs bytes and returns bytes on Python 3, while the
    # request body must be JSON-serialisable text; normalise in both
    # directions so the same code works on Python 2 and 3.
    raw = message.as_string()
    if not isinstance(raw, bytes):
        raw = raw.encode('utf-8')
    encoded = base64.urlsafe_b64encode(raw)
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    payload = {'raw': encoded}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    Http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=Http)
    service.users().messages().send(userId='me', body=payload).execute()
def main():
    """
    Quick-add a Calendar event from the command-line query.

    Creates a Google Calendar API service object and, when command-line
    arguments were given, appends the query to ``log.txt`` and creates an
    event in the primary calendar with Quick Add.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('calendar', 'v3', http=authorized_http)

    # NOTE(review): the Quick Add call is taken to be guarded by the
    # argument check together with the logging - confirm against upstream.
    if len(sys.argv) <= 1:
        return

    with open("log.txt", "a") as logfile:
        logfile.write(flags.query + "\n")
    service.events().quickAdd(
        calendarId='primary',
        text=flags.query
    ).execute()
def image_urls_api_call(search_term, previously_captured_num, imgSize="medium"):
    """Return image URLs (faces, JPG) from a Google Custom Search query.

    The API key is read from the ``GOOGLE_CUSTOM_SEARCH_KEY`` environment
    variable. This is a best-effort helper: any failure (missing key,
    request error, response without ``items``) is reported on stdout and
    an empty list is returned.

    :param search_term: query string.
    :param previously_captured_num: passed to the API as ``start``.
    :param imgSize: passed to the API as ``imgSize``.
    :return: list of ``link`` values, or ``[]`` on failure.
    """
    try:
        key = os.environ['GOOGLE_CUSTOM_SEARCH_KEY']
        service = build("customsearch", "v1", developerKey=key)
        res = service.cse().list(
            q=search_term,
            cx='018267786259991380019:ja65luoszbg',  # my custom search engine id.
            searchType='image',
            filter='1',
            num=10,
            imgSize=imgSize,
            imgType="face",
            fileType="jpg",
            start=previously_captured_num
        ).execute()
        links = [item['link'] for item in res['items']]
        print('Added links for ' + search_term)
        return links
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print('Failed to get links for ' + search_term)
        return []
def authenticate():
    """Authenticate to Google Calendar and populate the module globals.

    On success sets ``http_auth`` (authorized HTTP object), ``service``
    (Calendar v3 service) and ``calendar`` (the calendar resource for
    ``CALENDAR_ID``). On failure the error is logged with its traceback to
    the ``BoilerLogger`` logger and ``init()`` is called to retry.
    """
    global service
    global http_auth
    global calendar

    try:
        # read credentials
        credentials = ServiceAccountCredentials.from_json_keyfile_name(CREDENTIAL_FILE_PATH, scopes)

        # authorize and get the calendar service
        http_auth = credentials.authorize(Http())
        service = discovery.build('calendar', 'v3', http=http_auth)
        calendar = service.calendars().get(calendarId=CALENDAR_ID).execute()
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit are not turned
        # into a retry; .exception() keeps the traceback for diagnosis.
        logging.getLogger('BoilerLogger').exception('failed to authenticate to google calendar service, will retry...')
        init()


# get calendar events in a window sorted by start time
def initialize_analyticsreporting():
    """Create an authorized Analytics Reporting v4 service object.

    Returns:
      A service object built over HTTP authorized with the service
      account's P12 key taken from ``settings``.
    """
    credentials = ServiceAccountCredentials.from_p12_keyfile(
        settings.SERVICE_ACCOUNT_EMAIL,
        settings.KEY_FILE_LOCATION,
        scopes=SCOPES,
    )
    authorized_http = credentials.authorize(httplib2.Http())

    # Build the service object.
    return build('analytics', 'v4', http=authorized_http,
                 discoveryServiceUrl=DISCOVERY_URI)
async def youtube_search(q, max_results):
    """Search YouTube and return metadata for the video results.

    Declared ``async`` because the body awaits ``youtube_date``; ``await``
    inside a plain ``def`` is a syntax error.

    :param q: search query.
    :param max_results: passed to the API as ``maxResults``.
    :return: list of dicts with ``title``, ``description``, ``date``,
        ``channel_title``, ``channel_id`` and ``video_id`` for every result
        whose kind is ``youtube#video``.
    """
    youtube = build(settings.YOUTUBE_API_SERVICE_NAME,
                    settings.YOUTUBE_API_VERSION,
                    developerKey=settings.DEVELOPER_KEY)
    search_response = youtube.search().list(
        q=q, part="id,snippet", maxResults=max_results).execute()

    videos = []
    for search_result in search_response.get("items", []):
        # Channels and playlists also appear in search results; skip them.
        if search_result["id"]["kind"] != "youtube#video":
            continue
        snippet = search_result["snippet"]
        videos.append({
            'title': snippet['title'],
            'description': snippet['description'],
            'date': await youtube_date(snippet['publishedAt']),
            'channel_title': snippet['channelTitle'],
            'channel_id': snippet['channelId'],
            'video_id': search_result["id"]["videoId"],
        })
    return videos
def get_google_service(service_type=None, version=None):
    '''Return a Google API service object, defaulting to Sheets v4.

    When the ``GOOGLE_SHEETS_CREDENTIALS`` environment variable is set, its
    value is handed to ``get_authenticated_service``; otherwise the
    application default credentials are used with ``discovery.build``.

    :param service_type: the service to get (default is "sheets")
    :param version: version to use (default is "v4")
    '''
    if service_type is None:
        service_type = "sheets"
    if version is None:
        version = "v4"

    secrets = os.environ.get('GOOGLE_SHEETS_CREDENTIALS')
    if secrets is not None:
        return get_authenticated_service(secrets, service_type, version)

    credentials = GoogleCredentials.get_application_default()
    return discovery.build(service_type, version, credentials=credentials)
def login_required(f):
    """Decorator that requires valid OAuth2 credentials in the session.

    Redirects to ``youtube.login`` when no credentials are stored or the
    access token has expired; otherwise attaches an authorized YouTube v3
    client to ``g.youtube`` and calls the wrapped function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        stored = session.get('credentials', None)
        if stored is None:
            return redirect(url_for('youtube.login'))

        credentials = client.OAuth2Credentials.from_json(stored)
        if credentials.access_token_expired:
            return redirect(url_for('youtube.login'))

        g.youtube = build('youtube', 'v3',
                          http=credentials.authorize(httplib2.Http()))
        return f(*args, **kwargs)

    return decorated_function
def build_service(self):
    """
    Build the Google Fusion Tables service and store it on ``self.SERVICE``.

    :raises IncorrectAccessFilePathException: when loading or authorizing
        the key file at ``self.PATH_TO_KEY`` raises ``IOError``.
    """
    logger.info('Build service for connect to google fusiontables server.')
    scopes = ['https://www.googleapis.com/auth/fusiontables']
    try:
        credentials = ServiceAccountCredentials.from_json_keyfile_name(
            self.PATH_TO_KEY, scopes=scopes)
        http_auth = credentials.authorize(Http())
    except IOError:
        message = "Access key path '{0}' is incorrect.".format(self.PATH_TO_KEY)
        raise IncorrectAccessFilePathException(message)
    self.SERVICE = build('fusiontables', 'v2', http=http_auth)
def initialize_analyticsreporting():
    """Create an authorized Analytics Reporting v4 service object.

    Returns:
      A service object built over HTTP authorized with the service
      account's P12 key.
    """
    credentials = ServiceAccountCredentials.from_p12_keyfile(
        SERVICE_ACCOUNT_EMAIL,
        KEY_FILE_LOCATION,
        scopes=SCOPES,
    )
    authorized_http = credentials.authorize(httplib2.Http())

    # Build the service object.
    return build('analytics', 'v4', http=authorized_http,
                 discoveryServiceUrl=DISCOVERY_URI)
def auth():
    """Authenticate against Google Drive, persisting credentials on disk.

    Loads saved credentials, then runs the local-webserver flow when none
    exist, refreshes them when the access token has expired, or authorizes
    with them as-is. The credentials file is rewritten afterwards.

    :return: tuple ``(gauth, httpauth, service)`` where ``service`` is a
        Drive v3 service built on the authorized HTTP object.
    """
    creds_file = "letitrain-creds-gdrive.txt"
    gauth = GoogleAuth()
    gauth.LoadCredentialsFile(creds_file)

    if gauth.credentials is None:
        gauth.LocalWebserverAuth()
    elif gauth.access_token_expired:
        gauth.Refresh()
    else:
        gauth.Authorize()
    gauth.SaveCredentialsFile(creds_file)

    httpauth = gauth.Get_Http_Object()
    return gauth, httpauth, discovery.build('drive', 'v3', http=httpauth)


# Retrieves the information about every file
# Can either do deleted or regular files
def oauth2_drive_service():
    """
    Return a Google Drive v3 service authorized with stored credentials.

    Does NOT perform the OAuth2 flow to obtain new credentials if nothing
    has been stored, or if the stored credentials are invalid. Use another
    script named ct_gdrive_oauth2.py for that.

    :raises Exception: when no stored credentials exist or they are invalid.
    """
    # Get credentials from storage
    storage_file = os.path.join(args.creds_dir, OAUTH2_STORAGE_CREDS_FILENAME)
    credentials = oauth2client.file.Storage(storage_file).get()

    if credentials and not credentials.invalid:
        # Authorize http requests and hand back the Drive APIv3 service
        authorized_http = credentials.authorize(httplib2.Http())
        return discovery.build('drive', 'v3', http=authorized_http)

    raise Exception('Unauthorized Access!')
def authcheck():
    """Return the email address tied to the application default credentials.

    :return: the ``email`` field of the OAuth2 userinfo response, or an
        explanatory string when application default credentials cannot be
        acquired.
    """
    scope = 'https://www.googleapis.com/auth/userinfo.email'
    try:
        credentials = GoogleCredentials.get_application_default()
        if credentials.create_scoped_required():
            credentials = credentials.create_scoped(scope)
    except ApplicationDefaultCredentialsError:
        return "Unable to acquire application default credentials"

    http = httplib2.Http()
    credentials.authorize(http)
    service = build(serviceName='oauth2', version='v2', http=http)
    return service.userinfo().get().execute()['email']
def main():
    """Print start time and summary of the next five primary-calendar events."""
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('calendar', 'v3', http=authorized_http)

    # 'Z' indicates UTC time
    now = datetime.utcnow().isoformat() + 'Z'
    print('???5?????????')
    query = service.events().list(
        calendarId='primary', timeMin=now, maxResults=5, singleEvents=True,
        orderBy='startTime')
    events = query.execute().get('items', [])

    if not events:
        print('No upcoming events found.')
    for event in events:
        # All-day events carry 'date' instead of 'dateTime'.
        when = event['start']
        print(when.get('dateTime', when.get('date')), event['summary'])
def main():
    """Read records from stdin and persist them to a Google Sheet.

    Loads the JSON config named by ``flags.config``, optionally starts the
    anonymous usage collection thread, builds the Sheets v4 service, then
    feeds UTF-8 decoded stdin to ``persist_lines`` and emits the resulting
    state.
    """
    with open(flags.config) as config_file:
        config = json.load(config_file)

    if not config.get('disable_collection', False):
        logger.info('Sending version information to stitchdata.com. ' +
                    'To disable sending anonymous usage data, set ' +
                    'the config parameter "disable_collection" to true')
        threading.Thread(target=collect).start()

    authorized_http = get_credentials().authorize(httplib2.Http())
    discoveryUrl = ('https://sheets.googleapis.com/$discovery/rest?'
                    'version=v4')
    service = discovery.build('sheets', 'v4', http=authorized_http,
                              discoveryServiceUrl=discoveryUrl)

    spreadsheet = get_spreadsheet(service, config['spreadsheet_id'])

    stdin_lines = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    state = persist_lines(service, spreadsheet, stdin_lines)
    emit_state(state)
    logger.debug("Exiting normally")
def get_service(api_name, api_version, scope, key_file_location,
                service_account_email):
    """Build a Google API service authorized as a service account.

    Args:
      api_name: The name of the api to connect to.
      api_version: The api version to connect to.
      scope: A list of auth scopes to authorize for the application.
      key_file_location: The path to a valid service account p12 key file.
      service_account_email: The service account email address.

    Returns:
      A service that is connected to the specified API.
    """
    credentials = ServiceAccountCredentials.from_p12_keyfile(
        service_account_email, key_file_location, scopes=scope)
    authorized_http = credentials.authorize(httplib2.Http())

    # Build the Google API service object.
    return build(api_name, api_version, http=authorized_http)
def initYoutube():
    """Return an authorized YouTube API client.

    Reuses credentials stored in ``<argv[0]>-oauth2.json`` when they are
    present and valid; otherwise runs the OAuth2 flow with the parsed
    command-line flags.
    """
    flow = flow_from_clientsecrets(
        constants.YOUTUBE_CLIENT_SECRETS_FILE,
        message=constants.MISSING_CLIENT_SECRETS_MESSAGE,
        scope=constants.YOUTUBE_READ_WRITE_SCOPE,
        redirect_uri='http://localhost:8080/')

    storage = Storage("%s-oauth2.json" % sys.argv[0])
    credentials = storage.get()

    if credentials is not None and not credentials.invalid:
        print("Found valid oauth2 json")
    else:
        flags = argparser.parse_args()
        credentials = run_flow(flow, storage, flags)

    youtube = build(constants.YOUTUBE_API_SERVICE_NAME,
                    constants.YOUTUBE_API_VERSION,
                    http=credentials.authorize(httplib2.Http()))
    print("Done authentication with Youtube")
    return youtube
def __init__(self, config, logger):
    """
    Constructor

    Resolves the client-secret and credentials file paths from ``config``,
    obtains credentials and bootstraps the Gmail v1 service.

    :param config: Configuration dict
    :param logger: Python logger
    """
    # Suppress cache warnings from google api lib
    logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)

    credentials_dir = config['credentials_dir']
    self._client_secret_file = os.path.join(credentials_dir, config['client_secret_file_name'])
    self._credentials_file = os.path.join(credentials_dir, config['credentials_file_name'])
    self._logger = logger
    self._config = config
    self._credentials = self._get_credentials()

    # Bootstrap the Gmail client service
    authorized_http = self._credentials.authorize(httplib2.Http())
    self._service = discovery.build('gmail', 'v1', http=authorized_http)
def main():
    """Return a text listing of the next 10 events on the primary calendar.

    The result starts with a heading line, followed by either a
    "no events" line or one ``<start> <summary>`` line per event.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('calendar', 'v3', http=authorized_http)

    now = datetime.datetime.utcnow().isoformat() + 'Z'  # 'Z' indicates UTC time
    lines = ['Getting the upcoming 10 events\n']

    listing = service.events().list(
        calendarId='primary', timeMin=now, maxResults=10, singleEvents=True,
        orderBy='startTime').execute()
    events = listing.get('items', [])

    if not events:
        lines.append('No upcoming events found.\n')
    for event in events:
        # All-day events carry 'date' instead of 'dateTime'.
        start = event['start'].get('dateTime', event['start'].get('date'))
        lines.append(start + " " + event['summary'] + '\n')
    return "".join(lines)
def get_youtube_vid_id(self, query, max_results=10):
    """
    Search YouTube and return the ID of the first video in the results.

    Non-video results (channels, playlists) are skipped rather than ending
    the search, so a video further down the list is still found.

    :param query: search query of type string to be used for
                  searching youtube.
    :param max_results: The maximum results returned by searching
                        youtube
    :returns: The ID of the first video in the search results, or ``None``
              when the results contain no video.
    """
    youtube = build(Connection.YOUTUBE_API_SERVICE_NAME,
                    Connection.YOUTUBE_API_VERSION,
                    developerKey=self.keys['google'])
    search_response = youtube.search().list(
        q=query,
        part="id,snippet",
        maxResults=max_results
    ).execute()

    for search_result in search_response.get("items", []):
        if search_result["id"]["kind"] == "youtube#video":
            return search_result["id"]["videoId"]
    # Only give up after every result has been inspected.
    return None
def get_training_features(potential_event, fb_event, fb_event_attending):
    """Build the training feature tuple for an event.

    Currently returns a one-element tuple holding the space-separated
    ``id<N>`` attendee list; the remaining features are still computed.
    """
    def strip_text(s):
        return strip_punctuation(s.encode('utf8')).lower()

    owner_name = 'id%s' % fb_event['info']['owner']['id'] if 'owner' in fb_event['info'] else ''
    location = event_locations.get_address_for_fb_event(fb_event).encode('utf-8')
    name = strip_text(fb_event['info'].get('name', ''))
    description = strip_text(fb_event['info'].get('description', ''))

    attendee_list = ' '.join(
        'id%s' % attendee['id'] for attendee in fb_event_attending['attending']['data'])
    source_list = ' '.join(
        'id%s' % source.id for source in potential_event.source_ids_only())

    #TODO(lambert): maybe include number-of-keywords and keyword-density?
    #TODO(lambert): someday write this as a proper mapreduce that reduces across languages and builds a classifier model per language?
    # for now we can just grep and build sub-models per-language on my client machine.
    return (attendee_list,)
    # NOTE(review): the return below is unreachable because of the return
    # above; it is kept as found - confirm which feature set is intended.
    return (potential_event.language, owner_name, location, name, description, attendee_list, source_list)
def __init__(self, wf):
    """Construct a new GoogleInterface.

    Checks auth status and, when credentials are missing or invalid,
    triggers the authorization prompt before building the Calendar v3
    service.
    """
    self.HTTP_INSTANCE = httplib2.Http(**get_http_kw_args(wf))
    self.log = wf.logger
    self.wf = wf

    self.CLIENT_SECRET_FILE = 'client_secret.json'
    self.APPLICATION_NAME = 'Alfred Today'
    self.SCOPES = 'https://www.googleapis.com/auth/calendar.readonly'

    credentials = self._get_credentials()
    needs_auth = not credentials or credentials.invalid
    if needs_auth:
        # NOTE(review): `credentials` is not re-read after this call, so the
        # authorize() below still uses the missing/invalid object - confirm
        # whether _authorize_google() is expected to make that safe.
        self._authorize_google()

    authorized_http = credentials.authorize(self.HTTP_INSTANCE)
    self.service = discovery.build('calendar', 'v3', http=authorized_http)
def __init__(self):
    """Parse flags, set up logging, build the calendar service and run the
    naive event-overlap search."""
    try:
        parser = argparse.ArgumentParser(parents=[tools.argparser])
        self._flags = parser.parse_args()
        # Prevent a browser from opening
        self._flags.noauth_local_webserver = True
    except ImportError:
        self._flags = None

    ## Init logging
    self._logger = self.create_logger(logging_filename=LOGGING_FILE)
    logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)

    ## Get credentials, build calendar service object
    mkdir_p(CREDENTIALS_DIR)
    credentials = self.get_credentials(
        CLIENT_SECRET_FILE, SCOPES, CREDENTIALS_DIR, CREDENTIALS_FILENAME)
    self._service = self.get_service(credentials)

    self.naive_find_event_overlap()
def main():
    """Print the next 10 events on the user's primary calendar.

    Creates a Google Calendar API service object and prints the start time
    and summary of each upcoming event, or a notice when there are none.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('calendar', 'v3', http=authorized_http)

    now = datetime.datetime.utcnow().isoformat() + 'Z'  # 'Z' indicates UTC time
    print('Getting the upcoming 10 events')
    query = service.events().list(
        calendarId='primary', timeMin=now, maxResults=10, singleEvents=True,
        orderBy='startTime')
    events = query.execute().get('items', [])

    if not events:
        print('No upcoming events found.')
    for event in events:
        # All-day events carry 'date' instead of 'dateTime'.
        when = event['start']
        print(when.get('dateTime', when.get('date')), event['summary'])
def config_g_tasks(user, http_auth):
    """Describe a select input listing the user's Google task lists.

    :param user: not used by this function.
    :param http_auth: authorized HTTP object for the Tasks v1 service.
    :return: dict describing a single-choice ``taskList`` select whose
        options are ``{"value": <id>, "label": <title>}`` per task list.
    """
    service = discovery.build('tasks', 'v1', http=http_auth)
    results = service.tasklists().list(maxResults=10).execute()
    logging.debug(results)

    options = []
    if results:
        options = [
            {"value": tasklist.get('id'), "label": tasklist.get('title')}
            for tasklist in results.get('items', [])
        ]

    return {
        "input": "select",
        "multi": False,
        "prop": "taskList",
        "instructions": "Choose a task list",
        "options": options
    }


# Fetch classes
def getRemoteFolderInfo(Resource_ID):
    """
    Retrieve metadata about a file or folder on Google Drive.

    Args:
        Resource_ID: identifier of the resource. NOTE(review): described
            upstream as a string, but only its first element
            (``Resource_ID[0]``) is sent as the file ID - confirm whether
            callers pass a sequence.
    Returns:
        FolderInfo: JSON-formatted files resource.
    """
    authorized_http = Authorize()
    service = discovery.build('drive', 'v2', http=authorized_http)

    logging.debug("DRIVEQUERY: METADATA: Requesting info for %s" % Resource_ID)
    print(Resource_ID)

    request = service.files().get(fileId=Resource_ID[0])
    FolderInfo = request.execute()
    logging.debug("DRIVEQUERY: METADATA: Recieved info for %s" % FolderInfo['title'])
    return FolderInfo
def shortenUrl(self, URL):
    """
    Shorten a URL with the Google URL Shortener API.

    Used together with the uploading script to add a more manageable URL
    to the JSON response.

    Args:
        URL: string. URL parsed from JSON response
    Returns:
        The ``id`` field of the API response (the short URL).
    """
    authorized_http = Authorize()
    service = discovery.build('urlshortener', 'v1', http=authorized_http)

    request = service.url().insert(body={'longUrl': URL})
    response = request.execute()
    logging.debug("URLSHRINK: %s" % response)

    short_url = response['id']
    logging.debug("URLSHRINK: %s" % short_url)
    return short_url
def main():
    """Delete "Class of "/"Lab of " events from the primary calendar.

    Lists single events between now and the hard-coded ``timeMax`` and
    deletes every event whose summary contains either marker, printing
    each deletion.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('calendar', 'v3', http=authorized_http)

    now = datetime.datetime.utcnow().isoformat() + 'Z'  # 'Z' indicates UTC time
    print('Getting the events')
    listing = service.events().list(
        calendarId='primary', timeMin=now, singleEvents=True,
        orderBy='startTime', timeMax="2016-11-20T22:32:00.285773Z").execute()
    events = listing.get('items', [])

    if not events:
        print('No upcoming events found.')
    for event in events:
        summary = event["summary"]
        if "Class of " not in summary and "Lab of " not in summary:
            continue
        service.events().delete(calendarId='primary', eventId=event["id"]).execute()
        print ("Deleted: ", event["summary"], event["start"])
    print ("Deletion done!")
def main():
    """Upload the events parsed from ``timetable.csv`` to Google Calendar.

    Creates a Google Calendar API service object, initialises the
    ``csv-to-calendar`` calendar via ``initCalendar`` and uploads every
    event parsed from the CSV matrix to it.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    service = discovery.build('calendar', 'v3', http=authorized_http)

    CALENDAR_NAME = 'csv-to-calendar'
    calendarID = initCalendar(service, CALENDAR_NAME)

    events = parseMatrixIntoEvents(getMatrixFromCSV('timetable.csv'))
    for event in events:
        uploadEvent(service, event, calendarID)
def get_google_api_client(module, service, user_agent_product, user_agent_version,
                          scopes=None, api_version='v1'):
    """
    Get the discovery-based python client. Use when a cloud client is not available.

    client = get_google_api_client(module, 'compute', user_agent_product=USER_AGENT_PRODUCT,
                                   user_agent_version=USER_AGENT_VERSION)

    Falls back to ``GCP_DEFAULT_SCOPES`` when ``scopes`` is empty or None.

    :returns: A tuple containing the authorized client to the specified service and a
              params dict {'service_account_email': '...', 'credentials_file': '...', 'project_id': ...}
    :rtype: ``tuple``
    """
    scopes = scopes or GCP_DEFAULT_SCOPES

    http_auth, conn_params = get_google_api_auth(
        module,
        scopes=scopes,
        user_agent_product=user_agent_product,
        user_agent_version=user_agent_version)
    return (build(service, api_version, http=http_auth), conn_params)
def Write(gs_path, data, suffix='.txt'):
    """Writes data to the cloud.

    The data is staged in a named temporary file, uploaded with the Cloud
    Storage v1 ``objects().insert`` call, and the temporary file is removed
    afterwards - also when the write or the upload fails.

    :param gs_path: destination path; the first five characters
        (presumably the ``gs://`` scheme) are dropped and the remainder is
        split into bucket and object name at the first ``/``.
    :param data: content written to the temporary file (a binary-mode
        file, so bytes are expected).
    :param suffix: suffix for the temporary file name.
    """
    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('storage', 'v1', credentials=credentials)

    bucket_name, object_name = gs_path[5:].split('/', 1)
    logging.info('Uploading file: %s/%s', bucket_name, object_name)

    # Save the data off into a temp file.
    tfile = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        with tfile:
            tfile.write(data)

        # Upload the data.
        logging.info('filename: %s %s %s', tfile.name, object_name, bucket_name)
        req = service.objects().insert(media_body=tfile.name,
                                       name=object_name,
                                       bucket=bucket_name)
        req.execute()
    finally:
        # Cleanup, so a failed write/upload does not leak the temp file.
        os.remove(tfile.name)
def google_search(key_word):
    """Print the formatted URL of each Custom Search result for ``key_word``.

    Nothing is printed when the response has no ``items`` entry.
    """
    service = build("customsearch", "v1", developerKey="??KEY")
    res = service.cse().list(
        q=key_word,
        cx='????ID',
        num=10,  # Valid values are integers between 1 and 10, inclusive.
    ).execute()

    # Look the result list up directly instead of scanning every response
    # key for the substring 'items'.
    for result in res.get('items', []):
        print(result['formattedUrl'])
def build_service(self):
    """Build ``self.service`` for the configured API and credential type.

    For ``'user'`` credentials the authorized HTTP object is used (obtained
    on demand); otherwise ``self.credentials`` is passed to the builder.

    :return: True when the auth object used was truthy, False otherwise
        (a warning is logged in that case).
    """
    logging.debug("Building %s service for %s (%s)" % (self.credential_type, self.api, self.version))

    if self.credential_type == 'user':
        if not self.http_auth:
            self.get_http_auth()
        auth_key, auth_value = 'http', self.http_auth
    else:
        auth_key, auth_value = 'credentials', self.credentials

    ok = bool(auth_value)
    self.service = discovery.build(self.api, self.version, **{auth_key: auth_value})
    if not ok:
        logging.warning("Failed to build service for %s (%s) - Credential failure?" % (self.api, self.version))
    return ok
def get_google_api_service(service_name, service_version, scopes):
    '''Use Google Client API to get a Google API Service.

    :param service_name: the name of requested service, e.g. 'sheets'
    :param service_version: the version of requested service, e.g. 'v4'
    :param scopes: the authentication requested.

    :return: googleapiclient.discovery.Resource Object, tied to google service API.
    '''
    try:
        authorized_http = get_authorized_http_object(scopes)
        return discovery.build(service_name, service_version, http=authorized_http)
    except AttributeError:  # config variables are missing
        # todo: write as real exception
        raise
def get_authenticated_service():
    '''
    Authorize the request and store authorization credentials.

    Credentials are kept in ``<argv[0]>-oauth2.json``; the OAuth2 flow only
    runs when none are stored or the stored ones are invalid.
    '''
    flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)

    storage = Storage("%s-oauth2.json" % sys.argv[0])
    credentials = storage.get()
    needs_flow = credentials is None or credentials.invalid
    if needs_flow:
        credentials = run_flow(flow, storage)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    return build(API_SERVICE_NAME, API_VERSION,
                 http=credentials.authorize(httplib2.Http()))
def get_authenticated_service():
    '''Authorize the request and store authorization credentials to YouTube.

    Credentials are kept in ``<argv[0]>-oauth2.json``; the OAuth2 flow only
    runs when none are stored or the stored ones are invalid.
    '''
    flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)

    storage = Storage("%s-oauth2.json" % sys.argv[0])
    credentials = storage.get()
    needs_flow = credentials is None or credentials.invalid
    if needs_flow:
        credentials = run_flow(flow, storage)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    return build(API_SERVICE_NAME, API_VERSION,
                 http=credentials.authorize(httplib2.Http()))
def get_authenticated_service(self):
    """ Create youtube oauth2 connection """
    user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
    credentials = AccessTokenCredentials(
        access_token=self.get_auth_code(),
        user_agent=user_agent
    )
    authorized_http = credentials.authorize(httplib2.Http())
    return build('youtube', 'v3', http=authorized_http)
def connect(self):
    """ Establish connection

    Builds the API client named in the configuration and stores it on
    ``self.client``.
    """
    config = self.configuration.get_config()
    service_name = config["apiServiceName"]
    version = config["apiVersion"]
    self.client = build(service_name, version,
                        developerKey=config["developerKey"])
def create_pubsub_client(http=None):
    """Return a Pub/Sub v1 client using application default credentials.

    :param http: optional HTTP object to authorize; a new ``httplib2.Http``
        is created when omitted (or falsy).
    """
    credentials = oauth2client.GoogleCredentials.get_application_default()
    if credentials.create_scoped_required():
        credentials = credentials.create_scoped(PUBSUB_SCOPES)

    http = http or httplib2.Http()
    credentials.authorize(http)
    return discovery.build('pubsub', 'v1', http=http)
def _fetch_all_youtube_videos(self, playlistId):
    """
    Fetch every item of a YouTube playlist, following pagination.

    The items of all follow-up pages are appended to the first page's
    ``items`` list, and the ``nextPageToken`` key is removed from the
    result once the last page has been read.

    Parameters:
        playlistId - (string) ID of the playlist to fetch
    Returns:
        playListItem Dict (the first response with the merged ``items``)
    """
    YOUTUBE_API_SERVICE_NAME = "youtube"
    YOUTUBE_API_VERSION = "v3"
    youtube = build(YOUTUBE_API_SERVICE_NAME,
                    YOUTUBE_API_VERSION,
                    developerKey=self.youtube_api_server_key)

    merged = youtube.playlistItems().list(
        part="snippet",
        playlistId=playlistId,
        maxResults="50"
    ).execute()
    page_token = merged.get('nextPageToken')

    # The key stays in `merged` as the "more pages" marker until a page
    # without a token arrives.
    while 'nextPageToken' in merged:
        page = youtube.playlistItems().list(
            part="snippet",
            playlistId=playlistId,
            maxResults="50",
            pageToken=page_token
        ).execute()
        merged['items'] = merged['items'] + page['items']

        if 'nextPageToken' in page:
            page_token = page['nextPageToken']
        else:
            merged.pop('nextPageToken', None)

    return merged
def __init__(self, config):
    """Store the config, build the Drive v3 service and load appliances.

    The HTTP object caches responses in the ``.cache`` directory.
    """
    self._config = config

    credentials = self._get_credentials(config)
    cached_http = httplib2.Http(cache=".cache")
    self._http = credentials.authorize(cached_http)
    self._service = discovery.build('drive', 'v3', http=self._http)

    self._appliances = get_appliances()