我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用datetime.datetime.strftime()。
def update_crawl_message(crawl_messages):
    """Refresh the three crawl lines: event name, UTC clock and event status.

    Line 2 shows a countdown to the event start, a countdown to the event
    end, or an "is over" notice, depending on where the current UTC time
    falls relative to ``config.EVENT_START_TIME`` / ``config.EVENT_END_TIME``.
    """
    crawl_messages.set_message(0, config.EVENT_NAME)
    crawl_messages.set_message_colors(0, graphics.BRIGHT_BLUE, graphics.BLACK)

    current = datetime.utcnow()
    crawl_messages.set_message(1, datetime.strftime(current, '%H:%M:%S'))

    if current < config.EVENT_START_TIME:
        remaining = config.EVENT_START_TIME - current
        # The background switches colour inside the final hour.
        if remaining.total_seconds() > 3600:
            background = graphics.BLUE
        else:
            background = graphics.RED
        foreground = graphics.WHITE
        status = '%s starts in %s' % (config.EVENT_NAME,
                                      delta_time_to_string(remaining))
    elif current < config.EVENT_END_TIME:
        remaining = config.EVENT_END_TIME - current
        # The foreground switches colour inside the final hour.
        if remaining.total_seconds() > 3600:
            foreground = graphics.YELLOW
        else:
            foreground = graphics.ORANGE
        background = graphics.BLACK
        status = '%s ends in %s' % (config.EVENT_NAME,
                                    delta_time_to_string(remaining))
    else:
        status = '%s is over.' % config.EVENT_NAME
        foreground, background = graphics.RED, graphics.BLACK

    crawl_messages.set_message(2, status)
    crawl_messages.set_message_colors(2, foreground, background)
def _compute_name(self):
    """Compute the display name of every clinical impression in ``self``.

    The name is built from the subject (patient name plus birth date, or
    group name), the code name and the impression date. A record with none
    of these gets ``'/'``.
    """
    for hc_res_clinical_impression in self:
        # Reset per record: previously the value was initialised once before
        # the loop, so a record without a subject inherited the name of the
        # record processed before it.
        comp_name = '/'
        if hc_res_clinical_impression.subject_type == 'patient':
            patient = hc_res_clinical_impression.subject_patient_id
            comp_name = patient.name
            if patient.birth_date:
                subject_patient_birth_date = datetime.strftime(
                    datetime.strptime(patient.birth_date, DF), "%Y-%m-%d")
                comp_name = comp_name + "(" + subject_patient_birth_date + "),"
        if hc_res_clinical_impression.subject_type == 'group':
            comp_name = hc_res_clinical_impression.subject_group_id.name + ","
        if hc_res_clinical_impression.code_id:
            # Parenthesised so the '' fallback applies to the name only; the
            # original ``a + b + "," or ''`` never used the fallback.
            comp_name = comp_name + " " + (hc_res_clinical_impression.code_id.name or '') + ","
        if hc_res_clinical_impression.date:
            patient_date = datetime.strftime(
                datetime.strptime(hc_res_clinical_impression.date, DTF), "%Y-%m-%d")
            comp_name = comp_name + " " + patient_date
        hc_res_clinical_impression.name = comp_name
def _compute_name(self):
    """Compute the display name of every condition record in ``self``.

    The name is built from the subject (patient name plus birth date, or
    group name), the code name and the asserted date, separated by ", ".
    A record with none of these gets ``'/'``.
    """
    for hc_res_condition in self:
        # Reset per record so a record without a subject does not inherit
        # the name computed for the previous record.
        comp_name = '/'
        if hc_res_condition.subject_type == 'patient':
            patient = hc_res_condition.subject_patient_id
            comp_name = patient.name
            if patient.birth_date:
                subject_patient_birth_date = datetime.strftime(
                    datetime.strptime(patient.birth_date, DF), "%Y-%m-%d")
                comp_name = comp_name + "(" + subject_patient_birth_date + ")"
        if hc_res_condition.subject_type == 'group':
            comp_name = hc_res_condition.subject_group_id.name
        if hc_res_condition.code_id:
            # Parenthesised so the '' fallback applies to the name only.
            comp_name = comp_name + ", " + (hc_res_condition.code_id.name or '')
        if hc_res_condition.asserted_date:
            patient_asserted_date = datetime.strftime(
                datetime.strptime(hc_res_condition.asserted_date, DTF), "%Y-%m-%d")
            comp_name = comp_name + ", " + patient_asserted_date
        hc_res_condition.name = comp_name
def _compute_name(self):
    """Compute the display name of every encounter record in ``self``.

    The name is built from the subject (patient name plus birth date, or
    group name) and the encounter start date. A record with none of these
    gets ``'/'``.
    """
    for hc_res_encounter in self:
        # Reset per record so a record without a subject does not inherit
        # the name computed for the previous record.
        comp_name = '/'
        if hc_res_encounter.subject_type == 'patient':
            patient = hc_res_encounter.subject_patient_id
            comp_name = patient.name
            if patient.birth_date:
                subject_patient_birth_date = datetime.strftime(
                    datetime.strptime(patient.birth_date, DF), "%Y-%m-%d")
                comp_name = comp_name + "(" + subject_patient_birth_date + ")"
        if hc_res_encounter.subject_type == 'group':
            comp_name = hc_res_encounter.subject_group_id.name
        # if hc_res_encounter.type_id:
        #     comp_name = comp_name + ", " + hc_res_encounter.type_id.name or ''
        if hc_res_encounter.start_date:
            subject_start_date = datetime.strftime(
                datetime.strptime(hc_res_encounter.start_date, DTF), "%Y-%m-%d")
            comp_name = comp_name + ", " + subject_start_date
        hc_res_encounter.name = comp_name
def _compute_name(self):
    """Compute the display name of every coverage record in ``self``.

    The name joins, with ", ", the network name, policy holder name,
    subscriber name, beneficiary name, coverage type name and start date,
    for whichever of those are set. A record with none gets ``'/'``.
    """
    for hc_coverage in self:
        # Reset per record so an empty record does not inherit the name
        # computed for the previous record.
        comp_name = '/'
        if hc_coverage.network_id:
            comp_name = hc_coverage.network_id.name or ''
        # The fallbacks below are parenthesised so they apply to the appended
        # value only; ``a + b or ''`` never reached the fallback.
        if hc_coverage.policy_holder_name:
            comp_name = comp_name + ", " + (hc_coverage.policy_holder_name or '')
        if hc_coverage.subscriber_name:
            comp_name = comp_name + ", " + (hc_coverage.subscriber_name or '')
        if hc_coverage.beneficiary_id:
            comp_name = comp_name + ", " + (hc_coverage.beneficiary_id.name or '')
        if hc_coverage.type_id:
            comp_name = comp_name + ", " + (hc_coverage.type_id.name or '')
        if hc_coverage.start_date:
            comp_name = comp_name + ", " + datetime.strftime(
                datetime.strptime(hc_coverage.start_date, DTF), "%Y-%m-%d")
        hc_coverage.name = comp_name
