We extracted the following 43 code examples from open-source Python projects to illustrate how to use datetime.datetime().
def default(self, obj):
    """Serialize `obj` to a JSON-compatible value.

    Tries, in order: an object-provided ``__json__`` hook, iterables
    (returned as lists — note this catches dicts too, yielding their
    keys), datetimes (ISO format), mapping-like objects, and finally a
    dict of the object's public, non-callable attributes.

    :raises TypeError: via the base class for unsupported objects
    """
    # collections.Iterable was removed in Python 3.10; the ABC lives in
    # collections.abc since 3.3.
    from collections.abc import Iterable
    if hasattr(obj, '__json__'):
        return obj.__json__()
    elif isinstance(obj, Iterable):
        return list(obj)
    elif isinstance(obj, datetime):
        return obj.isoformat()
    elif hasattr(obj, '__getitem__') and hasattr(obj, 'keys'):
        # Mapping-like object that is not iterable.
        return dict(obj)
    elif hasattr(obj, '__dict__'):
        # Generic object: expose public, non-callable attributes only.
        return {member: getattr(obj, member)
                for member in dir(obj)
                if not member.startswith('_')
                and not hasattr(getattr(obj, member), '__call__')}
    return json.JSONEncoder.default(self, obj)
def get_response_and_time(self, key, default=(None, None)):
    """Return the cached ``(response, timestamp)`` pair for `key`.

    Falls back to the alias map when `key` is not a primary key, and
    returns `default` when the key is unknown entirely.

    :param key: key of resource
    :param default: return this if `key` not found in cache
    :returns: tuple (response, datetime)

    .. note:: Response is restored after unpickling with
              :meth:`restore_response`
    """
    try:
        actual_key = key if key in self.responses else self.keys_map[key]
        response, timestamp = self.responses[actual_key]
    except KeyError:
        return default
    return self.restore_response(response), timestamp
def setUpTestData(cls):
    """Create the shared content fixtures for the test class.

    Builds one Content object per visibility level plus a
    remote-authored public item, and records the id list / object set
    used by the assertions.
    """
    super().setUpTestData()
    cls.create_local_and_remote_user()
    cls.user2 = AnonymousUser()
    cls.local_user = UserFactory()
    cls.public_content = ContentFactory(
        visibility=Visibility.PUBLIC, text="**Foobar**", author=cls.profile,
    )
    cls.site_content = ContentFactory(
        visibility=Visibility.SITE, text="_Foobar_"
    )
    cls.limited_content = ContentFactory(visibility=Visibility.LIMITED)
    cls.self_content = ContentFactory(visibility=Visibility.SELF)
    # Remote content carries an explicit timezone-aware remote_created.
    cls.remote_content = ContentFactory(
        visibility=Visibility.PUBLIC,
        remote_created=make_aware(datetime.datetime(2015, 1, 1)),
        author=cls.remote_profile,
    )
    cls.ids = [
        cls.public_content.id, cls.site_content.id,
        cls.limited_content.id, cls.self_content.id
    ]
    cls.set = {
        cls.public_content, cls.site_content,
        cls.limited_content, cls.self_content
    }
def test_am_pm_behaviour(self):
    # A check time 12 hours after the last processed time (the same
    # clock position in the opposite AM/PM half) must trigger a run.
    check_time = datetime.datetime(
        year=2016, month=11, day=7, hour=22, minute=10, second=0,
        microsecond=1)
    PreHourlyProcessorUtil.get_data_provider().set_last_processed(
        date_time=(check_time + datetime.timedelta(hours=-12)))
    self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
def get_time(cls, request, time_attr):
    """Extract the time-of-day from the given request as a timedelta.

    :param request: the request object
    :param time_attr: the attribute name
    :return: datetime.timedelta built from the hour and minute parts
        (seconds are intentionally dropped)
    """
    raw = request.params[time_attr][:-4]  # strip the trailing " GMT"
    parsed = datetime.datetime.strptime(raw, '%a, %d %b %Y %H:%M:%S')
    return datetime.timedelta(hours=parsed.hour, minutes=parsed.minute)
