Python datetime 模块,datetime() 实例源码

我们从Python开源项目中,提取了以下43个代码示例,用于说明如何使用datetime.datetime()

项目:Easysentiment    作者:Jflick58    | 项目源码 | 文件源码
def default(self, obj):
        """default method."""
        if hasattr(obj, '__json__'):
            return obj.__json__()
        elif isinstance(obj, collections.Iterable):
            return list(obj)
        elif isinstance(obj, datetime):
            return obj.isoformat()
        elif hasattr(obj, '__getitem__') and hasattr(obj, 'keys'):
            return dict(obj)
        elif hasattr(obj, '__dict__'):
            return {member: getattr(obj, member)
                    for member in dir(obj)
                    if not member.startswith('_') and
                    not hasattr(getattr(obj, member), '__call__')}

        return json.JSONEncoder.default(self, obj)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def get_response_and_time(self, key, default=(None, None)):
        """ Retrieves response and timestamp for `key` if it's stored in cache,
        otherwise returns `default`

        :param key: key of resource
        :param default: return this if `key` not found in cache
        :returns: tuple (response, datetime)

        .. note:: Response is restored after unpickling with :meth:`restore_response`
        """
        try:
            if key not in self.responses:
                key = self.keys_map[key]
            response, timestamp = self.responses[key]
        except KeyError:
            return default
        return self.restore_response(response), timestamp
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def setUpTestData(cls):
        super().setUpTestData()
        cls.create_local_and_remote_user()
        cls.user2 = AnonymousUser()
        cls.local_user = UserFactory()
        cls.public_content = ContentFactory(
            visibility=Visibility.PUBLIC, text="**Foobar**", author=cls.profile,
        )
        cls.site_content = ContentFactory(
            visibility=Visibility.SITE, text="_Foobar_"
        )
        cls.limited_content = ContentFactory(visibility=Visibility.LIMITED)
        cls.self_content = ContentFactory(visibility=Visibility.SELF)
        cls.remote_content = ContentFactory(
            visibility=Visibility.PUBLIC, remote_created=make_aware(datetime.datetime(2015, 1, 1)),
            author=cls.remote_profile,
        )
        cls.ids = [
            cls.public_content.id, cls.site_content.id, cls.limited_content.id, cls.self_content.id
        ]
        cls.set = {
            cls.public_content, cls.site_content, cls.limited_content, cls.self_content
        }