def checkSQL(self, symbol):
    """Record whether the database holds data for ``symbol`` on every weekday.

    ``symbol`` is appended to ``self.included_data`` when a table named
    ``symbol`` exists and has a row for every Monday-Friday date from
    ``self.startDate`` through ``self.startDate + self.delta``; otherwise it
    is appended to ``self.missing_data``.
    """
    self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    # Look at every table, not just the first row of the result.
    table_names = {row[0] for row in self.cursor.fetchall()}
    if symbol not in table_names:
        self.missing_data.append(symbol)
        return

    # Table names cannot be bound as parameters, so quote the identifier;
    # the date itself is passed as a bound parameter.
    query = 'SELECT * FROM "%s" WHERE Date = ?' % symbol.replace('"', '""')
    for i in range(self.delta.days + 1):
        day = self.startDate + timedelta(days=i)
        # weekday(): Monday is 0 ... Saturday is 5, Sunday is 6.
        if day.weekday() in (5, 6):
            continue
        self.cursor.execute(query, (day.strftime("%Y-%m-%d"),))
        if not self.cursor.fetchall():
            self.missing_data.append(symbol)
            return
    self.included_data.append(symbol)
def setup(config=None):
    """Attach a DEBUG-level, date-stamped file handler to the root logger.

    The log directory is read from the ``--log-path`` option of the
    ``global`` section (default ``'.'``).

    Raises:
        RuntimeError: if the configured log directory does not exist.
    """
    log_dir = config.get_safe('global', '--log-path', '.')
    root = logging.getLogger()
    if not os.path.exists(log_dir):
        raise RuntimeError('configured ``--log-path`` value does not exist: %s' % log_dir)

    today = datetime.strftime(datetime.utcnow(), '%Y-%m-%d')
    target = os.path.join(log_dir, 'ceph-medic-%s.log' % today)
    root.setLevel(logging.DEBUG)

    # File Logger
    file_handler = logging.FileHandler(target)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(FILE_FORMAT))
    root.addHandler(file_handler)
def get_stock(symbol):
    """Plot one year of closing prices for ``symbol`` and save it as ``foo.png``.

    Data is fetched from the Quandl WIKI dataset for the period from one
    year ago up to the date returned by ``get_last_trading_date()``.
    """
    last_year_date = datetime.strftime(datetime.now() - relativedelta(years=1), "%Y-%m-%d")
    date = get_last_trading_date()
    response = requests.get(
        'https://www.quandl.com/api/v3/datasets/WIKI/{}.json?start_date={}&end_date={}'.format(
            symbol, last_year_date, date))
    json_data = response.json()['dataset']['data']

    # Column 0 of each row is the date, column 4 the closing price.
    dates = [datetime.strptime(day[0], "%Y-%m-%d") for day in json_data]
    closing = [day[4] for day in json_data]

    plt.plot_date(dates, closing, '-')
    plt.title(symbol)
    plt.xlabel('Date')
    # Was ``plt.ylable``, which raises AttributeError.
    plt.ylabel('Stock Price')
    plt.savefig('foo.png')
def get_game(self):
    """Return the complete game as a ``chess.pgn.Game`` with PGN headers set."""
    # The move list may be positioned part way through the game, so replay
    # every pending redo to reach the final position first.
    pending = len(gv.jcchess.get_redolist())
    for _ in range(pending):
        gv.jcchess.redo_move()

    game = chess.pgn.Game.from_board(self.chessboard)

    # Undo the replayed moves to put things back the way they were.
    for _ in range(pending):
        gv.jcchess.undo_move()

    headers = game.headers
    headers["Event"] = "Computer Chess Game"
    headers["Site"] = socket.gethostname()
    headers["Date"] = datetime.strftime(datetime.now(), '%Y.%m.%d')
    headers["Round"] = "-"
    headers["White"] = gv.jcchess.get_player(WHITE)
    headers["Black"] = gv.jcchess.get_player(BLACK)
    return game
def monta_caminho_inutilizacao(self, ambiente=None, data=None, serie=None, numero_inicial=None, numero_final=None):
    """Build the directory path for a CT-e number-range XML file.

    The path is ``<self.caminho>ArquivosXML/CTe/<producao|homologacao>/
    <YYYY-MM>/<serie>-<numero_inicial>-<numero_final>/``, with the series
    zero-padded to 3 characters and the numbers to 9. ``ambiente == 1``
    selects ``producao``; ``data`` defaults to the current date and time.
    """
    ambiente_dir = 'producao/' if ambiente == 1 else 'homologacao/'
    caminho = os.path.join(self.caminho + u'ArquivosXML/CTe/', ambiente_dir)

    if data is None:
        data = datetime.now()
    caminho = os.path.join(caminho, data.strftime(u'%Y-%m') + u'/')

    serie = unicode(serie).strip().rjust(3, u'0')
    numero_inicial = unicode(numero_inicial).strip().rjust(9, u'0')
    numero_final = unicode(numero_final).strip().rjust(9, u'0')

    pasta = serie + u'-' + numero_inicial + u'-' + numero_final + u'/'
    return os.path.join(caminho, pasta)
def _get_email_thread_attachment(ticket, email_category=None):
    """Render the ticket's emails of one category into an attachment dict.

    Returns a dict with ``filetype``, base64-encoded ``content`` and a
    timestamped ``name``.

    Raises:
        InternalServerError: if the mailer service fails or the thread
            template is missing or invalid.
    """
    try:
        _emails = ImplementationFactory.instance.get_singleton_of(
            'MailerServiceBase'
        ).get_emails(ticket)
    except (KeyError, MailerServiceException) as ex:
        raise InternalServerError(str(ex))

    emails = [email for email in _emails if email.category.lower() == email_category]

    try:
        content, filetype = utils.get_email_thread_content(ticket, emails)
    except (utils.EmailThreadTemplateNotFound, utils.EmailThreadTemplateSyntaxError) as ex:
        raise InternalServerError(str(ex))

    content = base64.b64encode(content)
    # guess_extension() returns None for an unknown MIME type; without the
    # fallback the file name would end in the literal text "None".
    extension = mimetypes.guess_extension(filetype) or ''
    name = 'ticket_{}_emails_{}{}'.format(
        ticket.publicId,
        datetime.strftime(datetime.now(), '%d-%m-%Y_%H-%M-%S'),
        extension,
    )
    return {'filetype': filetype, 'content': content, 'name': name}
def __str__(self):
    """Render the tweet as text: user, JST timestamp, text bodies, media.

    Raises:
        ValueError: if ``user`` or ``date`` is None.
    """
    incomplete = self.user is None or self.date is None
    if incomplete:
        error("Stringify `Tweet` without `user` or `date`.")
        raise ValueError()

    jst = timezone(timedelta(hours=9))
    stamp = datetime.strftime(self.date.astimezone(jst), "%Y-%m-%d %H:%M:%S JST")
    lines = [self.user, stamp]

    # Fix GBK encoding
    substitutions = (
        ('?', '·'),
        ('?', '×'),
        ('#???', ''),
        ('#????????', ''),
    )
    for body in (self.ja, self.zh):
        if len(body) == 0:
            continue
        for old, new in substitutions:
            body = body.replace(old, new)
        lines.append('')
        lines.append(body.strip())

    for item in self.media:
        lines.append(str(item))
    return '\n'.join(lines)