def get_datetime(cls, request, date_attr, time_attr):
    """Extract a UTC datetime by combining two request parameters.

    :param request: the request object
    :param date_attr: name of the parameter holding the date part
    :param time_attr: name of the parameter holding the time part
    :return: datetime.datetime carrying the calendar date of
        `date_attr` and the clock time of `time_attr`
    """
    fmt = '%a, %d %b %Y %H:%M:%S'
    # Both parameters carry a trailing " GMT" that strptime can't parse.
    date_part = datetime.datetime.strptime(request.params[date_attr][:-4], fmt)
    time_part = datetime.datetime.strptime(request.params[time_attr][:-4], fmt)
    # Keep the calendar date, overwrite the clock fields.
    return date_part.replace(
        hour=time_part.hour,
        minute=time_part.minute,
        second=time_part.second,
        microsecond=time_part.microsecond,
    )
def coerce_to_dtype(dtype, value):
    """Make a value with the specified numpy dtype.

    Only datetime64[ns] and datetime64[D] are supported for datetime
    dtypes; any other datetime64 resolution raises TypeError.
    """
    name = dtype.name
    if not name.startswith('datetime64'):
        # Plain scalar dtypes coerce directly.
        return dtype.type(value)
    if name == 'datetime64[D]':
        return make_datetime64D(value)
    if name == 'datetime64[ns]':
        return make_datetime64ns(value)
    raise TypeError(
        "Don't know how to coerce values of dtype %s" % dtype
    )
def get_open_and_close(day, early_closes):
    """Return (market_open, market_close) as UTC Timestamps for `day`.

    The session opens 9:31 US/Eastern and closes at 13:00 on
    early-close days, 16:00 otherwise.
    """
    eastern = 'US/Eastern'
    market_open = pd.Timestamp(
        datetime(year=day.year, month=day.month, day=day.day,
                 hour=9, minute=31),
        tz=eastern,
    ).tz_convert('UTC')
    # 1 PM if early close, 4 PM otherwise
    close_hour = 13 if day in early_closes else 16
    market_close = pd.Timestamp(
        datetime(year=day.year, month=day.month, day=day.day,
                 hour=close_hour),
        tz=eastern,
    ).tz_convert('UTC')
    return market_open, market_close
def create_test_panel_ohlc_source(sim_params, env):
    """Build a DataPanelSource of synthetic OHLC data over the
    simulation range (or a fixed Jan 1990 window when `sim_params` is
    None).

    :returns: tuple (DataPanelSource, pd.Panel)

    NOTE(review): pd.Panel and pd.datetime were removed in pandas 1.0;
    this helper requires an older pandas.
    """
    start = sim_params.first_open \
        if sim_params else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
    end = sim_params.last_close \
        if sim_params else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
    index = env.days_in_range(start, end)
    # Deterministic price ramp; open jitters around price by +/-0.05.
    price = np.arange(0, len(index)) + 100
    high = price * 1.05
    low = price * 0.95
    open_ = price + .1 * (price % 2 - .5)
    volume = np.ones(len(index)) * 1000
    arbitrary = np.ones(len(index))
    df = pd.DataFrame({'price': price,
                       'high': high,
                       'low': low,
                       'open': open_,
                       'volume': volume,
                       'arbitrary': arbitrary},
                      index=index)
    panel = pd.Panel.from_dict({0: df})
    return DataPanelSource(panel), panel
def get_open_and_close(day, early_closes):
    """Return (market_open, market_close) as UTC Timestamps for a
    Bovespa session.

    The only "early close" event in Bovespa actually is a late start,
    as the market only opens at 1 pm on those days (`quarta_cinzas`,
    i.e. Ash Wednesday).
    """
    sao_paulo = 'America/Sao_Paulo'
    open_hour = 13 if day in quarta_cinzas else 10
    market_open = pd.Timestamp(
        datetime(year=day.year, month=day.month, day=day.day,
                 hour=open_hour, minute=0),
        tz=sao_paulo,
    ).tz_convert('UTC')
    market_close = pd.Timestamp(
        datetime(year=day.year, month=day.month, day=day.day, hour=16),
        tz=sao_paulo,
    ).tz_convert('UTC')
    return market_open, market_close
