The following code examples, drawn from open source Python projects, illustrate how to use nose.tools.assert_equals().
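Before the project snippets, here is a minimal, self-contained sketch of the basic usage; the add function and the test are illustrative placeholders, not taken from any of the projects below.

from nose.tools import assert_equals

def add(a, b):
    return a + b

def test_add():
    # passes silently when the two values compare equal
    assert_equals(add(2, 3), 5)
    # works for any comparable objects; on mismatch it raises
    # AssertionError with a message such as "5 != 6"
    assert_equals({'total': add(1, 1)}, {'total': 2})

assert_equals is nose's PEP 8 style alias generated from unittest's assertEqual, so comparisons and failure messages behave as they do in unittest.
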
def test_package_update(self):
    idf.plugin.create_country_codes()
    helpers.call_action(
        'package_create',
        name='test_package',
        custom_text='this is my custom text',
        country_code='uk',
        resources=[{
            'url': 'http://test.com/',
            'custom_resource_text': 'my custom resource',
        }])
    result = helpers.call_action(
        'package_update',
        name='test_package',
        custom_text='this is my updated text',
        country_code='ie',
        resources=[{
            'url': 'http://test.com/',
            'custom_resource_text': 'updated custom resource',
        }]
    )
    nt.assert_equals('this is my updated text', result['custom_text'])
    nt.assert_equals([u'ie'], result['country_code'])
    nt.assert_equals('updated custom resource',
                     result['resources'][0]['custom_resource_text'])

def test_returns_no_tracking_when_no_execution_id_is_found(self):
    self.engine.cwlogs.get_log_events.side_effect = CloudWatchStreamDoesNotExist()
    workflow_state = {e: core.STATE_UNKNOWN for e in self.workflow_events}
    # an execution id for which no events have been logged
    execution_id = "transaction-id-123"
    expected = {
        "events_defined": workflow_state,
        "events_received": [],
        "tracking_summary": {
            "last_received_event": None,
            "subscribers": [],
            "execution_path": self.engine._generate_execution_path(workflow_state)
        }
    }
    actual = self.engine.track(self.workflow_id, execution_id)
    nt.assert_equals(expected, actual)

def test_successfully_gets_log_events(self):
    mocked_timestamp = 1476826208 * 1000
    mocked_message = '{"foo": "bar"}'
    mocked_events = {
        "events": [
            {
                "timestamp": mocked_timestamp,
                "message": mocked_message
            }
        ],
        "nextForwardToken": None
    }
    self.logs.cwlogs.get_log_events.return_value = mocked_events
    expected = [{
        "timestamp": utils.format_datetime(
            datetime.fromtimestamp(mocked_timestamp / 1000)),
        "data": json.loads(mocked_message)
    }]
    actual = self.logs.get_log_events(self.log_group, self.log_stream)
    nt.assert_equals(expected, actual)

def test_simple_engine():
    engine = Engine({
        'datasource': {
            'class': TestDataSource,
            'params': {
                'csv': test_data_file,
            },
        },
        'algorithm': {
            'class': TestSimpleAlgorithm,
            'params': {
                'model.pickle': '~/.tidml/tests/model.pkl',  # default built-in
            },
        },
    })
    engine.train()
    models = engine.load_models()
    prediction = engine.predict(models, 3)
    nt.assert_equals(prediction, 6)

def test_multiple_algorithms_engine():
    engine = Engine({
        'datasource': TestEmptyDataSource,
        'algorithms': {
            'algo1': {
                'class': TestMultiAlgorithm,
                'params': {'p': 'A'},
            },
            'algo2': {
                'class': TestMultiAlgorithm,
                'params': {'p': 'B'},
            },
        },
        'serving': TestIdentityServing,
    })
    models = {
        'algo1': object(),
        'algo2': object(),
    }
    nt.assert_equals(engine.predict(models, None), {
        'algo1': 'A',
        'algo2': 'B',
    })

def test_ipv4_async():
    global FLAG
    FLAG = Value('i', 0)
    nma = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG
        FLAG.value = 1

    nma.scan(hosts='127.0.0.1', arguments='-p 22 -Pn',
             callback=callback_result)
    while nma.still_scanning():
        nma.wait(2)
    assert_equals(FLAG.value, 1)

def test_ipv6_async():
    global FLAG
    FLAG = Value('i', 0)
    nma = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG
        FLAG.value = 1

    nma.scan(hosts='::1', arguments='-6 -p 22 -Pn',
             callback=callback_result)
    while nma.still_scanning():
        nma.wait(2)
    assert_equals(FLAG.value, 1)

def test_multipe_osmatch():
    assert('osmatch' in nm['127.0.0.1'])
    assert('portused' in nm['127.0.0.1'])
    for osm in nm['127.0.0.1']['osmatch']:
        assert('accuracy' in osm)
        assert('line' in osm)
        assert('name' in osm)
        assert('osclass' in osm)
        assert('accuracy' in osm['osclass'][0])
        assert('cpe' in osm['osclass'][0])
        assert('osfamily' in osm['osclass'][0])
        assert('osgen' in osm['osclass'][0])
        assert('type' in osm['osclass'][0])
        assert('vendor' in osm['osclass'][0])

# def test_host_and_port_as_unicode():
#     # nosetests -x -s nmap/test_nmap.py:test_port_as_unicode
#     # Covers bug : https://bitbucket.org/xael/python-nmap/issues/9/can-not-pass-ports-with-unicode-string-at
#     nma = nm.scan(hosts=u'127.0.0.1', ports=u'22')
#     assert_equals(nma['nmap']['scaninfo']['error'], '')

def test_Resources():
    rANO = gantt.Resource('ANO')
    rANO.add_vacations(
        dfrom=datetime.date(2015, 2, 2),
        dto=datetime.date(2015, 2, 4)
    )
    # test global vacations
    assert_equals(rANO.is_available(datetime.date(2015, 1, 1)), False)
    # test resource vacations
    assert_equals(rANO.is_available(datetime.date(2015, 2, 1)), True)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 2)), False)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 3)), False)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 4)), False)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 5)), True)
    # Second resource
    rJLS = gantt.Resource('JLS')
    return

def test_latex_completions():
    from IPython.core.latex_symbols import latex_symbols
    import random
    ip = get_ipython()
    # Test some random unicode symbols
    keys = random.sample(latex_symbols.keys(), 10)
    for k in keys:
        text, matches = ip.complete(k)
        nt.assert_equal(len(matches), 1)
        nt.assert_equal(text, k)
        nt.assert_equal(matches[0], latex_symbols[k])
    # Test a more complex line
    text, matches = ip.complete(u'print(\\alpha')
    nt.assert_equals(text, u'\\alpha')
    nt.assert_equals(matches[0], latex_symbols['\\alpha'])
    # Test multiple matching latex symbols
    text, matches = ip.complete(u'\\al')
    nt.assert_in('\\alpha', matches)
    nt.assert_in('\\aleph', matches)

def test_image_to_obs(self, mock_ps, mock_dl):
    session = jobs.db_session()
    session.query(bm.Observation).delete()
    session.commit()
    obs = session.query(bm.Observation).all()
    nt.assert_equals(obs, [])
    teardown_singleton_pubsub()
    mock_dl.return_value = [(7.0, 'mockingbird')]
    jobs.image_to_observation(self.file_path_gps, self.file_path)
    obs = session.query(bm.Observation).all()
    # TODO: test better what's in this obs
    nt.assert_equals(len(obs), 1)
    mock_dl.assert_called_once_with(self.file_path)
    mock_ps.assert_called_once_with()
    # as long as we return the repr of geometry, we cannot do this assert:
    # mock_ps().publish.assert_called_once_with(obs[0].as_public_dict())
    nt.assert_equals(mock_ps().publish.call_count, 1)

def test_ipv6_async():
    global FLAG_ipv6
    FLAG_ipv6 = Value('i', 0)
    nma_ipv6 = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG_ipv6
        FLAG_ipv6.value = 1

    nma_ipv6.scan(hosts='::1', arguments='-6 -p 22 -Pn',
                  callback=callback_result)
    while nma_ipv6.still_scanning():
        nma_ipv6.wait(2)
    assert_equals(FLAG_ipv6.value, 1)

def prepare_migration_attachments_ipv6(api):
    engine = api.system_service()
    hosts_service = engine.hosts_service()
    for index, host in enumerate(
            test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME), start=1):
        host_service = hosts_service.host_service(id=host.id)
        ip_address = MIGRATION_NETWORK_IPv6_ADDR.format(index)
        ip_configuration = network_utils_v4.create_static_ip_configuration(
            ipv6_addr=ip_address,
            ipv6_mask=MIGRATION_NETWORK_IPv6_MASK)
        network_utils_v4.modify_ip_config(
            engine, host_service, MIGRATION_NETWORK, ip_configuration)
        actual_address = next(nic for nic in host_service.nics_service().list()
                              if nic.name == VLAN200_IF_NAME).ipv6.address
        nt.assert_equals(IPAddress(actual_address), IPAddress(ip_address))

def attach_vm_network_to_host_static_config(api):
    host = test_utils.hosts_in_cluster_v3(api, CLUSTER_NAME)[0]
    ip_configuration = network_utils_v3.create_static_ip_configuration(
        VM_NETWORK_IPv4_ADDR, VM_NETWORK_IPv4_MASK,
        VM_NETWORK_IPv6_ADDR, VM_NETWORK_IPv6_MASK)
    network_utils_v3.attach_network_to_host(
        api, host, NIC_NAME, VM_NETWORK, ip_configuration)
    # TODO: currently ost uses v3 SDK that doesn't report ipv6.
    # once available, verify ipv6 as well.
    nt.assert_equals(
        host.nics.list(name=VLAN_IF_NAME)[0].ip.address,
        VM_NETWORK_IPv4_ADDR)

def prepare_migration_attachments_ipv4(api):
    for index, host in enumerate(
            test_utils.hosts_in_cluster_v3(api, CLUSTER_NAME), start=1):
        ip_address = MIGRATION_NETWORK_IPv4_ADDR.format(index)
        ip_configuration = network_utils_v3.create_static_ip_configuration(
            ipv4_addr=ip_address,
            ipv4_mask=MIGRATION_NETWORK_IPv4_MASK)
        network_utils_v3.attach_network_to_host(
            api, host, NIC_NAME, MIGRATION_NETWORK, ip_configuration)
        nt.assert_equals(
            host.nics.list(name=VLAN200_IF_NAME)[0].ip.address,
            ip_address)

def prepare_migration_attachments_ipv4(api):
    engine = api.system_service()
    hosts_service = engine.hosts_service()
    for index, host in enumerate(
            test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME), start=1):
        host_service = hosts_service.host_service(id=host.id)
        ip_address = MIGRATION_NETWORK_IPv4_ADDR.format(index)
        ip_configuration = network_utils_v4.create_static_ip_configuration(
            ipv4_addr=ip_address,
            ipv4_mask=MIGRATION_NETWORK_IPv4_MASK)
        network_utils_v4.attach_network_to_host(
            host_service, NIC_NAME, MIGRATION_NETWORK, ip_configuration)
        actual_address = next(nic for nic in host_service.nics_service().list()
                              if nic.name == VLAN200_IF_NAME).ip.address
        nt.assert_equals(IPAddress(actual_address), IPAddress(ip_address))

def test_property_docstring_is_in_info_for_detail_level_0():
    class A(object):
        @property
        def foobar(self):
            """This is `foobar` property."""
            pass

    ip.user_ns['a_obj'] = A()
    nt.assert_equals(
        'This is `foobar` property.',
        ip.object_inspect('a_obj.foobar', detail_level=0)['docstring'])

    ip.user_ns['a_cls'] = A
    nt.assert_equals(
        'This is `foobar` property.',
        ip.object_inspect('a_cls.foobar', detail_level=0)['docstring'])

def test_open_dois(self, test_data):
    (doi, fulltext_url, license) = test_data
    my_pub = pub.lookup_product_by_doi(doi)
    my_pub.recalculate()

    logger.info(u"was looking for {}, got {}\n\n".format(fulltext_url, my_pub.fulltext_url))
    logger.info(u"doi: http://doi.org/{}".format(doi))
    logger.info(u"title: {}".format(my_pub.best_title))
    logger.info(u"evidence: {}\n\n".format(my_pub.evidence))
    if my_pub.error:
        logger.info(my_pub.error)

    assert_not_equals(my_pub.fulltext_url, None)

# @data(*closed_dois)
# def test_closed_dois(self, test_data):
#     (doi, fulltext_url, license) = test_data
#     my_pub = pub.lookup_product_by_doi(doi)
#     my_pub.recalculate()
#
#     logger.info(u"was looking for {}, got {}\n\n".format(fulltext_url, my_pub.fulltext_url))
#     logger.info(u"doi: http://doi.org/{}".format(doi))
#     logger.info(u"title: {}".format(my_pub.best_title))
#     logger.info(u"evidence: {}\n\n".format(my_pub.evidence))
#     if my_pub.error:
#         logger.info(my_pub.error)
#
#     assert_equals(my_pub.fulltext_url, None)
#
# have to scrape the publisher pages to find these

def test_initialization(self):
    assert_equals(Acl('test').name, 'test')

def test_initialization_default(self):
    assert_equals(Acl().name, Acl._default_name)

def test_add(self):
    acl = Acl()
    acl.add(Ace())
    assert_equals(len(acl), 1)
    assert_equals(acl[-1]._line_number, 1)
    acl.add(Ace())
    assert_equals(len(acl), 1)
    assert_equals(acl[-1]._line_number, 1)
    acl.add(Ace(logging=2))
    assert_equals(acl[-1]._line_number, 2)

def test_remove(self):
    acl = Acl()
    acl.add(Ace())
    acl.remove(Ace())
    assert_equals(len(acl), 0)

def test_getitem(self):
    acl = Acl()
    ace = Ace()
    acl.add(ace)
    assert_equals(id(acl[0]), id(ace))

def test_eqaulity(self):
    assert_equals(Acl(), Acl())
    acl01 = Acl()
    acl01.add(Ace(logging=2))
    acl02 = Acl()
    acl02.add(Ace(logging=2))
    assert_equals(acl01, acl02)

def test_repr(self):
    expected = '<Acl test01 #0>'
    assert_equals(Acl('test01').__repr__(), expected)

def test_str(self):
    acl = Acl('test01')
    expected = 'Acl test01 #0'
    assert_equals(acl.__str__(), expected)
    acl.add(Ace(permit=False, network='1.2.3.0/24 4.5.6.0/24'))
    expected = 'Acl test01 #1\n\tdeny ip 1.2.3.0/24 4.5.6.0/24'
    assert_equals(acl.__str__(), expected)

def test1(self):
    """r2 does not verify the formula."""
    assert_equals(self.hierarchy.check_all_ancestors("g2"),
                  {'g1': {'f1': "['r2']"}})

# def test2(self):
#     """after adding an edge all the node are valid"""
#     self.hie2.add_edge("r2", "a1")
#     assert_equals(self.hie2.check(),
#                   {'g1': {'or(not cnt(Region),<1<=Adj>cnt(Agent))': "[]"}})

def test_filter_get_health_to_get_only_stateless_nodes(self, MockCluster):
    # Mock
    cluster = MockCluster()
    payload = {}
    metrics = {"totalCPU": 0, "totalMEM": 0, "usedCPU": 0, "usedMEM": 0,
               "ratioCPU": 0, "ratioMEM": 0, "nbNodes": 0}
    with open('./test/mockupDown.json') as json_data:
        payload = json.load(json_data)
    cluster.get_health.return_value = payload
    # Act
    response = cluster.filter_stateless(metrics, cluster.get_health())
    # Assert
    # assert_equals(response['nbNodes'], 4)

def test_package_create(self):
    result = helpers.call_action('package_create', name='test_package',
                                 custom_text='this is my custom text')
    nt.assert_equals('this is my custom text', result['custom_text'])

def test_package_update(self):
    helpers.call_action('package_create', name='test_package',
                        custom_text='this is my custom text')
    result = helpers.call_action('package_update', name='test_package',
                                 custom_text='this is my updated text')
    nt.assert_equals('this is my updated text', result['custom_text'])

def test_package_show(self):
    helpers.call_action('package_create', name='test_package',
                        custom_text='this is my custom text')
    result = helpers.call_action('package_show', name_or_id='test_package')
    nt.assert_equals('this is my custom text', result['custom_text'])

def test_package_create(self):
    idf.plugin_v4.create_country_codes()
    result = helpers.call_action('package_create', name='test_package',
                                 custom_text='this is my custom text',
                                 country_code='uk')
    nt.assert_equals('this is my custom text', result['custom_text'])
    nt.assert_equals([u'uk'], result['country_code'])

def test_package_update(self):
    idf.plugin_v4.create_country_codes()
    helpers.call_action('package_create', name='test_package',
                        custom_text='this is my custom text',
                        country_code='uk')
    result = helpers.call_action('package_update', name='test_package',
                                 custom_text='this is my updated text',
                                 country_code='ie')
    nt.assert_equals('this is my updated text', result['custom_text'])
    nt.assert_equals([u'ie'], result['country_code'])

def test_package_show(self):
    idf.plugin.create_country_codes()
    helpers.call_action(
        'package_create',
        name='test_package',
        custom_text='this is my custom text',
        country_code='uk',
        resources=[{
            'url': 'http://test.com/',
            'custom_resource_text': 'my custom resource',
        }]
    )
    result = helpers.call_action('package_show', name_or_id='test_package')
    nt.assert_equals('my custom resource',
                     result['resources'][0]['custom_resource_text'])
    nt.assert_equals('my custom resource',
                     result['resources'][0]['custom_resource_text'])

def test_upgrade_from_sha_with_wrong_password_fails_to_upgrade(self):
    user = factories.User()
    password = u'testpassword'
    user_obj = model.User.by_name(user['name'])

    old_hash = self._set_password(password)
    user_obj._password = old_hash
    user_obj.save()

    nt.assert_false(user_obj.validate_password('wrongpass'))
    nt.assert_equals(old_hash, user_obj.password)
    nt.assert_false(pbkdf2_sha512.identify(user_obj.password))

def test_upgrade_from_pbkdf2_with_less_rounds(self):
    '''set up a pbkdf key with less than the default rounds

    If the number of default_rounds is increased in a later version of
    passlib, ckan should upgrade the password hashes for people without
    involvement from users'''
    user = factories.User()
    password = u'testpassword'
    user_obj = model.User.by_name(user['name'])

    # setup hash with salt/rounds less than the default
    old_hash = pbkdf2_sha512.encrypt(password, salt_size=2, rounds=10)
    user_obj._password = old_hash
    user_obj.save()

    nt.assert_true(user_obj.validate_password(password.encode('utf-8')))
    # check that the hash has been updated
    nt.assert_not_equals(old_hash, user_obj.password)

    new_hash = pbkdf2_sha512.from_string(user_obj.password)
    nt.assert_true(pbkdf2_sha512.default_rounds > 10)
    nt.assert_equals(pbkdf2_sha512.default_rounds, new_hash.rounds)

    nt.assert_true(pbkdf2_sha512.default_salt_size, 2)
    nt.assert_equals(pbkdf2_sha512.default_salt_size, len(new_hash.salt))
    nt.assert_true(pbkdf2_sha512.verify(password, user_obj.password))

def test_upgrade_from_pbkdf2_fails_with_wrong_password(self):
    user = factories.User()
    password = u'testpassword'
    user_obj = model.User.by_name(user['name'])

    # setup hash with salt/rounds less than the default
    old_hash = pbkdf2_sha512.encrypt(password, salt_size=2, rounds=10)
    user_obj._password = old_hash
    user_obj.save()

    nt.assert_false(user_obj.validate_password('wrong_pass'))
    # check that the hash has _not_ been updated
    nt.assert_equals(old_hash, user_obj.password)

def assert_inode_equal(self, a, b):
    nt.assert_equals(os.stat(a).st_ino, os.stat(b).st_ino,
                     "%r and %r do not reference the same inodes" % (a, b))

def assert_content_equal(self, a, b):
    with open(a) as a_f:
        with open(b) as b_f:
            nt.assert_equals(a_f.read(), b_f.read())

def test_config_successfully_validates(self):
    ''' Test config is validated for a correct config '''
    config_path = config_dir + "/valid.yaml"
    Engine.validate_config(config_path)
    # validate_config raises on an invalid config, so reaching this
    # trivial assertion means validation passed
    nt.assert_equals(True, True)

def test_lambdas_are_setup(self):
    ''' Tests that aws lambdas are created or updated from
    lambdas defined in the config '''
    lambda_names = [l['name'] for l in self.test_config['lambdas'] or []]
    expected = {name: ANY for name in lambda_names}
    actual = self.engine.setup_lambdas()
    nt.assert_equals(expected, actual)

def test_streams_and_subscriptions_are_setup(self):
    ''' Tests that streams are created and lambdas are subscribed
    successfully to these streams '''
    lambda_mappings = self.engine.setup_lambdas()
    subscriptions = self.test_config['subscriptions'] or []
    stream_names = [s['event'] for s in subscriptions]
    subscribers = []
    for ss in subscriptions:
        for sb in ss['subscribers'] or []:
            subscribers.append(sb)
    expected = {name: ANY for name in stream_names}
    actual = self.engine.setup_streams_and_subscriptions(lambda_mappings)
    nt.assert_equals(expected, actual)
    nt.assert_equals(len(stream_names),
                     self.engine.kinesis.get_or_create_stream.call_count)
    nt.assert_equals(len(subscribers),
                     self.engine.awslambda.subscribe_to_stream.call_count)

def test_tracker_is_setup(self, generate_code_mock, write_mock):
    ''' Tests that the tracker lambda is created along with its log group
    and that it is subscribed to all streams in the workflow '''
    workflow_id = "test_workflow"
    stream_arns = [
        "arn:aws:kinesis:eu-west-1:xxxxxxxxxxxx:stream/TestEvent1",
        "arn:aws:kinesis:eu-west-1:xxxxxxxxxxxx:stream/TestEvent2",
        "arn:aws:kinesis:eu-west-1:xxxxxxxxxxxx:stream/TestEvent3"
    ]
    self.engine.setup_tracker(workflow_id, stream_arns)
    nt.assert_equals(1, self.engine.awslambda.create_or_update_function.call_count)
    nt.assert_equals(len(stream_arns),
                     self.engine.awslambda.subscribe_to_stream.call_count)
    nt.assert_equals(1, self.engine.cwlogs.create_log_group.call_count)

def test_workflows_are_setup(self, generate_code_mock, write_mock):
    ''' Tests that all workflows as defined in the config are created
    along with their trackers '''
    lambda_mappings = self.engine.setup_lambdas()
    stream_mappings = self.engine.setup_streams_and_subscriptions(lambda_mappings)
    workflows = self.test_config.get('workflows') or []
    num_workflows = len(workflows)
    self.engine.setup_workflows(stream_mappings)
    num_log_groups_created = self.engine.cwlogs.create_log_group.call_count
    nt.assert_equals(num_workflows, num_log_groups_created)

def test_publish_is_successful(self, cwlogs_mock, kinesis_mock, lambda_mock):
    ''' Test data is successfully published to a stream '''
    config_path = config_dir + "/valid.yaml"
    engine = Engine(config_path)
    engine.publish("test_stream", "test_data")
    num_publishes = engine.kinesis.publish.call_count
    nt.assert_equals(1, num_publishes)

def test_workflow_successfully_tracks_on_successful_execution(self):
    # Mock so that all events defined are received (and therefore logged)
    mocked_logged_events = [
        {
            "timestamp": "2016-10-09T23:11:00Z",
            "data": {
                "event_name": e,
                "execution_id": self.execution_id
            }
        }
        for e in self.workflow_events
    ]
    self.engine.cwlogs.get_log_events.return_value = mocked_logged_events

    expected_workflow_state = {e: core.STATE_RECEIVED
                               for e in self.workflow_events}
    expected_last_received_event = self.workflow_events[-1]
    expected_subscribers = [ss['subscribers']
                            for ss in self.test_config['subscriptions']
                            if ss['event'] == expected_last_received_event][0]
    expected = {
        "events_defined": expected_workflow_state,
        "events_received": mocked_logged_events,
        "tracking_summary": {
            "last_received_event": expected_last_received_event,
            "subscribers": expected_subscribers,
            "execution_path": self.engine._generate_execution_path(
                expected_workflow_state)
        }
    }
    actual = self.engine.track(self.workflow_id, self.execution_id)
    nt.assert_equals(expected, actual)

def test_workflow_successfully_tracks_on_failed_execution(self):
    # Mock so that all events defined except the last one
    # are received (and therefore logged)
    mocked_logged_events = [
        {
            "timestamp": "2016-10-09T23:11:00Z",
            "data": {
                "event_name": e,
                "execution_id": self.execution_id
            }
        }
        for e in self.workflow_events[:-1]  # exclude last event
    ]
    self.engine.cwlogs.get_log_events.return_value = mocked_logged_events

    expected_workflow_state = {e: core.STATE_RECEIVED
                               for e in self.workflow_events[:-1]}
    expected_workflow_state[self.workflow_events[-1]] = core.STATE_UNKNOWN
    # the last event that was actually received
    expected_last_received_event = self.workflow_events[-2]
    expected_subscribers = [ss['subscribers']
                            for ss in self.test_config['subscriptions']
                            if ss['event'] == expected_last_received_event][0]
    expected = {
        "events_defined": expected_workflow_state,
        "events_received": mocked_logged_events,
        "tracking_summary": {
            "last_received_event": expected_last_received_event,
            "subscribers": expected_subscribers,
            "execution_path": self.engine._generate_execution_path(
                expected_workflow_state)
        }
    }
    actual = self.engine.track(self.workflow_id, self.execution_id)
    nt.assert_equals(expected, actual)