def __str__(self):
    """Render the tweet as text: user, JST timestamp, text bodies, media.

    Raises:
        ValueError: if ``user`` or ``date`` is None.
    """
    incomplete = self.user is None or self.date is None
    if incomplete:
        error("Stringify `Tweet` without `user` or `date`.")
        raise ValueError()

    jst = timezone(timedelta(hours=9))
    stamp = datetime.strftime(self.date.astimezone(jst), "%Y-%m-%d %H:%M:%S JST")
    lines = [self.user, stamp]

    # Fix GBK encoding
    substitutions = (
        ('?', '·'),
        ('?', '×'),
        ('?', ''),
        ('#???', ''),
        ('#????????', ''),
    )
    for body in (self.ja, self.zh):
        if len(body) == 0:
            continue
        for old, new in substitutions:
            body = body.replace(old, new)
        lines.append('')
        lines.append(body.strip())

    for item in self.media:
        lines.append(str(item))
    return '\n'.join(lines)
def enable():
    """Start a tcpdump capture writing to a date-named file under /capture-data/.

    The child process id is stored on ``Ps.pcap``.
    """
    ## get datetime for timestamp ##
    dt = datetime.now()
    date = datetime.strftime(dt, '%Y-%m-%d')
    fileOut = "/capture-data/" + date + ".pcap"
    #set timer
    t = time.time()
    pcap = subprocess.Popen(['/usr/sbin/tcpdump', '-n', '-e', '-w', fileOut], stdout=subprocess.PIPE)
    Ps.pcap = pcap.pid
    # NOTE(review): ``t`` is never reassigned and the loop body is a bare
    # expression, so as laid out here this loop spins forever and the
    # ``return`` below is never reached. The nesting was reconstructed from
    # a flattened source - confirm the intended loop/return structure.
    while t != 0:
        pcap
    return False
## end enable function
def create_key(session, name, pubkey, check_mode):
    """Create a user key via the API, or fabricate the result in check mode.

    In check mode no request is made; a placeholder dict with the given
    title and key and the current UTC timestamp is returned instead.
    """
    if not check_mode:
        payload = json.dumps({'title': name, 'key': pubkey})
        response = session.request('POST', API_BASE + '/user/keys', data=payload)
        return response.json()

    from datetime import datetime
    created = datetime.strftime(datetime.utcnow(), '%Y-%m-%dT%H:%M:%SZ')
    fake_key = {
        'id': 0,
        'key': pubkey,
        'title': name,
        'url': 'http://example.com/CHECK_MODE_GITHUB_KEY',
        'created_at': created,
        'read_only': False,
        'verified': False,
    }
    return fake_key
def _createGameFromData(self, gameData):
    """Create the game described by ``gameData`` and record its creation.

    On success the template usage count is incremented, the game row is
    updated with the new game ID and creation time, and ``gameData`` is
    returned. On any failure the error is logged, the game is deleted, and
    None is returned.
    """
    template = int(gameData['Template'])
    templateID, settings, bonusOverrides = self._getTempSettings(template)
    teamList = self._assembleTeams(gameData)
    try:
        gameName = self._getGameName(gameData)
        newGameID = self.handler.createGame(
            templateID, gameName, teamList,
            settingsDict=settings,
            overridenBonuses=bonusOverrides,
            teamless=self.teamless,
            message=self._getGameMessage(gameData))
        self._adjustTemplateGameCount(template, 1)
        stamp = datetime.strftime(datetime.now(), self.TIMEFORMAT)
        self._updateEntityValue(self.games, gameData['ID'],
                                WarlightID=newGameID, Created=stamp)
        return gameData
    except Exception as e:
        failure = ("Failed to make game with %s on %d because of %s" %
                   (gameData['Sides'], template, repr(e)))
        self.parent.log(failure, self.name, error=True)
        self._deleteGame(gameData, False, False)
def test_createGame(self, parser, datetime, delete):
    """_createGame stores the new game ID on success and logs and deletes on failure."""
    game_row = {'Template': '43', 'Sides': '1/2', 'Vetos': '8', 'ID': 'gameID'}
    template_row = {'ID': 'tempID', 'WarlightID': 4904, 'SET_A#B': '490',
                    'SET_SETTING': 314, 'OVERRIDE_Bonus': 12, 'Usage': '8',
                    'Name': 'TempName'}
    team_row = {'ID': '1', 'Players': '3022124041', 'Name': 'Team Name',
                'Rating': '4034', 'Rank': '1'}
    self.games.findEntities.return_value = [game_row]
    self.templates.findEntities.return_value = [template_row]
    self.teams.findEntities.return_value = [team_row]
    self.handler.createGame.return_value = "WLID"
    datetime.strftime.return_value = "strftime"

    # Success path: the game row is updated with the new ID and timestamp.
    self.league._createGame('gameID')
    self.games.updateMatchingEntities.assert_called_with(
        {'ID': {'value': 'gameID', 'type': 'positive'}},
        {'WarlightID': "WLID", 'Created': "strftime"})

    # Failure path: the error is logged and the game is deleted.
    self.handler.createGame.side_effect = IOError
    self.league._createGame('gameID')
    failStr = "Failed to make game with 1/2 on 43 because of IOError()"
    self.parent.log.assert_called_with(failStr, self.league.name, error=True)
    delete.assert_called_once_with(self.games.findEntities.return_value[0],
                                   False, False)
def test_decayRatings(self):
    """_decayRatings updates nothing with zero decay or a fresh run, then decays."""
    league = self.league
    update = self.teams.updateMatchingEntities
    oldCount = update.call_count

    # Zero decay, then non-zero decay with the latest run set to now:
    # neither call may update any team.
    self._setProp(league.SET_RATING_DECAY, "0")
    current = datetime.strftime(datetime.now(), league.TIMEFORMAT)
    self._setProp(league.SET_LATEST_RUN, current)
    league._decayRatings()
    self._setProp(league.SET_RATING_DECAY, "10")
    league._decayRatings()
    assert_equals(update.call_count, oldCount)

    # With no recorded latest run, ratings are decayed.
    self._setProp(league.SET_LATEST_RUN, "")
    self.teams.findEntities.return_value = [
        {'ID': 1, 'Rating': '33/5', 'Ongoing': '0', 'Finished': '8',
         'Limit': '0', 'Confirmations': ''},
        {'ID': 2, 'Rating': '34/8', 'Ongoing': '1', 'Finished': '0',
         'Limit': '3', 'Confirmations': 'TRUE,FALSE,FALSE'},
        {'ID': 3, 'Rating': '39/1', 'Ongoing': '2', 'Finished': '121',
         'Limit': '12', 'Confirmations': 'TRUE,TRUE,TRUE'},
        {'ID': 4, 'Rating': '12/0', 'Ongoing': '0', 'Finished': '0',
         'Limit': '0', 'Confirmations': 'FALSE,FALSE,FALSE'},
    ]
    league._decayRatings()
    update.assert_called_with({'ID': {'value': 2, 'type': 'positive'}},
                              {'Rating': "24/8"})
    assert_equals(update.call_count, 2)