项目:Easysentiment    作者:Jflick58    | 项目源码 | 文件源码
def default(self, obj):
        """default method."""
        if hasattr(obj, '__json__'):
            return obj.__json__()
        elif isinstance(obj, collections.Iterable):
            return list(obj)
        elif isinstance(obj, datetime):
            return obj.isoformat()
        elif hasattr(obj, '__getitem__') and hasattr(obj, 'keys'):
            return dict(obj)
        elif hasattr(obj, '__dict__'):
            return {member: getattr(obj, member)
                    for member in dir(obj)
                    if not member.startswith('_') and
                    not hasattr(getattr(obj, member), '__call__')}

        return json.JSONEncoder.default(self, obj)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_am_pm_behaviour(self):
        check_time = datetime.datetime(
            year=2016, month=11, day=7, hour=22,
            minute=10, second=0, microsecond=1)
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=(check_time + datetime.timedelta(hours=-12)))
        self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def get_time(cls, request, time_attr):
        """Extracts a time object from the given request

        :param request: the request object
        :param time_attr: the attribute name
        :return: datetime.timedelta
        """
        time_part = datetime.datetime.strptime(
            request.params[time_attr][:-4],
            '%a, %d %b %Y %H:%M:%S'
        )

        return datetime.timedelta(
            hours=time_part.hour,
            minutes=time_part.minute
        )
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def get_datetime(cls, request, date_attr, time_attr):
        """Extracts a UTC  datetime object from the given request
        :param request: the request object
        :param date_attr: the attribute name
        :return: datetime.datetime
        """
        date_part = datetime.datetime.strptime(
            request.params[date_attr][:-4],
            '%a, %d %b %Y %H:%M:%S'
        )

        time_part = datetime.datetime.strptime(
            request.params[time_attr][:-4],
            '%a, %d %b %Y %H:%M:%S'
        )

        # update the time values of date_part with time_part
        return date_part.replace(
            hour=time_part.hour,
            minute=time_part.minute,
            second=time_part.second,
            microsecond=time_part.microsecond
        )
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def coerce_to_dtype(dtype, value):
    """
    Make a value with the specified numpy dtype.

    Only datetime64[ns] and datetime64[D] are supported for datetime dtypes.
    """
    name = dtype.name
    if name.startswith('datetime64'):
        if name == 'datetime64[D]':
            return make_datetime64D(value)
        elif name == 'datetime64[ns]':
            return make_datetime64ns(value)
        else:
            raise TypeError(
                "Don't know how to coerce values of dtype %s" % dtype
            )
    return dtype.type(value)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def get_open_and_close(day, early_closes):
    market_open = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=9,
            minute=31),
        tz='US/Eastern').tz_convert('UTC')
    # 1 PM if early close, 4 PM otherwise
    close_hour = 13 if day in early_closes else 16
    market_close = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=close_hour),
        tz='US/Eastern').tz_convert('UTC')

    return market_open, market_close
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def create_test_panel_ohlc_source(sim_params, env):
    start = sim_params.first_open \
        if sim_params else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)

    end = sim_params.last_close \
        if sim_params else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    index = env.days_in_range(start, end)
    price = np.arange(0, len(index)) + 100
    high = price * 1.05
    low = price * 0.95
    open_ = price + .1 * (price % 2 - .5)
    volume = np.ones(len(index)) * 1000
    arbitrary = np.ones(len(index))

    df = pd.DataFrame({'price': price,
                       'high': high,
                       'low': low,
                       'open': open_,
                       'volume': volume,
                       'arbitrary': arbitrary},
                      index=index)
    panel = pd.Panel.from_dict({0: df})

    return DataPanelSource(panel), panel
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def get_open_and_close(day, early_closes):
    # only "early close" event in Bovespa actually is a late start
    # as the market only opens at 1pm
    open_hour = 13 if day in quarta_cinzas else 10
    market_open = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=open_hour,
            minute=00),
        tz='America/Sao_Paulo').tz_convert('UTC')
    market_close = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=16),
        tz='America/Sao_Paulo').tz_convert('UTC')

    return market_open, market_close
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _coerce_datetime(maybe_dt):
    if isinstance(maybe_dt, datetime.datetime):
        return maybe_dt
    elif isinstance(maybe_dt, datetime.date):
        return datetime.datetime(
            year=maybe_dt.year,
            month=maybe_dt.month,
            day=maybe_dt.day,
            tzinfo=pytz.utc,
        )
    elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
        year, month, day = maybe_dt
        return datetime.datetime(
            year=year,
            month=month,
            day=day,
            tzinfo=pytz.utc,
        )
    else:
        raise TypeError('Cannot coerce %s into a datetime.datetime'
                        % type(maybe_dt).__name__)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def get_open_and_close(day, early_closes):
    market_open = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=9,
            minute=31),
        tz='US/Eastern').tz_convert('UTC')
    # 1 PM if early close, 4 PM otherwise
    close_hour = 13 if day in early_closes else 16
    market_close = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=close_hour),
        tz='Asia/Shanghai').tz_convert('UTC')

    return market_open, market_close
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_generator_dates(self):
        """
        Ensure the pipeline of generators are in sync, at least as far as
        their current dates.
        """

        sim_params = factory.create_simulation_parameters(
            start=datetime(2011, 7, 30, tzinfo=pytz.utc),
            end=datetime(2012, 7, 30, tzinfo=pytz.utc),
            env=self.env,
        )
        algo = TestAlgo(self, sim_params=sim_params, env=self.env)
        trade_source = factory.create_daily_trade_source(
            [8229],
            sim_params,
            env=self.env,
        )
        algo.set_sources([trade_source])

        gen = algo.get_generator()
        self.assertTrue(list(gen))

        self.assertTrue(algo.slippage.latest_date)
        self.assertTrue(algo.latest_date)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_progress(self):
        """
        Ensure the pipeline of generators are in sync, at least as far as
        their current dates.
        """
        sim_params = factory.create_simulation_parameters(
            start=datetime(2008, 1, 1, tzinfo=pytz.utc),
            end=datetime(2008, 1, 5, tzinfo=pytz.utc),
            env=self.env,
        )
        algo = TestAlgo(self, sim_params=sim_params, env=self.env)
        trade_source = factory.create_daily_trade_source(
            [8229],
            sim_params,
            env=self.env,
        )
        algo.set_sources([trade_source])

        gen = algo.get_generator()
        results = list(gen)
        self.assertEqual(results[-2]['progress'], 1.0)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_checks_should_trigger(self):
        class CountingRule(Always):
            count = 0

            def should_trigger(self, dt, env):
                CountingRule.count += 1
                return True

        for r in [CountingRule] * 5:
                self.em.add_event(
                    Event(r(), lambda context, data: None)
                )

        mock_algo_class = namedtuple('FakeAlgo', ['trading_environment'])
        mock_algo = mock_algo_class(trading_environment="fake_env")
        self.em.handle_data(mock_algo, None, datetime.datetime.now())

        self.assertEqual(CountingRule.count, 5)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def writeValue(self, value):
        if isinstance(value, (str, unicode)):
            self.simpleElement("string", value)
        elif isinstance(value, bool):
            # must switch for bool before int, as bool is a
            # subclass of int...
            if value:
                self.simpleElement("true")
            else:
                self.simpleElement("false")
        elif isinstance(value, (int, long)):
            self.simpleElement("integer", "%d" % value)
        elif isinstance(value, float):
            self.simpleElement("real", repr(value))
        elif isinstance(value, dict):
            self.writeDict(value)
        elif isinstance(value, Data):
            self.writeData(value)
        elif isinstance(value, datetime.datetime):
            self.simpleElement("date", _dateToString(value))
        elif isinstance(value, (tuple, list)):
            self.writeArray(value)
        else:
            raise TypeError("unsuported type: %s" % type(value))
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def make_comparable(self, other):
        if isinstance(other, DateTime):
            s = self.value
            o = other.value
        elif datetime and isinstance(other, datetime.datetime):
            s = self.value
            o = other.strftime("%Y%m%dT%H:%M:%S")
        elif isinstance(other, (str, unicode)):
            s = self.value
            o = other
        elif hasattr(other, "timetuple"):
            s = self.timetuple()
            o = other.timetuple()
        else:
            otype = (hasattr(other, "__class__")
                     and other.__class__.__name__
                     or type(other))
            raise TypeError("Can't compare %s and %s" %
                            (self.__class__.__name__, otype))
        return s, o
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def get_response(self, key, default=None):
        """ Retrieves response and timestamp for `key` if it's stored in cache,
        otherwise returns `default`

        :param key: key of resource
        :param default: return this if `key` not found in cache
        :returns: tuple (response, datetime)

        .. note:: Response is restored after unpickling with :meth:`restore_response`
        """
        try:
            if key not in self.responses:
                key = self.keys_map[key]
            response = self.responses[key]
        except KeyError:
            return default
        return response