def _coerce_datetime(maybe_dt): if isinstance(maybe_dt, datetime.datetime): return maybe_dt elif isinstance(maybe_dt, datetime.date): return datetime.datetime( year=maybe_dt.year, month=maybe_dt.month, day=maybe_dt.day, tzinfo=pytz.utc, ) elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3: year, month, day = maybe_dt return datetime.datetime( year=year, month=month, day=day, tzinfo=pytz.utc, ) else: raise TypeError('Cannot coerce %s into a datetime.datetime' % type(maybe_dt).__name__)
def get_open_and_close(day, early_closes): market_open = pd.Timestamp( datetime( year=day.year, month=day.month, day=day.day, hour=9, minute=31), tz='US/Eastern').tz_convert('UTC') # 1 PM if early close, 4 PM otherwise close_hour = 13 if day in early_closes else 16 market_close = pd.Timestamp( datetime( year=day.year, month=day.month, day=day.day, hour=close_hour), tz='Asia/Shanghai').tz_convert('UTC') return market_open, market_close
def test_generator_dates(self):
    """
    Ensure the pipeline of generators are in sync, at least as far
    as their current dates.
    """
    sim_params = factory.create_simulation_parameters(
        start=datetime(2011, 7, 30, tzinfo=pytz.utc),
        end=datetime(2012, 7, 30, tzinfo=pytz.utc),
        env=self.env,
    )
    algo = TestAlgo(self, sim_params=sim_params, env=self.env)
    trade_source = factory.create_daily_trade_source(
        [8229],
        sim_params,
        env=self.env,
    )
    algo.set_sources([trade_source])
    gen = algo.get_generator()
    # Drain the generator; a non-empty result means the pipeline ran.
    self.assertTrue(list(gen))
    self.assertTrue(algo.slippage.latest_date)
    self.assertTrue(algo.latest_date)
def test_progress(self):
    """
    Ensure the pipeline of generators are in sync, at least as far
    as their current dates.
    """
    sim_params = factory.create_simulation_parameters(
        start=datetime(2008, 1, 1, tzinfo=pytz.utc),
        end=datetime(2008, 1, 5, tzinfo=pytz.utc),
        env=self.env,
    )
    algo = TestAlgo(self, sim_params=sim_params, env=self.env)
    trade_source = factory.create_daily_trade_source(
        [8229],
        sim_params,
        env=self.env,
    )
    algo.set_sources([trade_source])
    gen = algo.get_generator()
    results = list(gen)
    # The penultimate result should report the simulation as complete.
    self.assertEqual(results[-2]['progress'], 1.0)
def test_checks_should_trigger(self):
    """Every registered rule's should_trigger must be consulted once
    per handle_data call."""
    class CountingRule(Always):
        # Class-level counter shared across all five instances.
        count = 0

        def should_trigger(self, dt, env):
            CountingRule.count += 1
            return True

    for r in [CountingRule] * 5:
        self.em.add_event(
            Event(r(), lambda context, data: None)
        )
    mock_algo_class = namedtuple('FakeAlgo', ['trading_environment'])
    mock_algo = mock_algo_class(trading_environment="fake_env")
    self.em.handle_data(mock_algo, None, datetime.datetime.now())
    self.assertEqual(CountingRule.count, 5)
def writeValue(self, value):
    """Serialize `value` as a plist XML element.

    Dispatches on the value's type: strings, booleans, integers,
    reals, dicts, Data blobs, datetimes, and tuples/lists.

    :raises TypeError: for any unsupported type

    NOTE(review): relies on the Python 2 builtins `unicode` and `long`;
    under Python 3 these names raise NameError.
    """
    if isinstance(value, (str, unicode)):
        self.simpleElement("string", value)
    elif isinstance(value, bool):
        # must switch for bool before int, as bool is a
        # subclass of int...
        if value:
            self.simpleElement("true")
        else:
            self.simpleElement("false")
    elif isinstance(value, (int, long)):
        self.simpleElement("integer", "%d" % value)
    elif isinstance(value, float):
        # repr() preserves full float precision.
        self.simpleElement("real", repr(value))
    elif isinstance(value, dict):
        self.writeDict(value)
    elif isinstance(value, Data):
        self.writeData(value)
    elif isinstance(value, datetime.datetime):
        self.simpleElement("date", _dateToString(value))
    elif isinstance(value, (tuple, list)):
        self.writeArray(value)
    else:
        # Fixed typo in the original message ("unsuported").
        raise TypeError("unsupported type: %s" % type(value))