def test_run(self, update, execute, validate, restore, create, rescale, decay, calculate):
    """run() honours the wait period and the league's active flag."""
    league = self.league

    # Last run 400 minutes ago with a 500 minute wait: only execute runs.
    self._setProp(league.SET_WAIT_PERIOD, 500)
    last_run = datetime.now() - timedelta(minutes=400)
    self._setProp(league.SET_LATEST_RUN,
                  datetime.strftime(last_run, league.TIMEFORMAT))
    league.run()
    update.assert_not_called()
    execute.assert_called_once_with()

    # Wait period elapsed but league inactive: everything except create.
    self._setProp(league.SET_WAIT_PERIOD, 300)
    self._setProp(league.SET_ACTIVE, "FALSE")
    league.run()
    create.assert_not_called()
    for fn in {update, validate, restore, rescale, decay, calculate}:
        fn.assert_called_once_with()
    assert_equals(execute.call_count, 2)
    execute.assert_called_with()

    # Active league: games are created.
    self._setProp(league.SET_ACTIVE, "TRUE")
    self.teams.findEntities.return_value = [{'Limit': '10'},] * 10
    self.templates.findEntities.return_value = xrange(5)
    assert_true(league.active)
    league.run()
    create.assert_called_once_with()
def _get_next_report_date(self, cr, uid, ids, field_name, arg, context=None):
    """Return the next report date based on the last report date and report period.

    :return: a dict mapping each challenge id to a string in
             DEFAULT_SERVER_DATE_FORMAT representing the date, or to False
             when the frequency is not periodic ('once')."""
    res = {}
    for challenge in self.browse(cr, uid, ids, context=context):
        last = datetime.strptime(challenge.last_report_date, DF).date()
        frequency = challenge.report_message_frequency
        if frequency == 'daily':
            next_date = last + timedelta(days=1)
        elif frequency == 'weekly':
            next_date = last + timedelta(days=7)
        elif frequency == 'monthly':
            # First day of the following month.
            month_range = calendar.monthrange(last.year, last.month)
            next_date = last.replace(day=month_range[1]) + timedelta(days=1)
        elif frequency == 'yearly':
            try:
                next_date = last.replace(year=last.year + 1)
            except ValueError:
                # 29 February has no counterpart in the following year;
                # fall back to 28 February instead of crashing.
                next_date = last.replace(year=last.year + 1, day=28)
        else:
            # frequency == 'once', reported when closed only
            res[challenge.id] = False
            continue
        res[challenge.id] = next_date.strftime(DF)
    return res
def default(self, o):
    """Convert an object the JSON encoder cannot serialise natively.

    Iterables become lists, datetimes become ``'YYYY-MM-DD HH:MM:SS'``
    strings, objects with a ``__dict__`` become that dict, and anything
    else becomes ``str(o)``.
    """
    # to support arbitrary iterators
    try:
        items = iter(o)
    except TypeError:
        items = None
    if items is not None:
        return list(items)

    # support datetime
    if isinstance(o, datetime):
        if o:
            return datetime.strftime(o, '%Y-%m-%d %H:%M:%S')
        return ''
    if hasattr(o, '__dict__'):
        return o.__dict__
    return str(o)
def _filter_bs_D(self):
    """Return the stock codes that have a matching row on both days.

    The day is read from ``sys.argv[2]`` (``YYYYMMDD``). Rows for that day
    and the previous one with ``kp_xc > 15000000`` are fetched; codes with
    exactly two rows are returned.
    """
    dateline = sys.argv[2]
    day_start = time.mktime(time.strptime(dateline, '%Y%m%d'))
    yestoday = datetime.fromtimestamp(day_start - 86000).strftime('%Y%m%d')
    sql = "select * from s_stock_fenbi_daily where kp_xc > 15000000 and dateline in (%s, %s)" % (yestoday, dateline)
    rows = self.mysql.getRecord(sql)

    # Group the rows by stock code.
    by_code = {}
    for idx in range(0, len(rows)):
        by_code.setdefault(rows[idx]['s_code'], []).append(rows[idx])

    return [code for code, matches in by_code.items() if len(matches) == 2]
def __str__(self):
    """Render the tweet as text, caching its media files locally first.

    Each media URL is downloaded into the ``twitter`` image directory
    unless a file with that name already exists there. The result is the
    JST timestamp, the cleaned text, and one ``CQImage`` string per media
    item, joined by newlines.
    """
    for url in self.media:
        image_dir = os.path.join(config['cq_root_dir'], config['cq_image_dir'], 'twitter')
        target = os.path.join(image_dir, os.path.basename(url))
        # Create the twitter image directory if it is missing.
        if not os.path.exists(image_dir):
            os.mkdir(image_dir)
        # Download only files that are not cached yet.
        if not os.path.exists(target):
            resp = requests.get(url, timeout=60, proxies=config.get('proxies'))
            with open(target, 'wb') as out:
                out.write(resp.content)

    jst = timezone(timedelta(hours=9))
    stamp = datetime.strftime(self.date.astimezone(jst), "%Y-%m-%d %H:%M:%S JST")
    lines = [stamp]

    text = self.text
    for old, new in (('?', '·'), ('?', '×'), ('#????????', ''), ('?', '')):
        text = text.replace(old, new)
    lines.append('')
    lines.append(text.strip())

    for m in self.media:
        lines.append(str(CQImage(os.path.join('twitter', os.path.basename(m)))))
    return '\n'.join(lines)
def test_fill_with_duplicate_status(self):
    """Filling a Tweet from a status with an already-used id yields the stored text."""
    user_fields = dict(
        id=718443,
        name='Bob Loblaw',
        screen_name='bobby',
        location='Vancouver, Canada',
        verified=True,
    )
    sample_user = twitter.User(**user_fields)

    created = datetime.strftime(timezone.make_aware(datetime.now()),
                                '%a %b %d %H:%M:%S +0000 %Y')
    sample_status = twitter.Status(
        created_at=created,
        id=1234567,
        text='Hello world, again!',
        user=sample_user,
        retweet_count=1,
        favorite_count=5,
    )

    movie = Movie.objects.get(imdbID="123456")
    duplicate_tweet = Tweet().fillWithStatusObject(sample_status, movie)
    self.assertEqual(duplicate_tweet.text, 'Hello world')