项目:tts-bug-bounty-dashboard    作者:18F    | 项目源码 | 文件源码
def test_activity_sets_sla_triaged_at():
    r = new_report()
    r.save()
    assert r.sla_triaged_at is None

    # An activity that shouldn't update sla_triaged_at
    d1 = now()
    r.activities.create(id=1, type='activity-comment', created_at=d1)
    assert r.sla_triaged_at is None

    # And now one that should
    d2 = d1 + datetime.timedelta(hours=3)
    r.activities.create(id=2, type='activity-bug-not-applicable', created_at=d2)
    assert r.sla_triaged_at == d2

    # And now another aciivity that would update the date, if it wasn't already set
    d3 = d2 + datetime.timedelta(hours=3)
    r.activities.create(id=3, type='activity-bug-resolved', created_at=d3)
    assert r.sla_triaged_at == d2
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def _load_date(val):
    microsecond = 0
    tz = None
    try:
        if len(val) > 19:
            if val[19] == '.':
                microsecond = int(val[20:26])
                if len(val) > 26:
                    tz = TomlTz(val[26:32])
            else:
                tz = TomlTz(val[19:25])
    except ValueError:
        tz = None
    try:
        d = datetime.datetime(int(val[:4]), int(val[5:7]), int(val[8:10]), int(val[11:13]), int(val[14:16]), int(val[17:19]), microsecond, tz)
    except ValueError:
        return None
    return d
