The following 50 code examples, extracted from open source Python projects, illustrate how to use datetime.datetime.strptime().
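For orientation, here is a minimal sketch of the basic datetime.strptime() call before the extracted examples (the date strings and formats below are illustrative only, not taken from any of the projects):

from datetime import datetime

# Parse a date/time string against an explicit format specification.
dt = datetime.strptime("1999-12-31 23:59:59", "%Y-%m-%d %H:%M:%S")
print(dt)  # 1999-12-31 23:59:59

# strptime() raises ValueError when the string does not match the format,
# which is why many of the examples below wrap the call in try/except.
try:
    datetime.strptime("31/12/1999", "%Y-%m-%d")
except ValueError as exc:
    print("parse failed:", exc)
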
def convertToDate(fmt="%Y-%m-%d"):
    """
    Helper to create a parse action for converting parsed date string to Python datetime.date

    Params -
     - fmt - format to be passed to datetime.strptime (default=C{"%Y-%m-%d"})

    Example::
        date_expr = pyparsing_common.iso8601_date.copy()
        date_expr.setParseAction(pyparsing_common.convertToDate())
        print(date_expr.parseString("1999-12-31"))
    prints::
        [datetime.date(1999, 12, 31)]
    """
    def cvt_fn(s, l, t):
        try:
            return datetime.strptime(t[0], fmt).date()
        except ValueError as ve:
            raise ParseException(s, l, str(ve))
    return cvt_fn

def convertToDatetime(fmt="%Y-%m-%dT%H:%M:%S.%f"):
    """
    Helper to create a parse action for converting parsed datetime string to Python datetime.datetime

    Params -
     - fmt - format to be passed to datetime.strptime (default=C{"%Y-%m-%dT%H:%M:%S.%f"})

    Example::
        dt_expr = pyparsing_common.iso8601_datetime.copy()
        dt_expr.setParseAction(pyparsing_common.convertToDatetime())
        print(dt_expr.parseString("1999-12-31T23:59:59.999"))
    prints::
        [datetime.datetime(1999, 12, 31, 23, 59, 59, 999000)]
    """
    def cvt_fn(s, l, t):
        try:
            return datetime.strptime(t[0], fmt)
        except ValueError as ve:
            raise ParseException(s, l, str(ve))
    return cvt_fn

def getMoreInfo(self, nzb):
    """
    Get details about a torrent.

    .. seealso:: MovieSearcher.correctRelease
    """
    data = self.getHTMLData(nzb['detail_url'])
    soup = BeautifulSoup(data, 'html.parser')

    description = soup.find(id='description')
    if description:
        nzb['description'] = description.prettify()

    line = soup.find(text='Date de publication').parent.parent
    pub = line.find_all('td')[1]
    added = datetime.strptime(pub.getText().split('(')[0].strip(),
                              '%d/%m/%Y %H:%M')
    nzb['age'] = (datetime.now() - added).days
    self.log.debug(nzb['age'])

def save_account(instance, content):
    username = content['username']
    if not USERNAME_REGEX.match(username):
        username = '[invalid_username]'
    domain = instance.domain
    acct = "@" + username + "@" + domain
    if Account.query.filter_by(username=acct).count() != 0:
        return Account.query.filter_by(username=acct).first()
    else:
        creation_date = datetime.strptime(content['created_at'],
                                          "%Y-%m-%dT%H:%M:%S.%fZ")
        account = Account(mastodon_id=content['id'],
                          username=acct,
                          display_name=to_text(content['display_name']),
                          creation_date=creation_date,
                          note=to_text(content['note']),
                          url=validate_url(content['url']),
                          avatar=validate_url(content['avatar']),
                          instance=instance,
                          blacklisted=False)
        save(account)
        return account

def check_event_dates(self, request):
    """
    Check that the new event's end is later than its start.

    This validation is implemented here because model validation
    can't work with multiple fields.

    :param request: django request instance.
    :type request: django.http.request.HttpRequest.
    :return: are the event dates valid?
    :rtype: bool.
    """
    start = datetime.strptime(request.data.get("start"), DATETIME_FORMAT)
    end = datetime.strptime(request.data.get("end"), DATETIME_FORMAT)
    return start < end

def __init__(self, segment):
    self.lab_number = segment[3][0]
    self.profile_code = segment[4][0][0][0]
    self.profile_description = segment[4][0][1][0]
    self.request_datetime = datetime.strptime(
        segment[6][0][:12], DATETIME_FORMAT
    )
    self.observation_datetime = datetime.strptime(
        segment[7][0][:12], DATETIME_FORMAT
    )
    self.last_edited = datetime.strptime(
        segment[22][0], DATETIME_FORMAT
    )
    self.result_status = TEST_STATUS_MAPPING[segment[25][0]]

def __init__(self, segments):
    self.allergy_type = segments[2][0][0][0]
    self.allergy_type_description = segments[2][0][1][0]
    self.certainty_id = segments[2][0][3][0]
    self.certainty_description = segments[2][0][4][0]
    self.allergy_reference_name = segments[3][0][0][0]
    self.allergy_description = segments[3][0][1][0]
    self.allergen_reference_system = None
    if len(segments[3][0]) > 2:
        self.allergen_reference_system = segments[3][0][2][0]
    self.allergen_reference = None
    if len(segments[3][0]) > 3:
        self.allergen_reference = segments[3][0][3][0]
    self.status_id = segments[4][0][0][0]
    self.status_description = segments[4][0][1][0]
    self.diagnosis_datetime = datetime.strptime(
        segments[4][0][4][0], DATETIME_FORMAT
    )
    self.allergy_start_datetime = datetime.strptime(
        segments[6][0], DATETIME_FORMAT
    )

def format_date(val, fmt='%m-%d-%Y'):
    """
    Transform the input string to a datetime object

    :param val: the input string for date
    :param fmt: the input format for the date
    """
    date_obj = None
    try:
        date_obj = datetime.strptime(val, fmt)
    except Exception as exc:
        log.warning("Problem formatting date: {} - {} due: {}"
                    .format(val, fmt, exc))
    return date_obj

def _last_activity_points(project):
    default = datetime(1970, 1, 1, 0, 0).strftime('%Y-%m-%dT%H:%M:%S')
    updated_datetime = (project.get('updated') or default)
    last_activity_datetime = (project.get('last_activity_raw') or default)

    updated_datetime = updated_datetime.split('.')[0]
    last_activity_datetime = last_activity_datetime.split('.')[0]

    updated = datetime.strptime(updated_datetime, '%Y-%m-%dT%H:%M:%S')
    last_activity = datetime.strptime(last_activity_datetime, '%Y-%m-%dT%H:%M:%S')

    most_recent = max(updated, last_activity)
    days_since_modified = (datetime.utcnow() - most_recent).days

    if days_since_modified < 1:
        return 50
    if days_since_modified < 2:
        return 20
    if days_since_modified < 3:
        return 10
    if days_since_modified < 4:
        return 5
    return 0

def handleEntryResponse(self, modul, entryIn):
    assert self.cfg is not None
    appID = self.sink.getAppId()
    if modul not in self.cfg.keys():
        print("RewriteDates cannot process a modul it doesn't know anything about: %s" % modul)
        return (modul, entryIn)
    modulCfg = self.cfg[modul]
    for boneName, boneInfo in modulCfg:
        if "type" in boneInfo.keys() and boneInfo["type"] == "date":
            if boneName in entryIn.keys() and entryIn[boneName]:
                data = entryIn[boneName]
                if isinstance(data, unicode) and data != "None":
                    try:
                        data = datetime.strptime(str(data), "%d.%m.%Y %H:%M:%S")
                    except:
                        pass
                entryIn[boneName] = data
    return (modul, entryIn)

def _get_reviews_props(movie_code):
    cur_reviews_url = _REVIEWS_URL.format(code=movie_code)
    reviews_page = bs(urllib.request.urlopen(cur_reviews_url), "html.parser")
    reviews = reviews_page.find_all("td", {"class": "comment-summary"})
    user_reviews = []
    for review in reviews:
        try:
            rating = int(re.findall(_USER_REVIEW_RATING_REGEX, str(review))[0])
            date_str = re.findall(
                r"on (\d{1,2} [a-zA-Z]+ \d{4})", str(review))[0]
            date = datetime.strptime(date_str, "%d %B %Y").date()
            contents = review.find_all(
                'a', href=re.compile(r'reviews.+?'))[0].contents[0]
            user = review.find_all(
                'a', href=re.compile(r'/user/.+?'))[1].contents[0]
            user_reviews.append({
                'score': rating,
                'review_date': date,
                'contents': contents,
                'user': user
            })
        except Exception:  # pylint: disable=W0703
            pass
    return {'imdb_user_reviews': user_reviews}


# ==== crawling a movie profile ====

def _get_user_review_props(review):
    review_props = {}
    date_str = review.find_all("span", {"class": "date"})[0].contents[0]
    date_str = _parse_date_str(date_str)
    review_props['review_date'] = datetime.strptime(
        date_str, "%B %d, %Y").date()
    review_props['score'] = int(review.find_all(
        "div", {"class": "metascore_w"})[0].contents[0])
    try:
        review_props['text'] = review.find_all(
            'span', {'class': 'blurb blurb_expanded'})[0].contents[0].strip()
    except IndexError:
        review_props['text'] = review.find_all(
            'div', {'class': 'review_body'})[0].contents[1].contents[0].strip()
    review_props['user'] = review.find_all(
        'span', {'class': 'author'})[0].contents[0].contents[0]
    review_props['total_reactions'] = int(review.find_all(
        'span', {'class': 'total_count'})[0].contents[0])
    review_props['pos_reactions'] = int(review.find_all(
        'span', {'class': 'yes_count'})[0].contents[0])
    review_props['neg_reactions'] = review_props[
        'total_reactions'] - review_props['pos_reactions']
    return review_props

def pathfilter(path, mindate):
    """
    Return whether path should not be processed based on mindate.

    A return value of False indicates no filtering;
    a return value of True indicates the path should be filtered.
    """
    if mindate is None:
        return False
    subdir = os.path.basename(path)
    try:
        if datetime.strptime(subdir, "%Y%m%d") < mindate:
            logging.debug("Skip(1) subdir %s", subdir)
            return True
    except ValueError:
        logging.debug("Skip(2) subdir %s", subdir)
        return True
    return False

def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archive between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")

    timestamps = []
    # Ignore the first line which contains column names
    for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
        # Ignore archives with a status code != OK
        if status_code == '200':
            timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps

def main():
    args = parse_args()
    logging.basicConfig(level=(logging.WARN if args.quiet else logging.INFO))

    # Don't allow more than 10 concurrent requests to the wayback machine
    concurrency = min(args.concurrency, 10)

    # Scrape results are stored in a temporary folder if no folder specified
    target_folder = args.target_folder if args.target_folder else tempfile.gettempdir()
    logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))

    # Parse the period entered by the user (throws an exception if the dates are not correctly formatted)
    from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
    to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)

    # The scraper downloads the elements matching the given xpath expression in the target folder
    scraper = Scraper(target_folder, args.xpath)

    # Launch the scraping using the scraper previously instantiated
    scrape_archives(args.website_url, scraper.scrape, from_date, to_date, args.user_agent,
                    timedelta(days=args.delta), concurrency)

def load_from_file(filename):
    '''
    Load and return data from file

    :param filename: path of the database.csv file
    :return: (date, latitude, longitude, magnitude) (np.array)
    '''
    date, latitude, longitude, magnitude = [], [], [], []
    with open(filename, "r") as f:
        f.readline()  # Skip first line
        for line in f:
            elements = line.split(',')
            try:
                date.append(datetime.strptime("{} {}".format(elements[0], elements[1]),
                                              "%m/%d/%Y %H:%M:%S"))
                latitude.append(float(elements[2]))
                longitude.append(float(elements[3]))
                magnitude.append(elements[8])
            except ValueError:
                pass
    return np.array(date), np.float32(latitude), np.float32(longitude), np.float32(magnitude)

def parse_block_header(fid):
    """
    Parse the block header found in *fid*. Return a mapping containing
    the header information.
    """
    block_header = {}
    for line in fid:
        if line.startswith('#'):
            break
        toks = line.split()
        key = '_'.join(toks[1:-1])[:-1]
        value = toks[-1]
        if key.endswith('date'):
            value = datetime.strptime(value, '%Y,%j,%H:%M:%S')
        block_header[key] = value
    return block_header

def iaga2df(iaga2002_fname, D_to_radians=True):
    """
    Parse the magnetometer data record stored in the IAGA-2002 format
    file *iaga2002_fname*. If *D_to_radians*, declination data (D) are
    converted from degrees to radians. Return the tuple with the
    :class:`DataFrame` containing the data and header information.
    """
    with open(iaga2002_fname) as fid:
        # parse header
        header, cols = parse_header(fid)
        keys = ['B_' + x for x in cols]
        # parse data
        index = []
        data_map = defaultdict(list)
        for line in fid:
            toks = line.split()
            dt = datetime.strptime(toks[0] + ' ' + toks[1], '%Y-%m-%d %H:%M:%S.%f')
            index.append(dt)
            data = map(convert_float, toks[3:])
            for key_i, data_i in zip(keys, data):
                if key_i == 'B_D' and D_to_radians:
                    data_i = math.radians(data_i)
                data_map[key_i].append(data_i)
    df = PD.DataFrame(index=index, data=data_map)
    return df, header

def read_sm_csv(csv_fname):
    """
    Parse the SuperMAG CSV format data record *csv_fname*. For each
    station, store the information in a pandas :class:`DataFrame`.
    Return a mapping between the station identifier and data frame.
    """
    df = PD.read_csv(csv_fname,
                     header=0,
                     parse_dates=[0],
                     date_parser=lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'),
                     index_col=0)
    df_map = {name: group for name, group in df.groupby('IAGA')}
    for df in df_map.itervalues():
        del df['IAGA']
        df.rename(columns={'N': 'B_N',
                           'E': 'B_E',
                           'Z': 'B_Z'},
                  inplace=True)
    return df_map

def __init__(self, p1c1_fname=P1C1_FNAME):
    """
    Parse *p1c1_fname* and store DCB (in [TECU]) in the mapping
    :class:`datetime` -> ['svn', 'prn'] -> integer ID.
    """
    super(P1C1Table, self).__init__()
    if p1c1_fname == P1C1_FNAME and not os.path.isfile(p1c1_fname):
        update_p1c1()
    with open(p1c1_fname) as fid:
        for line in fid:
            if line.startswith('#'):
                continue
            cols = line.split()
            date = datetime.strptime(cols[0], '%Y-%m-%d')
            prn = int(cols[1])
            svn = int(cols[2])
            CA_P_m = float(cols[3])
            self.setdefault(date, {}).setdefault('prn', {})[prn] = CA_P_m
            self[date].setdefault('svn', {})[svn] = CA_P_m

def fname2date(rinex_fname):
    """
    Return the :class:`datetime` associated with the RINEX file
    *rinex_fname* named according to the standard convention.
    """
    basename = os.path.basename(rinex_fname)
    doy = basename[4:7]
    daily_or_hour = basename[7]
    yy = basename[9:11]
    dt = datetime.strptime(doy + yy, '%j%y')
    if daily_or_hour == '0':
        return dt
    elif daily_or_hour in [chr(x) for x in range(ord('a'), ord('x') + 1)]:
        return dt + timedelta(hours=ord(daily_or_hour) - ord('a'))
    else:
        raise ValueError('could not parse date from RINEX file name '
                         '{}'.format(rinex_fname))

def __init__(self, glo_status_fname=GLO_STATUS_FNAME):
    """
    Parse *glo_status_fname* and store GLONASS status information.
    """
    super(GLONASS_Status, self).__init__()
    if glo_status_fname == GLO_STATUS_FNAME and not os.path.isfile(glo_status_fname):
        update_glo_status()

    def parse_dt(date, time):
        if date == '0000-00-00' and time == '00:00':
            return None
        else:
            return datetime.strptime(date + ' ' + time, '%Y-%m-%d %H:%M')

    with open(glo_status_fname) as fid:
        for line in fid:
            if line.startswith('#'):
                continue
            toks = line.split()
            launch_dt = parse_dt(toks[0], toks[1])
            start_dt = parse_dt(toks[2], toks[3])
            end_dt = parse_dt(toks[4], toks[5])
            slot, freq, plane, GLONASS, cosmos = map(int, toks[6:])
            interval = DateTimeInterval.closed_open(start_dt, end_dt)
            info = StatusInfo(launch_dt, slot, freq, plane, GLONASS, cosmos)
            self.setdefault(slot, OrderedDict())[interval] = info

def parse(omni_fname, colspecs=COLSPECS, names=NAMES, na_values=NA_VALUES):
    """
    Parse the OMNI data record *omni_fname* and return a
    :class:`DataFrame`. To parse, use the fixed columns *colspecs*, the
    column identifiers *names*, and acceptable NaN column mapping
    *na_values*.
    """
    df = PD.read_fwf(omni_fname,
                     colspecs=colspecs,
                     header=None,
                     names=names,
                     na_values=na_values,
                     parse_dates={'date': [0, 1, 2, 3]},
                     date_parser=lambda x: datetime.strptime(x, '%Y %j %H %M'))
    df.set_index('date', inplace=True)
    return df

def toDict(self):
    return {'user_id': self.user_id,
            'project_id': self.project_id,
            'is_admin': self.is_admin,
            'read_deleted': self.read_deleted,
            'roles': self.roles,
            'remote_address': self.remote_address,
            'timestamp': datetime.strptime(self.timestamp,
                                           '%Y-%m-%dT%H:%M:%S.%f'),
            'request_id': self.request_id,
            'auth_token': self.auth_token,
            'quota_class': self.quota_class,
            'user_name': self.user_name,
            'service_catalog': self.service_catalog,
            'project_name': self.project_name,
            'instance_lock_checked': self.instance_lock_checked,
            'tenant': self.tenant,
            'user': self.user}

def objectHookHandler(json_dict):
    for key, value in json_dict.items():
        if isinstance(value, dict):
            json_dict[key] = objectHookHandler(value)
        else:
            try:
                json_dict[key] = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f")
            except Exception as ex:
                pass
    if "synergy_object" in json_dict:
        synergy_object = json_dict["synergy_object"]
        try:
            objClass = import_class(synergy_object["name"])
            objInstance = objClass()
            return objInstance.deserialize(json_dict)
        except SynergyError as ex:
            raise ex
    else:
        return json_dict

def _compute_name(self):
    comp_name = '/'
    for hc_res_clinical_impression in self:
        if hc_res_clinical_impression.subject_type == 'patient':
            comp_name = hc_res_clinical_impression.subject_patient_id.name
            if hc_res_clinical_impression.subject_patient_id.birth_date:
                subject_patient_birth_date = datetime.strftime(datetime.strptime(hc_res_clinical_impression.subject_patient_id.birth_date, DF), "%Y-%m-%d")
                comp_name = comp_name + "(" + subject_patient_birth_date + "),"
        if hc_res_clinical_impression.subject_type == 'group':
            comp_name = hc_res_clinical_impression.subject_group_id.name + ","
        if hc_res_clinical_impression.code_id:
            comp_name = comp_name + " " + hc_res_clinical_impression.code_id.name + "," or ''
        if hc_res_clinical_impression.date:
            patient_date = datetime.strftime(datetime.strptime(hc_res_clinical_impression.date, DTF), "%Y-%m-%d")
            comp_name = comp_name + " " + patient_date
        hc_res_clinical_impression.name = comp_name

def _compute_name(self):
    comp_name = '/'
    for hc_res_condition in self:
        if hc_res_condition.subject_type == 'patient':
            comp_name = hc_res_condition.subject_patient_id.name
            if hc_res_condition.subject_patient_id.birth_date:
                subject_patient_birth_date = datetime.strftime(datetime.strptime(hc_res_condition.subject_patient_id.birth_date, DF), "%Y-%m-%d")
                comp_name = comp_name + "(" + subject_patient_birth_date + ")"
        if hc_res_condition.subject_type == 'group':
            comp_name = hc_res_condition.subject_group_id.name
        if hc_res_condition.code_id:
            comp_name = comp_name + ", " + hc_res_condition.code_id.name or ''
        if hc_res_condition.asserted_date:
            patient_asserted_date = datetime.strftime(datetime.strptime(hc_res_condition.asserted_date, DTF), "%Y-%m-%d")
            comp_name = comp_name + ", " + patient_asserted_date
        hc_res_condition.name = comp_name

def _compute_name(self):
    comp_name = '/'
    for hc_res_encounter in self:
        if hc_res_encounter.subject_type == 'patient':
            comp_name = hc_res_encounter.subject_patient_id.name
            if hc_res_encounter.subject_patient_id.birth_date:
                subject_patient_birth_date = datetime.strftime(datetime.strptime(hc_res_encounter.subject_patient_id.birth_date, DF), "%Y-%m-%d")
                comp_name = comp_name + "(" + subject_patient_birth_date + ")"
        if hc_res_encounter.subject_type == 'group':
            comp_name = hc_res_encounter.subject_group_id.name
        # if hc_res_encounter.type_id:
        #     comp_name = comp_name + ", " + hc_res_encounter.type_id.name or ''
        if hc_res_encounter.start_date:
            subject_start_date = datetime.strftime(datetime.strptime(hc_res_encounter.start_date, DTF), "%Y-%m-%d")
            comp_name = comp_name + ", " + subject_start_date
        hc_res_encounter.name = comp_name

def _get_time_obj(time_str):
    """
    In some ESM versions the time format is YYYY/MM/DD instead of
    MM/DD/YYYY. This detects which of the two formats is in use and
    parses the timestamp accordingly.

    Args:
        time_str (str): timestamp in 'MM/DD/YYYY HH:MM:SS' or
                        'YYYY/MM/DD HH:MM:SS' format

    Returns:
        datetime object, or None if neither format matches
    """
    time_format1 = '%m/%d/%Y %H:%M:%S'
    time_format2 = '%Y/%m/%d %H:%M:%S'

    try:
        time_obj = datetime.strptime(time_str, time_format1)
    except ValueError:
        try:
            time_obj = datetime.strptime(time_str, time_format2)
        except ValueError:
            logging.debug('Invalid time format: {}'.format(time_str))
            time_obj = None
    return time_obj

def setUp(self):
    self.config_cv = {
        'start_date': '2014-01-01',
        'end_date': '2014-05-05',
        'train_on': {'days': 0, 'weeks': 5},
        'test_on': {'days': 0, 'weeks': 1},
        'fake_freq': '6W'
    }
    d_labels = {'response': pd.Series([1.18, 1.28], index=[0, 1]),
                'Feces_kg_day': pd.Series([1.18, 1.28], index=[0, 1]),
                'ToiletID': pd.Series(['a08D000000PXgspIAD', 'a08D000000PXgspIAD'], index=[0, 1]),
                'Collection_Date': pd.Series([datetime.strptime('2014-10-16', '%Y-%m-%d'),
                                              datetime.strptime('2014-10-17', '%Y-%m-%d')], index=[0, 1])
                }
    d_feat = {'Total_Waste_kg_day': pd.Series([7.13, 7.63], index=[0, 1]),
              'ToiletID': pd.Series(['a08D000000PXgspIAD', 'a08D000000PXgspIAD'], index=[0, 1]),
              'Collection_Date': pd.Series([datetime.strptime('2014-10-16', '%Y-%m-%d'),
                                            datetime.strptime('2014-10-17', '%Y-%m-%d')], index=[0, 1]),
              'Feces_kg_day_lag1': pd.Series([0, 1.18], index=[0, 1]),
              'Feces_kg_day_lag2': pd.Series([0, 0], index=[0, 1]),
              'Feces_kg_day_lag3': pd.Series([0, 0], index=[0, 1])
              }
    self.features_big = pd.DataFrame(d_feat)
    self.labels_big = pd.DataFrame(d_labels)

def _read_task_metadata(self, task_dir):
    """Read the meta file containing core fields for dstat."""
    try:
        with open(os.path.join(task_dir, 'meta.yaml'), 'r') as f:
            meta = yaml.load('\n'.join(f.readlines()))

        # Make sure that create-time string is turned into a datetime
        meta['create-time'] = datetime.strptime(meta['create-time'],
                                                '%Y-%m-%d %H:%M:%S.%f')
        return meta
    except (IOError, OSError):
        # lookup_job_tasks may try to read the task metadata as a task is being
        # created. In that case, just catch the exception and return None.
        return None

def date_prompt(self):
    print("")
    print("existing hours: ")
    for i, entry in enumerate(self.time_log.values()):
        weekday = DAY_INDEX[datetime.strptime(str(entry.date), "%Y-%m-%d").weekday()]
        if entry.holiday:
            # item(ISO date, existing hours)
            print("[{0:2}] {1:3} {2} - {3} hours - {4}".format(i, weekday, entry.date, entry.time, entry.holiday))
        else:
            # item(ISO date, existing hours)
            print("[{0:2}] {1:3} {2} - {3} hours".format(i, weekday, entry.date, entry.time))
    date_index = input("select date(s): ")
    print("")
    if date_index.find("-") != -1:
        date_range = date_index.split("-")
        if len(date_range) != 2:
            print("Invalid date selection. Expected int or range (i.e., '1-5').")
            return
        else:
            self.entry_prompt(list(range(int(date_range[0]), int(date_range[1]) + 1)))
    else:
        try:
            self.entry_prompt(int(date_index))
        except ValueError:
            print("Invalid date selection. Expected int or range (i.e., '1-5').")

def get_evaluations(cls, start_date, end_date, dx):
    """ Return evaluation info """
    Evaluation = Pool().get('gnuhealth.patient.evaluation')

    start_date = datetime.strptime(str(start_date), '%Y-%m-%d')
    end_date = datetime.strptime(str(end_date), '%Y-%m-%d')
    end_date += relativedelta(hours=+23, minutes=+59, seconds=+59)

    clause = [
        ('evaluation_start', '>=', start_date),
        ('evaluation_start', '<=', end_date),
    ]

    if dx:
        clause.append(('diagnosis', '=', dx))

    res = Evaluation.search(clause)
    return res

def count_evaluations(cls, start_date, end_date, dx):
    """ Count diagnoses by groups """
    Evaluation = Pool().get('gnuhealth.patient.evaluation')

    start_date = datetime.strptime(str(start_date), '%Y-%m-%d')
    end_date = datetime.strptime(str(end_date), '%Y-%m-%d')
    end_date += relativedelta(hours=+23, minutes=+59, seconds=+59)

    clause = [
        ('evaluation_start', '>=', start_date),
        ('evaluation_start', '<=', end_date),
        ('diagnosis', '=', dx),
    ]

    res = Evaluation.search_count(clause)
    return res

def deserialize_column(self, column, value):
    if value is None:
        return None

    if isinstance(column.type, sqltypes.DateTime):
        return datetime.strptime(value, self.DATETIME_FORMAT)

    if isinstance(column.type, sqltypes.Time):
        hour, minute, second = value.split(':')
        return time(int(hour), int(minute), int(second))

    if isinstance(column.type, sqltypes.Integer):
        return int(value)

    if isinstance(column.type, sqltypes.Float):
        return float(value)

    return value

def get(self):
    cluster = self.get_argument('cluster')
    fr = self.get_argument('fr', default='1970-01-01T00:00:00.000000')
    to = self.get_argument('to', default='2200-01-01T00:00:00.000000')

    # Parse the dates
    fr = datetime.strptime(fr, "%Y-%m-%dT%H:%M:%S.%f")
    to = datetime.strptime(to, "%Y-%m-%dT%H:%M:%S.%f")

    logs = list()
    for log_line in DB.session.query(
        ClusterLog
    ).filter(and_(ClusterLog.cluster == cluster,
                  ClusterLog.when < to,
                  ClusterLog.when > fr)).all():
        logs.append(log_line.to_dict())

    self.set_status(200)
    self.write(json.dumps(logs).encode('utf-8'))