def add_user(username: str, password: str, facebook_id: int):
    """
    Inserts a user to the database, or updates the password and facebook id
    of an existing row when the insert fails.

    :param username: users username
    :param password: users password
    :param facebook_id: users unique facebook id
    :return: nothing
    """
    registry_date = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')

    conn = database.DatabaseConnector.connection
    cur = conn.cursor()
    try:
        cur.execute(
            "INSERT INTO `user`(`username`,`password`,`facebook_id`,`registry_date`) VALUES (?,?,?,?)",
            (username, password, facebook_id, registry_date))
    except Exception:
        # Presumably the insert failed because the username already exists
        # (verify against the schema). The username is now a bound
        # parameter; it used to be concatenated into the SQL text, which
        # allowed SQL injection.
        cur.execute(
            "UPDATE `user` SET password = ?, facebook_id = ? where username = ?",
            (password, facebook_id, username))
    conn.commit()
def assemble_scene_id_list(ref_time, prow, sat, end_date, delta=16):
    """Return scene ids from ``ref_time`` up to ``end_date``, every ``delta`` days.

    The first id uses the date part returned by ``find_valid_scene``; each
    later id uses the year and day-of-year (``%Y%j``) of the stepped time.
    """
    padded_pr, date_part, location, archive = find_valid_scene(ref_time, prow, sat)
    scenes = []
    current = ref_time
    while current < end_date:
        scene = '{}{}{}{}{}'.format(sat, padded_pr, date_part, location, archive)
        print('add scene: {}, for {}'.format(scene, datetime.strftime(current, '%Y-%m-%d')))
        scenes.append(scene)

        current = current + timedelta(days=delta)
        date_part = datetime.strftime(current, '%Y%j')
    return scenes
def save_result(self, alpha, epochs, filename, save_path='./'):
    """Plot ``self.result`` and save it as a timestamped PNG under ``save_path``.

    The file name is ``filename`` followed by ``plot_<YYYYmmdd-HHMM>.png``;
    ``alpha`` and ``epochs`` are shown in the x-axis label.
    """
    #self.result.to_csv(
    #    os.path.join(save_path,
    #                 filename + datetime.strftime(datetime.now(),
    #                                              '%Y%m%d-%H%M.csv'))
    #)
    axes = self.result.plot(title="Average adherence rate of patients",
                            legend=True,
                            yticks=[0.5, 0.6, 0.7, 0.8, 0.9])
    label = "alpha = {}, epochs = {}".format(alpha, epochs)
    axes.set(xlabel=label)

    suffix = datetime.strftime(datetime.now(), 'plot_%Y%m%d-%H%M.png')
    target = os.path.join(save_path, filename + suffix)
    axes.get_figure().savefig(target)
def initLastTime(self, path):
    """Initialise ``self._last_time`` from the state file at ``path``.

    The file is scanned from the end for a line starting with
    ``last_time : <value>``; the captured value is stored and True is
    returned. If the file does not exist or holds no such line,
    ``self._last_time`` is set to ``self._now`` (second precision, ISO
    format) and False is returned.
    """
    def _now_iso():
        # Round-trip through the second-resolution format, as before, which
        # drops sub-second precision.
        cur_time = self._now.strftime("%y%m%d %H:%M:%S")
        return datetime.strptime(cur_time, "%y%m%d %H:%M:%S").isoformat()

    if not os.path.exists(path):
        # Was ``dself._now``, a NameError whenever the file was missing.
        self._last_time = _now_iso()
        return False

    last_re = re.compile("last_time : (.*)\\n?")
    # The context manager closes the file on every exit path.
    with open(path, "r") as ifs:
        lines = ifs.readlines()

    for l in reversed(lines):
        m = last_re.match(l)
        if m is not None:
            self._last_time = m.group(1)
            return True

    self._last_time = _now_iso()
    return False
def create_or_update_user_variable(self, name, value, var_type=2):
    """Save a user variable, falling back to an update if the name exists.

    For ``var_type`` 3 a ``datetime`` value is sent as ``YYYY-MM-DD``; for
    ``var_type`` 4 a ``time`` value is sent as ``HH:MM``; every other value
    is sent as ``str(value)``.

    Returns the decoded JSON response of the last request made.

    Raises:
        ValueError: if ``var_type`` cannot be converted to an int.
    """
    type_code = int(var_type)
    # Default representation. The original assigned this in a ``finally``
    # clause, which always overwrote the formatted date/time value.
    var_value = str(value)
    if type_code == 3 and isinstance(value, datetime):
        var_value = datetime.strftime(value, '%Y-%m-%d')
    elif type_code == 4 and isinstance(value, time):
        var_value = time.strftime(value, '%H:%M')

    params = uservariables['save'].copy()
    params['vname'] = name
    params['vvalue'] = var_value
    params['vtype'] = str(var_type)
    req = requests.get(self.base_url, params)
    status = req.json()
    if status == 'Variable name already exists!':
        # Same parameters, but switch the request to the update command.
        params['param'] = uservariables['update']['param']
        req = requests.get(self.base_url, params)
        status = req.json()
    return status
def current_time():
    """Return the current local time formatted with ``TIME_FORMAT``."""
    seconds = time.time()
    moment = datetime.fromtimestamp(seconds)
    return moment.strftime(TIME_FORMAT)
# converts a string to a timestamp
def ts_to_string(ts):
    """Format the datetime ``ts`` with ``TIME_FORMAT``."""
    formatted = datetime.strftime(ts, TIME_FORMAT)
    return formatted
# Generate a random password
def Log(self, eventType, dataList):
    '''
    Append one entry to the log file. Each entry looks like:

        timestamp\tevent\tdata1\tdata2 <etc>\n

    where:
        timestamp = integer seconds since the UNIX epoch
        event = string identifying the event
        data1..n = individual data fields, as appropriate for each event type.

    To keep log files from growing without bound, the log filename stored
    in the settings is passed through datetime.strftime(), so any format
    codes in it are expanded against the current date/time (giving e.g. a
    monthly log file).
    '''
    stamp = int(time())
    moment = datetime.fromtimestamp(stamp)

    # With no explicit log file path/name, default to one named after the
    # current year & month and remember it in the settings.
    template = self.settings.logFilePath
    if not template:
        template = "%Y-%m.txt"
        self.settings.logFilePath = template

    target = moment.strftime(self.GetPath(template))
    with open(target, "a+t") as log:
        log.write("{0}\t{1}\t".format(stamp, eventType))
        log.write("\t".join(dataList))
        log.write("\n")
def nmap_to_ip_ctime(nmap):
    """take nmap output and return a ctime and ip parsed from it

    The scan start time is read from the first line ("scan initiated ...
    as:") and interpreted as local time; the IP is read from the
    parenthesised address on the second line ("Nmap scan report for ...").
    Returns ``(ip, ctime)`` with ``ctime`` as integer epoch seconds.
    """
    # Local import: mktime replaces the '%s' strftime directive, which is a
    # platform-specific extension and not supported everywhere.
    import time

    firstlines = nmap.splitlines()[:2]
    mo = re.search(r'scan initiated (.+) as:', firstlines[0])
    c_date = datetime.strptime(mo.group(1), '%a %b %d %H:%M:%S %Y')
    ctime = int(time.mktime(c_date.timetuple()))
    # Raw string: '\(' in a normal string literal is an invalid escape.
    mo = re.search(r'Nmap scan report for .* \((.*)\)', firstlines[1])
    ip = mo.group(1)
    return (ip, ctime)