def make_comparable(self, other):
    """Normalize self and `other` into a comparable pair (s, o).

    Supports DateTime instances, datetime.datetime, strings, and any
    object exposing timetuple(); raises TypeError for anything else.

    NOTE(review): relies on the Python 2 builtin `unicode`.
    """
    if isinstance(other, DateTime):
        s = self.value
        o = other.value
    elif datetime and isinstance(other, datetime.datetime):
        # `datetime` is truth-tested first — presumably falsy when the
        # module was not importable; verify against module top.
        s = self.value
        o = other.strftime("%Y%m%dT%H:%M:%S")
    elif isinstance(other, (str, unicode)):
        s = self.value
        o = other
    elif hasattr(other, "timetuple"):
        s = self.timetuple()
        o = other.timetuple()
    else:
        otype = (hasattr(other, "__class__")
                 and other.__class__.__name__
                 or type(other))
        raise TypeError("Can't compare %s and %s" %
                        (self.__class__.__name__, otype))
    return s, o
def get_response(self, key, default=None):
    """Retrieve the stored response for `key`.

    Falls back to the alias map when `key` is not a primary key.

    :param key: key of resource
    :param default: return this if `key` not found in cache
    :returns: the stored response, or `default` when missing

    (Docstring corrected: unlike get_response_and_time, this returns
    only the response, not a (response, datetime) tuple.)
    """
    try:
        if key not in self.responses:
            key = self.keys_map[key]
        response = self.responses[key]
    except KeyError:
        return default
    return response
def test_activity_sets_sla_triaged_at():
    """sla_triaged_at is set by the first triage-type activity and is
    never overwritten by later ones."""
    r = new_report()
    r.save()
    assert r.sla_triaged_at is None
    # An activity that shouldn't update sla_triaged_at
    d1 = now()
    r.activities.create(id=1, type='activity-comment', created_at=d1)
    assert r.sla_triaged_at is None
    # And now one that should
    d2 = d1 + datetime.timedelta(hours=3)
    r.activities.create(id=2, type='activity-bug-not-applicable', created_at=d2)
    assert r.sla_triaged_at == d2
    # And now another activity that would update the date, if it wasn't already set
    d3 = d2 + datetime.timedelta(hours=3)
    r.activities.create(id=3, type='activity-bug-resolved', created_at=d3)
    assert r.sla_triaged_at == d2
def _load_date(val): microsecond = 0 tz = None try: if len(val) > 19: if val[19] == '.': microsecond = int(val[20:26]) if len(val) > 26: tz = TomlTz(val[26:32]) else: tz = TomlTz(val[19:25]) except ValueError: tz = None try: d = datetime.datetime(int(val[:4]), int(val[5:7]), int(val[8:10]), int(val[11:13]), int(val[14:16]), int(val[17:19]), microsecond, tz) except ValueError: return None return d
def setUp(self):
    """Stub boto3's Application Auto Scaling client with one scalable
    target and build the ApplicationAutoscaling object under test."""
    aws_data = {
        'ScalableTargets': [
            {
                'ServiceNamespace': 'ecs',
                'ResourceId': 'service/my_cluster/my_service',
                'ScalableDimension': 'ecs:service:DesiredCount',
                'MinCapacity': 1,
                'MaxCapacity': 3,
                'RoleARN': 'my_role_arn',
                'CreationTime': datetime(2017, 4, 14)
            }
        ],
        'NextToken': None
    }
    init = Mock(return_value=None)
    init.return_value = None
    appscaling_client = Mock()
    appscaling_client.describe_scalable_targets = Mock(return_value=aws_data)
    client = Mock(return_value=appscaling_client)
    with Replacer() as r:
        # Patch boto3.client and ScalingPolicy.__init__ only while
        # constructing the object under test.
        r.replace('boto3.client', client)
        r.replace('deployfish.aws.appscaling.ScalingPolicy.__init__', init)
        self.appscaling = ApplicationAutoscaling('my_service', 'my_cluster')