项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def setUp(self):
        aws_data = {
            'ScalableTargets': [
                {
                    'ServiceNamespace': 'ecs',
                    'ResourceId': 'service/my_cluster/my_service',
                    'ScalableDimension': 'ecs:service:DesiredCount',
                    'MinCapacity': 1,
                    'MaxCapacity': 3,
                    'RoleARN': 'my_role_arn',
                    'CreationTime': datetime(2017, 4, 14)
                }
            ],
            'NextToken': None
        }
        init = Mock(return_value=None)
        init.return_value = None
        appscaling_client = Mock()
        appscaling_client.describe_scalable_targets = Mock(return_value=aws_data)
        client = Mock(return_value=appscaling_client)
        with Replacer() as r:
            r.replace('boto3.client', client)
            r.replace('deployfish.aws.appscaling.ScalingPolicy.__init__', init)
            self.appscaling = ApplicationAutoscaling('my_service', 'my_cluster')
项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def setUp(self):
        aws_data = {
            'ServiceNamespace': 'ecs',
            'ResourceId': 'service/my_cluster/my_service',
            'ScalableDimension': 'ecs:service:DesiredCount',
            'MinCapacity': 1,
            'MaxCapacity': 3,
            'RoleARN': 'my_role_arn',
            'CreationTime': datetime(2017, 4, 14)
        }
        init = Mock(return_value=None)
        init.return_value = None
        appscaling_client = Mock()
        self.describe_scaling_targets = Mock(return_value=aws_data)
        appscaling_client.describe_scalable_targets = self.describe_scaling_targets
        client = Mock(return_value=appscaling_client)
        with Replacer() as r:
            r.replace('boto3.client', client)
            r.replace('deployfish.aws.appscaling.ScalingPolicy.__init__', init)
            self.appscaling = ApplicationAutoscaling('my_service', 'my_cluster', aws=aws_data)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_edited_is_false_for_newly_created_content_within_15_minutes_grace_period(self):
        with freeze_time(self.public_content.created + datetime.timedelta(minutes=14)):
            self.public_content.save()
            self.assertFalse(self.public_content.edited)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
        with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
            self.public_content.save()
            self.assertTrue(self.public_content.edited)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_dict_for_view_edited_post(self):
        with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
            self.public_content.save()
            self.assertEqual(self.public_content.dict_for_view(self.user), {
                "author": self.public_content.author_id,
                "author_guid": self.public_content.author.guid,
                "author_handle": self.public_content.author.handle,
                "author_home_url": self.public_content.author.home_url,
                "author_image": self.public_content.author.safer_image_url_small,
                "author_is_local": bool(self.public_content.author.user),
                "author_name": self.public_content.author.handle,
                "author_profile_url": self.public_content.author.get_absolute_url(),
                "content_type": self.public_content.content_type.string_value,
                "delete_url": reverse("content:delete", kwargs={"pk": self.public_content.id}),
                "detail_url": self.public_content.get_absolute_url(),
                "formatted_timestamp": self.public_content.timestamp,
                "guid": self.public_content.guid,
                "has_shared": False,
                "humanized_timestamp": "%s (edited)" % self.public_content.humanized_timestamp,
                "id": self.public_content.id,
                "is_authenticated": True,
                "is_author": True,
                "is_following_author": False,
                "parent": "",
                "profile_id": self.public_content.author.id,
                "rendered": self.public_content.rendered,
                "reply_count": 0,
                "reply_url": reverse("content:reply", kwargs={"pk": self.public_content.id}),
                "shares_count": 0,
                "slug": self.public_content.slug,
                "through": self.public_content.id,
                "update_url": reverse("content:update", kwargs={"pk": self.public_content.id}),
            })
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_is_time_to_run_before_late_metric_slack_time(self):
        check_time = datetime.datetime(
            year=2016, month=11, day=7, hour=11,
            minute=9, second=59, microsecond=0)
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=(check_time + datetime.timedelta(hours=-1)))
        self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_is_time_to_run_after_late_metric_slack_time(self):
        check_time = datetime.datetime(
            year=2016, month=11, day=7, hour=11,
            minute=10, second=0, microsecond=1)
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=(check_time + datetime.timedelta(hours=-1)))
        self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_is_time_to_run_with_already_done_this_hour(self):
        check_time = datetime.datetime(
            year=2016, month=11, day=7, hour=11,
            minute=30, second=0, microsecond=0)
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=check_time)
        self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_is_time_to_run_after_midnight_but_before_late_metric_slack_time(
            self):
        check_time = datetime.datetime(
            year=2016, month=11, day=7, hour=0,
            minute=5, second=0, microsecond=0)
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=(check_time + datetime.timedelta(hours=-1)))
        self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_is_time_to_run_after_midnight_and_after_late_metric_slack_time(
            self):
        check_time = datetime.datetime(
            year=2016, month=11, day=7, hour=0,
            minute=10, second=0, microsecond=1)
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=(check_time + datetime.timedelta(hours=-1)))
        self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_same_time_different_day_behaviour(self):
        check_time = datetime.datetime(
            year=2016, month=11, day=7, hour=22,
            minute=10, second=0, microsecond=1)
        PreHourlyProcessorUtil.get_data_provider().set_last_processed(
            date_time=(check_time + datetime.timedelta(days=-1)))
        self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