def __str__(self):
    """Return ``'<request_at> - <created_at>'``, both as ``YYYY-MM-DD HH:MM:SS``."""
    layout = '%Y-%m-%d %H:%M:%S'
    requested = datetime.strftime(self.request_at, layout)
    created = datetime.strftime(self.created_at, layout)
    return '{} - {}'.format(requested, created)
def strftime(self, _date, _format):
    """Format a date as a string.

    The date is combined with midnight to form a datetime before
    formatting, so time directives in the format render as zeros.

    :param date _date: date to convert
    :param str _format: format to use (same as for datetime.strftime)
    :return str: converted date
    """
    midnight = datetime.min.time()
    moment = datetime.combine(_date, midnight)
    return moment.strftime(_format)
def last_end_date(self):
    ''' The last day when this Task was active, as YYYY-MM-DD, or None if it has no end time'''
    if not self.end_time:
        return None
    return self.end_time.strftime('%Y-%m-%d')
def __repr__(self):
    """Return a one-line summary: ids, last end date, work time, start -> end, name.

    A task without an end time shows ``in progress`` as its work time; the
    task id is only included when it is not None.
    """
    start_str = '%s' % self.start_time.strftime('%H:%M') if self.start_time else 'None'

    end_str, work_str = 'None', 'in progress'
    if self.end_time:
        end_str = '%s' % self.end_time.strftime('%H:%M')
        # Drop the fractional seconds from the timedelta text.
        work_str = '%s' % str(self.work_time).split('.')[0]

    tail = (self.last_end_date, work_str, start_str, end_str, self.name)
    if self.tid is None:
        return '[%s] - %s| %s (%s -> %s) - %s' % ((self.uid,) + tail)
    return '[%d:%s] - %s| %s (%s -> %s) - %s' % ((self.tid, self.uid) + tail)
def work_on(task_id=0, start_time_str=None):
    '''Start the task with the given id, optionally at a given time today'''
    matches = group_task_by(get_tasks(condition=lambda x: x.tid == task_id),
                            group='name')
    if not matches:
        LOGGER.error("could not find task ID '%s'", task_id)
        return

    first = matches[0]
    if start_time_str:
        # Prefix the given time with today's date.
        today = datetime.strftime(datetime.today(), '%Y-%m-%d')
        start_time = today + ' ' + start_time_str
    else:
        start_time = None
    Task(first.name, start_str=start_time).start()
def write(self, vals):
    """Write ``vals`` and maintain the status history on a status change.

    When ``vals`` carries a status that differs from the first open history
    record (one with no end date), every open record is closed with the
    current time and its elapsed-time fields, and a new history record is
    created for the new status.
    """
    status_history_obj = self.env['hc.care.plan.activity.detail.status.history']
    res = super(CarePlanActivityDetail, self).write(vals)
    status_history_record_ids = status_history_obj.search([('end_date', '=', False)])
    if status_history_record_ids:
        if vals.get('status') and status_history_record_ids[0].status != vals.get('status'):
            now = datetime.today()
            for status_history in status_history_record_ids:
                status_history.end_date = datetime.strftime(now, DTF)
                time_diff = now - datetime.strptime(status_history.start_date, DTF)
                if time_diff:
                    # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
                    days = str(time_diff).split(',')
                    if len(days) > 1:
                        status_history.time_diff_day = str(days[0])
                        clock = str(days[1])
                    else:
                        clock = str(time_diff)
                    times = clock.split(':')
                    # Was ``times > 1``: comparing a list with an int raises
                    # TypeError on Python 3.
                    if len(times) > 2:
                        status_history.time_diff_hour = str(times[0])
                        status_history.time_diff_min = str(times[1])
                        status_history.time_diff_sec = str(times[2])
            status_history_vals = {
                'detail_id': self.id,
                'status': vals.get('status'),
                'start_date': now,
            }
            status_history_obj.create(status_history_vals)
    return res
def write(self, vals):
    """Write ``vals`` and maintain the status history on a status change.

    When ``vals`` carries a ``status_id`` that differs from the status of
    the first open history record (one with no end date), every open record
    is closed with the current time and its elapsed-time fields, and a new
    history record is created with the publication status name.
    """
    status_history_obj = self.env['hc.compartment.definition.status.history']
    publication_status_obj = self.env['hc.vs.publication.status']
    res = super(CompartmentDefinition, self).write(vals)
    status_history_record_ids = status_history_obj.search([('end_date', '=', False)])
    if status_history_record_ids:
        if vals.get('status_id') and status_history_record_ids[0].status != vals.get('status_id'):
            now = datetime.today()
            for status_history in status_history_record_ids:
                status_history.end_date = datetime.strftime(now, DTF)
                time_diff = now - datetime.strptime(status_history.start_date, DTF)
                if time_diff:
                    # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
                    days = str(time_diff).split(',')
                    if len(days) > 1:
                        status_history.time_diff_day = str(days[0])
                        clock = str(days[1])
                    else:
                        clock = str(time_diff)
                    times = clock.split(':')
                    # Was ``times > 1``: comparing a list with an int raises
                    # TypeError on Python 3.
                    if len(times) > 2:
                        status_history.time_diff_hour = str(times[0])
                        status_history.time_diff_min = str(times[1])
                        status_history.time_diff_sec = str(times[2])
            publication_status = publication_status_obj.browse(vals.get('status_id'))
            status_history_vals = {
                'compartment_definition_id': self.id,
                'status': publication_status.name,
                'start_date': now,
            }
            status_history_obj.create(status_history_vals)
    return res
def write(self, vals):
    """Write ``vals`` and maintain the status history on a status change.

    When ``vals`` carries a ``status_id`` that differs from the status of
    the first open history record (one with no end date), every open record
    is closed with the current time and its elapsed-time fields, and a new
    history record is created with the publication status name.
    """
    status_history_obj = self.env['hc.code.system.status.history']
    publication_status_obj = self.env['hc.vs.publication.status']
    res = super(CodeSystem, self).write(vals)
    status_history_record_ids = status_history_obj.search([('end_date', '=', False)])
    if status_history_record_ids:
        if vals.get('status_id') and status_history_record_ids[0].status != vals.get('status_id'):
            now = datetime.today()
            for status_history in status_history_record_ids:
                status_history.end_date = datetime.strftime(now, DTF)
                time_diff = now - datetime.strptime(status_history.start_date, DTF)
                if time_diff:
                    # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
                    days = str(time_diff).split(',')
                    if len(days) > 1:
                        status_history.time_diff_day = str(days[0])
                        clock = str(days[1])
                    else:
                        clock = str(time_diff)
                    times = clock.split(':')
                    # Was ``times > 1``: comparing a list with an int raises
                    # TypeError on Python 3.
                    if len(times) > 2:
                        status_history.time_diff_hour = str(times[0])
                        status_history.time_diff_min = str(times[1])
                        status_history.time_diff_sec = str(times[2])
            publication_status = publication_status_obj.browse(vals.get('status_id'))
            status_history_vals = {
                'code_system_id': self.id,
                'status': publication_status.name,
                'start_date': now,
            }
            status_history_obj.create(status_history_vals)
    return res