def setUp(self):
    """Stub boto3's Application Auto Scaling client and build an
    ApplicationAutoscaling constructed directly from `aws` data.

    The describe mock is kept on ``self`` so tests can assert whether
    AWS was (not) queried.
    """
    aws_data = {
        'ServiceNamespace': 'ecs',
        'ResourceId': 'service/my_cluster/my_service',
        'ScalableDimension': 'ecs:service:DesiredCount',
        'MinCapacity': 1,
        'MaxCapacity': 3,
        'RoleARN': 'my_role_arn',
        'CreationTime': datetime(2017, 4, 14)
    }
    init = Mock(return_value=None)
    init.return_value = None
    appscaling_client = Mock()
    self.describe_scaling_targets = Mock(return_value=aws_data)
    appscaling_client.describe_scalable_targets = self.describe_scaling_targets
    client = Mock(return_value=appscaling_client)
    with Replacer() as r:
        r.replace('boto3.client', client)
        r.replace('deployfish.aws.appscaling.ScalingPolicy.__init__', init)
        self.appscaling = ApplicationAutoscaling('my_service', 'my_cluster',
                                                 aws=aws_data)
def test_edited_is_false_for_newly_created_content_within_15_minutes_grace_period(self):
    # Saving again within the 15-minute grace period must not mark the
    # content as edited.
    with freeze_time(self.public_content.created + datetime.timedelta(minutes=14)):
        self.public_content.save()
        self.assertFalse(self.public_content.edited)
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
    # A save after the 15-minute grace period marks the content edited.
    with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
        self.public_content.save()
        self.assertTrue(self.public_content.edited)
def test_dict_for_view_edited_post(self):
    """dict_for_view() of an edited post appends "(edited)" to the
    humanized timestamp; all other fields are as for a fresh post."""
    # Save after the 15-minute grace period so the post counts as edited.
    with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
        self.public_content.save()
        self.assertEqual(self.public_content.dict_for_view(self.user), {
            "author": self.public_content.author_id,
            "author_guid": self.public_content.author.guid,
            "author_handle": self.public_content.author.handle,
            "author_home_url": self.public_content.author.home_url,
            "author_image": self.public_content.author.safer_image_url_small,
            "author_is_local": bool(self.public_content.author.user),
            "author_name": self.public_content.author.handle,
            "author_profile_url": self.public_content.author.get_absolute_url(),
            "content_type": self.public_content.content_type.string_value,
            "delete_url": reverse("content:delete", kwargs={"pk": self.public_content.id}),
            "detail_url": self.public_content.get_absolute_url(),
            "formatted_timestamp": self.public_content.timestamp,
            "guid": self.public_content.guid,
            "has_shared": False,
            "humanized_timestamp": "%s (edited)" % self.public_content.humanized_timestamp,
            "id": self.public_content.id,
            "is_authenticated": True,
            "is_author": True,
            "is_following_author": False,
            "parent": "",
            "profile_id": self.public_content.author.id,
            "rendered": self.public_content.rendered,
            "reply_count": 0,
            "reply_url": reverse("content:reply", kwargs={"pk": self.public_content.id}),
            "shares_count": 0,
            "slug": self.public_content.slug,
            "through": self.public_content.id,
            "update_url": reverse("content:update", kwargs={"pk": self.public_content.id}),
        })