项目:dsq    作者:baverman    | 项目源码 | 文件源码
def test_crontab():
    c = Crontab()
    c.add('boo')
    c.add('foo', 0)
    c.add('bar', [1, 3], -5, -1, -1, 0)

    assert c.actions(0, 1, 1, 1, 1) == {'boo', 'foo'}
    assert c.actions(1, 1, 1, 1, 1) == {'boo'}
    assert c.actions(1, 5, 1, 1, 7) == {'boo', 'bar'}
    assert c.actions(3, 5, 1, 1, 7) == {'boo', 'bar'}

    ts = mktime(datetime(2016, 1, 17, 5, 1).timetuple())
    assert c.actions_ts(ts) == {'boo', 'bar'}
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _get_extensions(self):
        pathname = os.path.join(self.dirname, self.filename)
        name_ver = '%s-%s' % (self.name, self.version)
        info_dir = '%s.dist-info' % name_ver
        arcname = posixpath.join(info_dir, 'EXTENSIONS')
        wrapper = codecs.getreader('utf-8')
        result = []
        with ZipFile(pathname, 'r') as zf:
            try:
                with zf.open(arcname) as bf:
                    wf = wrapper(bf)
                    extensions = json.load(wf)
                    cache = self._get_dylib_cache()
                    prefix = cache.prefix_to_dir(pathname)
                    cache_base = os.path.join(cache.base, prefix)
                    if not os.path.isdir(cache_base):
                        os.makedirs(cache_base)
                    for name, relpath in extensions.items():
                        dest = os.path.join(cache_base, convert_path(relpath))
                        if not os.path.exists(dest):
                            extract = True
                        else:
                            file_time = os.stat(dest).st_mtime
                            file_time = datetime.datetime.fromtimestamp(file_time)
                            info = zf.getinfo(relpath)
                            wheel_time = datetime.datetime(*info.date_time)
                            extract = wheel_time > file_time
                        if extract:
                            zf.extract(relpath, cache_base)
                        result.append((name, dest))
            except KeyError:
                pass
        return result