def _compute_name(self):
    """Compute the display name of every research subject in ``self``.

    The name joins, with ", ", the identifier name, individual name, study
    name and period start date, for whichever of those are set. A record
    with none gets ``'/'``.
    """
    for hc_res_research_subject in self:
        # Reset per record so an empty record does not inherit the name
        # computed for the previous record.
        comp_name = '/'
        if hc_res_research_subject.identifier_id:
            comp_name = hc_res_research_subject.identifier_id.name or ''
        # The fallbacks below are parenthesised so they apply to the appended
        # name only; ``a + b or ''`` never reached the fallback.
        if hc_res_research_subject.individual_id:
            comp_name = comp_name + ", " + (hc_res_research_subject.individual_id.name or '')
        if hc_res_research_subject.study_id:
            comp_name = comp_name + ", " + (hc_res_research_subject.study_id.name or '')
        if hc_res_research_subject.period_start_date:
            comp_name = comp_name + ", " + datetime.strftime(
                datetime.strptime(hc_res_research_subject.period_start_date, DTF), "%Y-%m-%d")
        hc_res_research_subject.name = comp_name
def write(self, vals):
    """Write ``vals`` and maintain the status history on a status change.

    When ``vals`` carries a status that differs from the first open history
    record (one with no end date), every open record is closed with the
    current time and its elapsed-time fields, and a new history record is
    created for the new status.
    """
    status_history_obj = self.env['hc.research.subject.status.history']
    res = super(ResearchSubject, self).write(vals)
    status_history_record_ids = status_history_obj.search([('end_date', '=', False)])
    if status_history_record_ids:
        if vals.get('status') and status_history_record_ids[0].status != vals.get('status'):
            now = datetime.today()
            for status_history in status_history_record_ids:
                status_history.end_date = datetime.strftime(now, DTF)
                time_diff = now - datetime.strptime(status_history.start_date, DTF)
                if time_diff:
                    # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
                    days = str(time_diff).split(',')
                    if len(days) > 1:
                        status_history.time_diff_day = str(days[0])
                        clock = str(days[1])
                    else:
                        clock = str(time_diff)
                    times = clock.split(':')
                    # Was ``times > 1``: comparing a list with an int raises
                    # TypeError on Python 3.
                    if len(times) > 2:
                        status_history.time_diff_hour = str(times[0])
                        status_history.time_diff_min = str(times[1])
                        status_history.time_diff_sec = str(times[2])
            status_history_vals = {
                'research_subject_id': self.id,
                'status': vals.get('status'),
                'start_date': now,
            }
            status_history_obj.create(status_history_vals)
    return res
def write(self, vals):
    """Write ``vals`` and maintain the status history on a status change.

    When ``vals`` carries a status that differs from the first open history
    record (one with no end date), every open record is closed with the
    current time and its elapsed-time fields, and a new history record is
    created for the new status.
    """
    status_history_obj = self.env['hc.appointment.participant.status.history']
    res = super(AppointmentParticipant, self).write(vals)
    status_history_record_ids = status_history_obj.search([('end_date', '=', False)])
    if status_history_record_ids:
        if vals.get('status') and status_history_record_ids[0].status != vals.get('status'):
            now = datetime.today()
            for status_history in status_history_record_ids:
                status_history.end_date = datetime.strftime(now, DTF)
                time_diff = now - datetime.strptime(status_history.start_date, DTF)
                if time_diff:
                    # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
                    days = str(time_diff).split(',')
                    if len(days) > 1:
                        status_history.time_diff_day = str(days[0])
                        clock = str(days[1])
                    else:
                        clock = str(time_diff)
                    times = clock.split(':')
                    # Was ``times > 1``: comparing a list with an int raises
                    # TypeError on Python 3.
                    if len(times) > 2:
                        status_history.time_diff_hour = str(times[0])
                        status_history.time_diff_min = str(times[1])
                        status_history.time_diff_sec = str(times[2])
            status_history_vals = {
                'participant_id': self.id,
                'status': vals.get('status'),
                'start_date': now,
            }
            status_history_obj.create(status_history_vals)
    return res
def write(self, vals):
    """Write ``vals`` and maintain the status history on a status change.

    When ``vals`` carries a ``status_id`` that differs from the status of
    the first open history record (one with no end date), every open record
    is closed with the current time and its elapsed-time fields, and a new
    history record is created with the status name.
    """
    status_history_obj = self.env['hc.eligibility.request.status.history']
    fm_status_obj = self.env['hc.vs.fm.status']
    res = super(EligibilityRequest, self).write(vals)
    status_history_record_ids = status_history_obj.search([('end_date', '=', False)])
    if status_history_record_ids:
        if vals.get('status_id') and status_history_record_ids[0].status != vals.get('status_id'):
            now = datetime.today()
            for status_history in status_history_record_ids:
                status_history.end_date = datetime.strftime(now, DTF)
                time_diff = now - datetime.strptime(status_history.start_date, DTF)
                if time_diff:
                    # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
                    days = str(time_diff).split(',')
                    if len(days) > 1:
                        status_history.time_diff_day = str(days[0])
                        clock = str(days[1])
                    else:
                        clock = str(time_diff)
                    times = clock.split(':')
                    # Was ``times > 1``: comparing a list with an int raises
                    # TypeError on Python 3.
                    if len(times) > 2:
                        status_history.time_diff_hour = str(times[0])
                        status_history.time_diff_min = str(times[1])
                        status_history.time_diff_sec = str(times[2])
            fm_status = fm_status_obj.browse(vals.get('status_id'))
            status_history_vals = {
                'eligibility_request_id': self.id,
                'status': fm_status.name,
                'start_date': now,
            }
            # NOTE(review): this compares the raw ``status_id`` value with a
            # status code string - confirm that is the intended comparison.
            if vals.get('status_id') == 'entered-in-error':
                # Was ``status_id_history_vals``, an undefined name.
                status_history_vals.update({'end_date': now})
            status_history_obj.create(status_history_vals)
    return res
def _compute_name(self):
    """Compute the display name of every detected issue in ``self``.

    The name is built from the patient (name plus birth date), the category
    name and the issue date. A record with none of these gets ``'/'``.
    """
    for hc_res_detected_issue in self:
        # Reset per record so a record without a patient does not inherit
        # the name computed for the previous record.
        comp_name = '/'
        if hc_res_detected_issue.patient_id:
            patient = hc_res_detected_issue.patient_id
            comp_name = patient.name
            if patient.birth_date:
                patient_birth_date = datetime.strftime(
                    datetime.strptime(patient.birth_date, DF), "%Y-%m-%d")
                comp_name = comp_name + "(" + patient_birth_date + "),"
        if hc_res_detected_issue.category_id:
            # Parenthesised so the '' fallback applies to the name only.
            comp_name = comp_name + " " + (hc_res_detected_issue.category_id.name or '') + ","
        if hc_res_detected_issue.date:
            patient_date = datetime.strftime(
                datetime.strptime(hc_res_detected_issue.date, DTF), "%Y-%m-%d")
            comp_name = comp_name + " " + patient_date
        hc_res_detected_issue.name = comp_name