def test_is_time_to_run_before_late_metric_slack_time(self):
    # One second before the slack window after the hour has elapsed:
    # must not run yet.
    check_time = datetime.datetime(
        year=2016, month=11, day=7, hour=11, minute=9, second=59,
        microsecond=0)
    PreHourlyProcessorUtil.get_data_provider().set_last_processed(
        date_time=(check_time + datetime.timedelta(hours=-1)))
    self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_is_time_to_run_after_late_metric_slack_time(self):
    # Just past the slack window after the hour: must run.
    check_time = datetime.datetime(
        year=2016, month=11, day=7, hour=11, minute=10, second=0,
        microsecond=1)
    PreHourlyProcessorUtil.get_data_provider().set_last_processed(
        date_time=(check_time + datetime.timedelta(hours=-1)))
    self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_is_time_to_run_with_already_done_this_hour(self):
    # Last processed exactly at the check time: already handled this
    # hour, so no run.
    check_time = datetime.datetime(
        year=2016, month=11, day=7, hour=11, minute=30, second=0,
        microsecond=0)
    PreHourlyProcessorUtil.get_data_provider().set_last_processed(
        date_time=check_time)
    self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_is_time_to_run_after_midnight_but_before_late_metric_slack_time(
        self):
    # Shortly after midnight but still inside the slack window: the
    # day rollover alone must not trigger a run.
    check_time = datetime.datetime(
        year=2016, month=11, day=7, hour=0, minute=5, second=0,
        microsecond=0)
    PreHourlyProcessorUtil.get_data_provider().set_last_processed(
        date_time=(check_time + datetime.timedelta(hours=-1)))
    self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_is_time_to_run_after_midnight_and_after_late_metric_slack_time(
        self):
    # After midnight and past the slack window: must run.
    check_time = datetime.datetime(
        year=2016, month=11, day=7, hour=0, minute=10, second=0,
        microsecond=1)
    PreHourlyProcessorUtil.get_data_provider().set_last_processed(
        date_time=(check_time + datetime.timedelta(hours=-1)))
    self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_same_time_different_day_behaviour(self):
    # Last processed a whole day earlier at the same clock time: must
    # still trigger a run.
    check_time = datetime.datetime(
        year=2016, month=11, day=7, hour=22, minute=10, second=0,
        microsecond=1)
    PreHourlyProcessorUtil.get_data_provider().set_last_processed(
        date_time=(check_time + datetime.timedelta(days=-1)))
    self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
def test_crontab():
    """Crontab matching: wildcard entries, single-value specs, list and
    negative (step) specs, and timestamp-based lookup."""
    c = Crontab()
    c.add('boo')
    c.add('foo', 0)
    c.add('bar', [1, 3], -5, -1, -1, 0)
    assert c.actions(0, 1, 1, 1, 1) == {'boo', 'foo'}
    assert c.actions(1, 1, 1, 1, 1) == {'boo'}
    assert c.actions(1, 5, 1, 1, 7) == {'boo', 'bar'}
    assert c.actions(3, 5, 1, 1, 7) == {'boo', 'bar'}
    # actions_ts must decompose a unix timestamp the same way.
    ts = mktime(datetime(2016, 1, 17, 5, 1).timetuple())
    assert c.actions_ts(ts) == {'boo', 'bar'}
def _get_extensions(self):
    """Load the wheel's EXTENSIONS manifest and extract the listed
    native extensions into the dylib cache.

    A cached file is re-extracted only when the archive entry is newer
    than the file already on disk.

    :returns: list of (extension name, extracted path) tuples; empty
        when the wheel has no EXTENSIONS entry
    """
    pathname = os.path.join(self.dirname, self.filename)
    name_ver = '%s-%s' % (self.name, self.version)
    info_dir = '%s.dist-info' % name_ver
    arcname = posixpath.join(info_dir, 'EXTENSIONS')
    wrapper = codecs.getreader('utf-8')
    result = []
    with ZipFile(pathname, 'r') as zf:
        try:
            with zf.open(arcname) as bf:
                # EXTENSIONS is a JSON mapping of name -> archive path.
                wf = wrapper(bf)
                extensions = json.load(wf)
                cache = self._get_dylib_cache()
                prefix = cache.prefix_to_dir(pathname)
                cache_base = os.path.join(cache.base, prefix)
                if not os.path.isdir(cache_base):
                    os.makedirs(cache_base)
                for name, relpath in extensions.items():
                    dest = os.path.join(cache_base, convert_path(relpath))
                    if not os.path.exists(dest):
                        extract = True
                    else:
                        # Compare mtimes: re-extract only when the
                        # archive copy is newer than the cached file.
                        file_time = os.stat(dest).st_mtime
                        file_time = datetime.datetime.fromtimestamp(file_time)
                        info = zf.getinfo(relpath)
                        wheel_time = datetime.datetime(*info.date_time)
                        extract = wheel_time > file_time
                    if extract:
                        zf.extract(relpath, cache_base)
                    result.append((name, dest))
        except KeyError:
            # No EXTENSIONS entry in this wheel: nothing to do.
            pass
    return result