项目:nova    作者:hubblestack    | 项目源码 | 文件源码
def _get_x509_days_left(x509):
    date_fmt = '%Y%m%d%H%M%SZ'
    current_datetime = datetime.datetime.utcnow()
    not_after = time.strptime(x509.get_notAfter(), date_fmt)
    not_before = time.strptime(x509.get_notBefore(), date_fmt)

    ret = {'not_after': (datetime.datetime(*not_after[:6]) - current_datetime).days,
           'not_before': (datetime.datetime(*not_before[:6]) - current_datetime).days}

    return ret
项目:NearBeach    作者:robotichead    | 项目源码 | 文件源码
def time_combined(year,month,day,hour,minute,meridiem):
    """
    Time is tricky. So I am following the simple rules;
    12:** AM will have the hour changed to 0
    1:** AM will not have the hour changed
    12:** PM will not have the hour changed
    1:** PM will have the hour changed by adding 12

    From these simple points, I have constructed the following
    if statements to take control of the correct hour.
    """
    if meridiem == "AM":
        if hour == 12:
            hour = 0
    else:
        if hour < 12:
            hour = hour + 12



    # Create the final start/end date fields
    return datetime.datetime(
        year,
        month,
        day,
        hour,
        minute
    )
项目:spyking-circus    作者:spyking-circus    | 项目源码 | 文件源码
def parse_neuralynx_time_string(self, time_string):
        # Parse a datetime object from the idiosyncratic time string in Neuralynx file headers
        try:
            tmp_date = [int(x) for x in time_string.split()[4].split('/')]
            tmp_time = [int(x) for x in time_string.split()[-1].replace('.', ':').split(':')]
            tmp_microsecond = tmp_time[3] * 1000
        except:
            warnings.warn('Unable to parse time string from Neuralynx header: ' + time_string)
            return None
        else:
            return datetime.datetime(tmp_date[2], tmp_date[0], tmp_date[1],  # Year, month, day
                                     tmp_time[0], tmp_time[1], tmp_time[2],  # Hour, minute, second
                                     tmp_microsecond)
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def test_get_vacations_view_is_working_properly(self):
        """testing if GET: /api/users/{id}/vacations view is working properly
        """
        from stalker import db, Vacation
        import datetime
        vac1 = Vacation(
            user=self.test_user1,
            start=datetime.datetime(2016, 4, 24, 0, 0),
            end=datetime.datetime(2016, 4, 28, 0, 0)
        )

        vac2 = Vacation(
            user=self.test_user1,
            start=datetime.datetime(2016, 7, 1, 0, 0),
            end=datetime.datetime(2016, 7, 8, 0, 0)
        )
        db.DBSession.add_all([vac1, vac2])

        db.DBSession.flush()
        import transaction
        transaction.commit()

        from stalker import User
        user1 = User.query.filter(User.login == self.test_user1.login).first()
        response = self.test_app.get(
            '/api/users/%s/vacations' % self.test_user1.id
        )

        self.assertEqual(
            sorted(response.json_body),
            sorted([
                {
                    'id': v.id,
                    '$ref': '/api/vacations/%s' % v.id,
                    'name': v.name,
                    'entity_type': v.entity_type
                } for v in [user1.vacations[0], user1.vacations[1]]
            ])
        )

    # TASKS
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def test_update_entity_is_working_properly(self):
        """testing if update_entity() method is working properly
        """
        # create a time log
        import datetime
        start = datetime.datetime(2016, 7, 26, 16)
        end = datetime.datetime(2016, 7, 26, 17)
        new_end = datetime.datetime(2016, 7, 26, 18)

        from stalker import db, TimeLog

        db.DBSession.flush()
        db.DBSession.commit()

        t1 = TimeLog(
            task=self.test_task1,
            resource=self.test_user1,
            start=start,
            end=end,
            created_by=self.test_user2
        )
        db.DBSession.add(t1)
        db.DBSession.commit()

        from stalker_pyramid.testing import DummyRequest, DummyMultiDict
        request = DummyRequest()
        request.matchdict['id'] = t1.id
        request.params = DummyMultiDict()

        from stalker_pyramid.views import EntityViewBase
        request.params['end'] = \
            EntityViewBase.milliseconds_since_epoch(new_end)

        self.patch_logged_in_user(request)
        time_log_view = time_log.TimeLogViews(request)

        response = time_log_view.update_entity()

        t1_db = TimeLog.query.filter(TimeLog.name == t1.name).first()
        self.assertEqual(t1_db.end, new_end)
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def test_delete_entity_is_working_properly(self):
        """testing if delete_entity() method is working properly
        """
        # create a time log
        import datetime
        start = datetime.datetime(2016, 7, 26, 16)
        end = datetime.datetime(2016, 7, 26, 17)

        from stalker import db, TimeLog

        db.DBSession.flush()
        db.DBSession.commit()

        t1 = TimeLog(
            task=self.test_task1,
            resource=self.test_user1,
            start=start,
            end=end,
            created_by=self.test_user2
        )
        db.DBSession.add(t1)
        db.DBSession.commit()

        from stalker_pyramid.testing import DummyRequest
        request = DummyRequest()
        request.matchdict['id'] = t1.id
        time_log_view = time_log.TimeLogViews(request)

        response = time_log_view.delete_entity()

        self.assertIsNone(
            TimeLog.query
            .filter(TimeLog.task == self.test_task1)
            .filter(TimeLog.resource == self.test_user1)
            .first()
        )
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def test_update_entity_is_working_properly_with_post(self):
        """testing if POST: /api/time_logs/{id} view is working properly
        """
        # create a time log
        import datetime
        start = datetime.datetime(2016, 7, 26, 16)
        end = datetime.datetime(2016, 7, 26, 17)
        new_end = datetime.datetime(2016, 7, 26, 18)

        from stalker import db, TimeLog

        db.DBSession.flush()
        db.DBSession.commit()

        t1 = TimeLog(
            task=self.test_task1,
            resource=self.test_user1,
            start=start,
            end=end,
            created_by=self.test_user2
        )
        db.DBSession.add(t1)
        db.DBSession.commit()

        from stalker_pyramid.views import EntityViewBase

        self.admin_login()
        response = self.test_app.post(
            '/api/time_logs/%s' % t1.id,
            params={
                'end': EntityViewBase.milliseconds_since_epoch(new_end)
            },
            status=200
        )

        t1_db = TimeLog.query.filter(TimeLog.name == t1.name).first()
        self.assertEqual(t1_db.end, new_end)
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def test_delete_entity_is_working_properly(self):
        """testing if DELETE: /api/time_logs/{id} view is working properly
        """
        # create a time log
        import datetime
        start = datetime.datetime(2016, 7, 26, 16)
        end = datetime.datetime(2016, 7, 26, 17)

        from stalker import db, TimeLog

        db.DBSession.flush()
        db.DBSession.commit()

        t1 = TimeLog(
            task=self.test_task1,
            resource=self.test_user1,
            start=start,
            end=end,
            created_by=self.test_user2
        )
        db.DBSession.add(t1)
        db.DBSession.commit()

        response = self.test_app.delete(
            '/api/time_logs/%s' % t1.id,
            status=200
        )

        self.assertIsNone(
            TimeLog.query
            .filter(TimeLog.task == self.test_task1)
            .filter(TimeLog.resource == self.test_user1)
            .first()
        )
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def test_create_entity_with_invalid_user_id(self):
        """testing if create_entity() method is working properly with invalid
        user_id parameter
        """
        import datetime
        start = datetime.datetime(2016, 4, 22, 10)
        end = datetime.datetime(2016, 4, 22, 16)

        from stalker_pyramid.testing import DummyRequest, DummyMultiDict
        from stalker_pyramid.views import EntityViewBase
        request = DummyRequest()
        request.params = DummyMultiDict()
        request.params['user_id'] = -1
        request.params['start'] = \
            EntityViewBase.milliseconds_since_epoch(start)
        request.params['end'] = EntityViewBase.milliseconds_since_epoch(end)

        vacation_views = vacation.VacationViews(request)
        from pyramid.httpexceptions import HTTPServerError
        with self.assertRaises(HTTPServerError) as cm:
            vacation_views.create_entity()

        self.assertEqual(
            str(cm.exception),
            'Missing "user_id" parameter'
        )