def write(self, vals):
    """Write ``vals`` and maintain the activity definition status history.

    The parent ``write`` is always called first. When ``vals`` carries a
    ``status_id`` and open history rows (rows without ``end_date``) exist,
    each open row is closed and stamped with the time elapsed since its
    ``start_date``, and a new history row is created for the new status.

    :param vals: field values being written.
    :return: the value returned by the parent ``write``.
    """
    status_history_obj = self.env['hc.activity.definition.status.history']
    publication_status_obj = self.env['hc.vs.publication.status']
    res = super(ActivityDefinition, self).write(vals)

    new_status_id = vals.get('status_id')
    if not new_status_id:
        # No status in this write: the history is left untouched.
        return res

    # NOTE(review): the domain filters on end_date only, so it matches open
    # history rows of every activity definition -- confirm intended.
    open_histories = status_history_obj.search([('end_date', '=', False)])
    # NOTE(review): ``status`` is stored from publication_status.name below,
    # while new_status_id is what gets passed to browse(); this looks like a
    # name-vs-id comparison -- confirm.
    if not open_histories or open_histories[0].status == new_status_id:
        return res

    # One timestamp for the whole operation keeps end/start dates consistent.
    now = datetime.today()
    for status_history in open_histories:
        status_history.end_date = datetime.strftime(now, DTF)
        elapsed = now - datetime.strptime(status_history.start_date, DTF)
        if not elapsed:
            continue
        # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
        day_part, comma, clock_part = str(elapsed).partition(',')
        if comma:
            status_history.time_diff_day = day_part
        else:
            clock_part = day_part
        times = clock_part.split(':')
        # Length check instead of comparing the list itself with an int
        # (TypeError on Python 3); three parts are indexed below.
        if len(times) > 2:
            status_history.time_diff_hour = str(times[0])
            status_history.time_diff_min = str(times[1])
            status_history.time_diff_sec = str(times[2])

    publication_status = publication_status_obj.browse(new_status_id)
    status_history_obj.create({
        'activity_definition_id': self.id,
        'status': publication_status.name,
        'start_date': now,
    })
    return res
def write(self, vals):
    """Write ``vals`` and maintain the payment notice status history.

    The parent ``write`` is always called first. When ``vals`` carries a
    ``status_id`` and open history rows (rows without ``end_date``) exist,
    each open row is closed and stamped with the time elapsed since its
    ``start_date``, and a new history row is created for the new status.

    :param vals: field values being written.
    :return: the value returned by the parent ``write``.
    """
    status_history_obj = self.env['hc.payment.notice.status.history']
    fm_status_obj = self.env['hc.vs.fm.status']
    res = super(PaymentNotice, self).write(vals)

    new_status_id = vals.get('status_id')
    if not new_status_id:
        # No status in this write: the history is left untouched.
        return res

    # NOTE(review): the domain filters on end_date only, so it matches open
    # history rows of every payment notice -- confirm intended.
    open_histories = status_history_obj.search([('end_date', '=', False)])
    # NOTE(review): ``status`` is stored from fm_status.name below, while
    # new_status_id is what gets passed to browse(); this looks like a
    # name-vs-id comparison -- confirm.
    if not open_histories or open_histories[0].status == new_status_id:
        return res

    # One timestamp for the whole operation keeps end/start dates consistent.
    now = datetime.today()
    for status_history in open_histories:
        status_history.end_date = datetime.strftime(now, DTF)
        elapsed = now - datetime.strptime(status_history.start_date, DTF)
        if not elapsed:
            continue
        # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
        day_part, comma, clock_part = str(elapsed).partition(',')
        if comma:
            status_history.time_diff_day = day_part
        else:
            clock_part = day_part
        times = clock_part.split(':')
        # Length check instead of comparing the list itself with an int
        # (TypeError on Python 3); three parts are indexed below.
        if len(times) > 2:
            status_history.time_diff_hour = str(times[0])
            status_history.time_diff_min = str(times[1])
            status_history.time_diff_sec = str(times[2])

    fm_status = fm_status_obj.browse(new_status_id)
    status_history_vals = {
        'payment_notice_id': self.id,
        'status': fm_status.name,
        'start_date': now,
    }
    # NOTE(review): new_status_id is used as a record id above, so this
    # string comparison presumably never matches -- confirm which field
    # should be tested.
    if new_status_id == 'entered-in-error':
        status_history_vals['end_date'] = now
    status_history_obj.create(status_history_vals)
    return res
def _compute_unique_person(self): comp_unique_person = '/' for hc_person in self: if hc_person.name_id: comp_unique_person = hc_person.name_id.name or '' if hc_person.gender: comp_unique_person = comp_unique_person + ", " + hc_person.gender or '' if hc_person.birth_date: comp_unique_person = comp_unique_person + ", " + datetime.strftime(datetime.strptime(hc_person.birth_date, DF), "%Y-%m-%d") hc_person.unique_person = comp_unique_person
def write(self, vals):
    """Write ``vals`` and maintain the value set status history.

    The parent ``write`` is always called first. When ``vals`` carries a
    ``status`` that differs from the first open history row (rows without
    ``end_date``), each open row is closed and stamped with the time elapsed
    since its ``start_date``, and a new history row is created for the new
    status.

    :param vals: field values being written.
    :return: the value returned by the parent ``write``.
    """
    status_history_obj = self.env['hc.value.set.status.history']
    res = super(ValueSet, self).write(vals)

    new_status = vals.get('status')
    if not new_status:
        # No status in this write: the history is left untouched.
        return res

    # NOTE(review): the domain filters on end_date only, so it matches open
    # history rows of every value set -- confirm intended.
    open_histories = status_history_obj.search([('end_date', '=', False)])
    if not open_histories or open_histories[0].status == new_status:
        return res

    # One timestamp for the whole operation keeps end/start dates consistent.
    now = datetime.today()
    for status_history in open_histories:
        status_history.end_date = datetime.strftime(now, DTF)
        elapsed = now - datetime.strptime(status_history.start_date, DTF)
        if not elapsed:
            continue
        # str(timedelta) reads "[N day[s], ]H:MM:SS[.ffffff]".
        day_part, comma, clock_part = str(elapsed).partition(',')
        if comma:
            status_history.time_diff_day = day_part
        else:
            clock_part = day_part
        times = clock_part.split(':')
        # Length check instead of comparing the list itself with an int
        # (TypeError on Python 3); three parts are indexed below.
        if len(times) > 2:
            status_history.time_diff_hour = str(times[0])
            status_history.time_diff_min = str(times[1])
            status_history.time_diff_sec = str(times[2])

    status_history_obj.create({
        'value_set_id': self.id,
        'status': new_status,
        'start_date': now,
    })
    return res