def _get_x509_days_left(x509): date_fmt = '%Y%m%d%H%M%SZ' current_datetime = datetime.datetime.utcnow() not_after = time.strptime(x509.get_notAfter(), date_fmt) not_before = time.strptime(x509.get_notBefore(), date_fmt) ret = {'not_after': (datetime.datetime(*not_after[:6]) - current_datetime).days, 'not_before': (datetime.datetime(*not_before[:6]) - current_datetime).days} return ret
def time_combined(year, month, day, hour, minute, meridiem):
    """Build a datetime from a 12-hour clock reading.

    Conversion rules for the hour:
      * 12:** AM -> 0
      * 1-11 AM  -> unchanged
      * 12:** PM -> unchanged
      * 1-11 PM  -> hour + 12
    """
    if meridiem == "AM":
        # Midnight hour: 12 AM is hour 0.
        if hour == 12:
            hour = 0
    elif hour < 12:
        # Afternoon/evening: shift into 24-hour range (12 PM stays 12).
        hour = hour + 12
    # Create the final start/end date fields
    return datetime.datetime(year, month, day, hour, minute)
def parse_neuralynx_time_string(self, time_string):
    """Parse a datetime from the idiosyncratic time string in Neuralynx
    file headers.

    Expects the date as the 5th whitespace-separated token (m/d/y) and
    the time as the last token (h:m:s.ms).

    :returns: datetime.datetime, or None (after emitting a warning)
        when the string cannot be parsed
    """
    try:
        tmp_date = [int(x) for x in time_string.split()[4].split('/')]
        tmp_time = [int(x) for x in
                    time_string.split()[-1].replace('.', ':').split(':')]
        tmp_microsecond = tmp_time[3] * 1000
    # Narrowed from a bare `except:`: only parsing/shape failures mean
    # a malformed header; anything unexpected should propagate.
    except (ValueError, IndexError, AttributeError):
        warnings.warn('Unable to parse time string from Neuralynx header: '
                      + time_string)
        return None
    else:
        return datetime.datetime(tmp_date[2], tmp_date[0], tmp_date[1],   # year, month, day
                                 tmp_time[0], tmp_time[1], tmp_time[2],   # hour, minute, second
                                 tmp_microsecond)
def test_get_vacations_view_is_working_properly(self):
    """testing if GET: /api/users/{id}/vacations view is working properly
    """
    from stalker import db, Vacation
    import datetime
    vac1 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 4, 24, 0, 0),
        end=datetime.datetime(2016, 4, 28, 0, 0)
    )
    vac2 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 7, 1, 0, 0),
        end=datetime.datetime(2016, 7, 8, 0, 0)
    )
    db.DBSession.add_all([vac1, vac2])
    db.DBSession.flush()
    import transaction
    transaction.commit()
    from stalker import User
    # Re-fetch the user after the commit (presumably the original
    # instance is detached from the session).
    user1 = User.query.filter(User.login == self.test_user1.login).first()
    response = self.test_app.get(
        '/api/users/%s/vacations' % self.test_user1.id
    )
    # NOTE(review): sorted() over lists of dicts only works on
    # Python 2; on Python 3 dicts are unorderable and this raises.
    self.assertEqual(
        sorted(response.json_body),
        sorted([
            {
                'id': v.id,
                '$ref': '/api/vacations/%s' % v.id,
                'name': v.name,
                'entity_type': v.entity_type
            } for v in [user1.vacations[0], user1.vacations[1]]
        ])
    )

# TASKS
def test_update_entity_is_working_properly(self): """testing if update_entity() method is working properly """ # create a time log import datetime start = datetime.datetime(2016, 7, 26, 16) end = datetime.datetime(2016, 7, 26, 17) new_end = datetime.datetime(2016, 7, 26, 18) from stalker import db, TimeLog db.DBSession.flush() db.DBSession.commit() t1 = TimeLog( task=self.test_task1, resource=self.test_user1, start=start, end=end, created_by=self.test_user2 ) db.DBSession.add(t1) db.DBSession.commit() from stalker_pyramid.testing import DummyRequest, DummyMultiDict request = DummyRequest() request.matchdict['id'] = t1.id request.params = DummyMultiDict() from stalker_pyramid.views import EntityViewBase request.params['end'] = \ EntityViewBase.milliseconds_since_epoch(new_end) self.patch_logged_in_user(request) time_log_view = time_log.TimeLogViews(request) response = time_log_view.update_entity() t1_db = TimeLog.query.filter(TimeLog.name == t1.name).first() self.assertEqual(t1_db.end, new_end)
def test_delete_entity_is_working_properly(self): """testing if delete_entity() method is working properly """ # create a time log import datetime start = datetime.datetime(2016, 7, 26, 16) end = datetime.datetime(2016, 7, 26, 17) from stalker import db, TimeLog db.DBSession.flush() db.DBSession.commit() t1 = TimeLog( task=self.test_task1, resource=self.test_user1, start=start, end=end, created_by=self.test_user2 ) db.DBSession.add(t1) db.DBSession.commit() from stalker_pyramid.testing import DummyRequest request = DummyRequest() request.matchdict['id'] = t1.id time_log_view = time_log.TimeLogViews(request) response = time_log_view.delete_entity() self.assertIsNone( TimeLog.query .filter(TimeLog.task == self.test_task1) .filter(TimeLog.resource == self.test_user1) .first() )
def test_update_entity_is_working_properly_with_post(self): """testing if POST: /api/time_logs/{id} view is working properly """ # create a time log import datetime start = datetime.datetime(2016, 7, 26, 16) end = datetime.datetime(2016, 7, 26, 17) new_end = datetime.datetime(2016, 7, 26, 18) from stalker import db, TimeLog db.DBSession.flush() db.DBSession.commit() t1 = TimeLog( task=self.test_task1, resource=self.test_user1, start=start, end=end, created_by=self.test_user2 ) db.DBSession.add(t1) db.DBSession.commit() from stalker_pyramid.views import EntityViewBase self.admin_login() response = self.test_app.post( '/api/time_logs/%s' % t1.id, params={ 'end': EntityViewBase.milliseconds_since_epoch(new_end) }, status=200 ) t1_db = TimeLog.query.filter(TimeLog.name == t1.name).first() self.assertEqual(t1_db.end, new_end)
def test_delete_entity_is_working_properly(self): """testing if DELETE: /api/time_logs/{id} view is working properly """ # create a time log import datetime start = datetime.datetime(2016, 7, 26, 16) end = datetime.datetime(2016, 7, 26, 17) from stalker import db, TimeLog db.DBSession.flush() db.DBSession.commit() t1 = TimeLog( task=self.test_task1, resource=self.test_user1, start=start, end=end, created_by=self.test_user2 ) db.DBSession.add(t1) db.DBSession.commit() response = self.test_app.delete( '/api/time_logs/%s' % t1.id, status=200 ) self.assertIsNone( TimeLog.query .filter(TimeLog.task == self.test_task1) .filter(TimeLog.resource == self.test_user1) .first() )
def test_create_entity_with_invalid_user_id(self):
    """testing if create_entity() method is working properly with
    invalid user_id parameter
    """
    import datetime
    start = datetime.datetime(2016, 4, 22, 10)
    end = datetime.datetime(2016, 4, 22, 16)
    from stalker_pyramid.testing import DummyRequest, DummyMultiDict
    from stalker_pyramid.views import EntityViewBase
    request = DummyRequest()
    request.params = DummyMultiDict()
    # -1 does not resolve to any User, so the view must reject it.
    request.params['user_id'] = -1
    request.params['start'] = \
        EntityViewBase.milliseconds_since_epoch(start)
    request.params['end'] = EntityViewBase.milliseconds_since_epoch(end)
    vacation_views = vacation.VacationViews(request)
    from pyramid.httpexceptions import HTTPServerError
    with self.assertRaises(HTTPServerError) as cm:
        vacation_views.create_entity()
    # NOTE(review): the message says "Missing" although user_id is
    # supplied but invalid — the view apparently treats an unresolvable
    # id the same as an absent one.
    self.assertEqual(
        str(cm.exception),
        'Missing "user_id" parameter'
    )