Python six 模块,assertCountEqual() 实例源码

我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用six.assertCountEqual()

项目:dbt    作者:fishtown-analytics    | 项目源码 | 文件源码
def test__two_models_simple_ref(self):
        self.use_models({
            'model_one': 'select * from events',
            'model_two': "select * from {{ref('model_one')}}",
        })

        compiler = self.get_compiler(self.get_project())
        graph, linker = compiler.compile()

        six.assertCountEqual(self,
                             linker.nodes(),
                             [
                                 'model.test_models_compile.model_one',
                                 'model.test_models_compile.model_two',
                             ])

        six.assertCountEqual(
            self,
            linker.edges(),
            [ ('model.test_models_compile.model_one','model.test_models_compile.model_two',) ])
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def test_subdir_uses_same_walker(self):
        class CustomWalker(walk.Walker):

            @classmethod
            def bind(cls, fs):
                return walk.BoundWalker(fs, walker_class=CustomWalker)

        class CustomizedMemoryFS(MemoryFS):
            walker_class=CustomWalker

        base_fs=CustomizedMemoryFS()
        base_fs.settext("a", "a")
        base_fs.makedirs("b")
        base_fs.settext("b/c", "c")
        base_fs.settext("b/d", "d")
        base_walker=base_fs.walk
        self.assertEqual(base_walker.walker_class, CustomWalker)
        six.assertCountEqual(self, ["/a", "/b/c", "/b/d"], base_walker.files())

        sub_fs=base_fs.opendir("b")
        sub_walker=sub_fs.walk
        self.assertEqual(sub_walker.walker_class, CustomWalker)
        six.assertCountEqual(self, ["/c", "/d"], sub_walker.files())
项目:paleo    作者:TalwalkarLab    | 项目源码 | 文件源码
def test_dependency(self):
        two_towers = """{
            "name" : "test",
            "layers" : {
                "data": {
                    "parents": []
                },
                "conv1": {
                    "parents": ["data"]
                },
                "conv2": {
                    "parents": ["data"]
                },
                "output": {
                    "parents" : ["conv1", "conv2"]
                }
            }
        }
        """
        self.graph.load_from_string(two_towers)
        nested_list = self._to_strings(self.graph.nested_list)
        self.assertEqual(nested_list[0], 'data')
        six.assertCountEqual(self, nested_list[1], (['conv1'], ['conv2']))
        self.assertEqual(nested_list[2], 'output')
项目:paleo    作者:TalwalkarLab    | 项目源码 | 文件源码
def test_dependency2(self):
        two_towers = """{
            "name" : "test",
            "layers" : {
                "data": {
                    "parents": []
                },
                "conv1": {
                    "parents": ["data"]
                },
                "output": {
                    "parents" : ["data", "conv1"]
                }
            }
        }
        """
        self.graph.load_from_string(two_towers)
        nested_list = self._to_strings(self.graph.nested_list)
        self.assertEqual(nested_list[0], 'data')
        six.assertCountEqual(self, nested_list[1], (['conv1'], ))
        self.assertEqual(nested_list[2], 'output')
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def test_get_pdbs_for_gene(self):
        model = 'e_coli_core'
        gene = 'b0118'

        expected = [('1l5j', 'A'), ('1l5j', 'B')]

        six.assertCountEqual(self, expected, ssbio.databases.bigg.get_pdbs_for_gene(model, gene))

        model = 'e_coli_core'
        gene = 'b0351'

        expected = []

        six.assertCountEqual(self, expected, ssbio.databases.bigg.get_pdbs_for_gene(model, gene))
项目:fold    作者:tensorflow    | 项目源码 | 文件源码
def test_finalize_stats_summaries(self):
    p = plan.Plan(None)
    p.save_summaries_secs = 42
    p.losses['foo'] = tf.constant([1.0])
    p.losses['bar'] = tf.constant([2.0, 3.0])
    p.metrics['baz'] = tf.constant(4)
    p.metrics['qux'] = tf.constant([5.0, 6.0])
    p.finalize_stats()
    with self.test_session():
      self.assertEqual(6, p.loss_total.eval({p.batch_size_placeholder: 1}))
      summary = tf.Summary()
      summary.ParseFromString(p.summaries.eval({p.batch_size_placeholder: 1}))
      qux_string = tf.summary.histogram('qux', [5, 6]).eval()
      qux_proto = tf.Summary()
      qux_proto.ParseFromString(qux_string)
      qux_histogram = qux_proto.value[0].histo
      expected_values = [
          tf.Summary.Value(tag='foo', simple_value=1),
          tf.Summary.Value(tag='bar', simple_value=5),
          tf.Summary.Value(tag='loss_total', simple_value=6),
          tf.Summary.Value(tag='baz', simple_value=4),
          tf.Summary.Value(tag='qux', histo=qux_histogram)]
      six.assertCountEqual(self, expected_values, summary.value)
      summary.ParseFromString(p.summaries.eval({p.batch_size_placeholder: 2}))
      expected_values = [
          tf.Summary.Value(tag='foo', simple_value=0.5),
          tf.Summary.Value(tag='bar', simple_value=2.5),
          tf.Summary.Value(tag='loss_total', simple_value=3),
          tf.Summary.Value(tag='baz', simple_value=4),
          tf.Summary.Value(tag='qux', histo=qux_histogram)]
      six.assertCountEqual(self, expected_values, summary.value)
项目:libmozdata    作者:mozilla    | 项目源码 | 文件源码
def test_filestats(self):
        path = 'netwerk/protocol/http/nsHttpConnectionMgr.cpp'
        info = FileStats(path).get_info()
        self.assertIsNotNone(info)
        self.assertEqual(info['path'], 'netwerk/protocol/http/nsHttpConnectionMgr.cpp')
        self.assertEqual(info['module'], 'Necko')
        six.assertCountEqual(self, info['components'], ['Core::Networking', 'Core::Networking: Cache', 'Core::Networking: Cookies', 'Core::Networking: FTP', 'Core::Networking: File', 'Core::Networking: HTTP', 'Core::Networking: JAR', 'Core::Networking: Websockets'])
        self.assertGreater(len(info['owners']), 0)
        self.assertGreater(len(info['peers']), 0)
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_compile_flags_boolean_simple(self):
        flags = {
            "--foo": True,
            "--bar": False
        }
        compiled_args = pyqubes.compile.flags_boolean(flags)
        six.assertCountEqual(self, compiled_args, ["--foo"])
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_compile_flags_boolean_complex(self):
        flags = {
            "--foo": 1 != 1,
            "--bar": "spam",
            "--eggs": None,
        }
        compiled_args = pyqubes.compile.flags_boolean(flags)
        six.assertCountEqual(self, compiled_args, ["--bar"])
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_compile_flags_boolean_empty(self):
        flags = {}
        compiled_args = pyqubes.compile.flags_boolean(flags)
        six.assertCountEqual(self, compiled_args, [])
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_compile_flags_store_empty(self):
        flags = {}
        compiled_args = pyqubes.compile.flags_store(flags)
        six.assertCountEqual(self, compiled_args, [])
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_compile_flags_store_iterable_single_empty(self):
        flags = {
            "--foo": [],
        }
        compiled_args = pyqubes.compile.flags_store_iterable(flags)
        six.assertCountEqual(self, compiled_args, [])
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_constants_fedora_all(self):
        six.assertCountEqual(self, c.FEDORA_ALL, [c.FEDORA, c.FEDORA_23])
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_constants_debian_all(self):
        six.assertCountEqual(self, c.DEBIAN_ALL, [c.DEBIAN, c.DEBIAN_8])
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_vm_template_vm_create_app(self):
        returned_vm = self.template_vm.create_app('app.thing')
        six.assertCountEqual(self, self.enact_patch.call_args[0][0], ['qvm-create', 'app.thing', '--template', 'one.thing', '--label', 'red'])
        self.assertIsInstance(returned_vm, pyqubes.vm.AppVM)
        self.assertEqual(self.template_vm.proactive, returned_vm.proactive)
        self.assertEqual(self.template_vm.operating_system, returned_vm.operating_system)
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_vm_template_vm_create_app_flags(self):
        returned_vm = self.template_vm.create_app('app.thing', label='green', standalone=True)
        six.assertCountEqual(self, self.enact_patch.call_args[0][0], ['qvm-create', 'app.thing', '--standalone', '--template', 'one.thing', '--label', 'green'])
        self.assertIsInstance(returned_vm, pyqubes.vm.AppVM)
        self.assertEqual(self.template_vm.proactive, returned_vm.proactive)
        self.assertEqual(self.template_vm.operating_system, returned_vm.operating_system)
项目:pyqubes    作者:tommilligan    | 项目源码 | 文件源码
def test_qvm_create_explicit(self):
        command_args = pyqubes.qvm.qvm_create("pear", template='fruit', label='green')
        six.assertCountEqual(self, command_args, ["qvm-create", "pear", "--template", "fruit", "--label", "green"])
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_del_txt_record(self):
        first_record_mock = mock.MagicMock()
        first_record_mock.type = 'TXT'
        first_record_mock.name = "DIFFERENT"
        first_record_mock.data = self.record_content

        correct_record_mock = mock.MagicMock()
        correct_record_mock.type = 'TXT'
        correct_record_mock.name = self.record_prefix
        correct_record_mock.data = self.record_content

        last_record_mock = mock.MagicMock()
        last_record_mock.type = 'TXT'
        last_record_mock.name = self.record_prefix
        last_record_mock.data = "DIFFERENT"

        domain_mock = mock.MagicMock()
        domain_mock.name = DOMAIN
        domain_mock.get_records.return_value = [first_record_mock,
                                                correct_record_mock,
                                                last_record_mock]

        self.manager.get_all_domains.return_value = [domain_mock]

        self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)

        correct_record_mock.destroy.assert_called()

        six.assertCountEqual(self, first_record_mock.destroy.call_args_list, [])
        six.assertCountEqual(self, last_record_mock.destroy.call_args_list, [])
项目:murano-pkg-check    作者:openstack    | 项目源码 | 文件源码
def test_while_unknown_does(self):
        MULTILINE_BODY = [
            {'While': '$.deploy()',
             'Does': ['$.a()', '$.b()']}
        ]
        self.g = self._checker.codeblock(MULTILINE_BODY)
        p1 = next(self.g)
        p2 = next(self.g)
        six.assertCountEqual(self, [
            'Unknown keyword "Does" in "While"',
            'Missing keyword "Do" for "While" code structure'],
            [p1.message, p2.message])
项目:dbt    作者:fishtown-analytics    | 项目源码 | 文件源码
def test__model_enabled(self):
        self.use_models({
            'model_one': 'select * from events',
            'model_two': "select * from {{ref('model_one')}}",
        })

        cfg = {
            "models": {
                "materialized": "table",
                "test_models_compile": {
                    "model_one": {"enabled": True},
                    "model_two": {"enabled": False},
                }
            }
        }

        compiler = self.get_compiler(self.get_project(cfg))
        graph, linker = compiler.compile()

        six.assertCountEqual(
            self, linker.nodes(),
            ['model.test_models_compile.model_one',
             'model.test_models_compile.model_two'])

        six.assertCountEqual(
            self, linker.edges(),
            [('model.test_models_compile.model_one',
              'model.test_models_compile.model_two',)])
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def test_opendir(self):
        # Make a simple directory structure
        self.fs.makedir('foo')
        self.fs.setbytes('foo/bar', b'barbar')
        self.fs.setbytes('foo/egg', b'eggegg')

        # Open a sub directory
        with self.fs.opendir('foo') as foo_fs:
            repr(foo_fs)
            text_type(foo_fs)
            six.assertCountEqual(self, foo_fs.listdir('/'), ['bar', 'egg'])
            self.assertTrue(foo_fs.isfile('bar'))
            self.assertTrue(foo_fs.isfile('egg'))
            self.assertEqual(foo_fs.getbytes('bar'), b'barbar')
            self.assertEqual(foo_fs.getbytes('egg'), b'eggegg')

        self.assertFalse(self.fs.isclosed())

        # Attempt to open a non-existent directory
        with self.assertRaises(errors.ResourceNotFound):
            self.fs.opendir('egg')

        # Check error when doing opendir on a non dir
        with self.assertRaises(errors.DirectoryExpected):
            self.fs.opendir('foo/egg')

        # These should work, and will essentially return a 'clone' of sorts
        self.fs.opendir('')
        self.fs.opendir('/')

        # Check ClosingSubFS closes 'parent'
        with self.fs.opendir('foo', factory=ClosingSubFS) as foo_fs:
            six.assertCountEqual(self, foo_fs.listdir('/'), ['bar', 'egg'])
            self.assertTrue(foo_fs.isfile('bar'))
            self.assertTrue(foo_fs.isfile('egg'))
            self.assertEqual(foo_fs.getbytes('bar'), b'barbar')
            self.assertEqual(foo_fs.getbytes('egg'), b'eggegg')

        self.assertTrue(self.fs.isclosed())
项目:deb-python-pykmip    作者:openstack    | 项目源码 | 文件源码
def test_get_attribute_list(self):
        """
        Test that the attribute names of a managed object can be retrieved
        with proper input.
        """
        uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959'
        attribute_names = [
            'Cryptographic Length',
            'Cryptographic Algorithm',
            'State',
            'Digest',
            'Lease Time',
            'Initial Date',
            'Unique Identifier',
            'Name',
            'Cryptographic Usage Mask',
            'Object Type',
            'Contact Information',
            'Last Change Date']
        result = results.GetAttributeListResult(
            contents.ResultStatus(enums.ResultStatus.SUCCESS),
            uid=uid,
            names=attribute_names)

        with ProxyKmipClient() as client:
            client.proxy.get_attribute_list.return_value = result

            result = client.get_attribute_list(uid)
            client.proxy.get_attribute_list.assert_called_with(uid)
            self.assertIsInstance(result, list)
            six.assertCountEqual(self, attribute_names, result)
项目:tus-py-client    作者:tus    | 项目源码 | 文件源码
def test_headers_as_list(self):
        six.assertCountEqual(self, self.uploader.headers_as_list,
                             ["Tus-Resumable: 1.0.0"])

        self.client.set_headers({'foo': 'bar'})
        six.assertCountEqual(self, self.uploader.headers_as_list,
                             ['Tus-Resumable: 1.0.0', 'foo: bar'])
项目:tus-py-client    作者:tus    | 项目源码 | 文件源码
def test_encode_metadata(self):
        self.uploader.metadata = {'foo': 'bar', 'red': 'blue'}
        encoded_metadata = ['foo' + ' ' + b64encode(b'bar').decode('ascii'),
                            'red' + ' ' + b64encode(b'blue').decode('ascii')]
        six.assertCountEqual(self, self.uploader.encode_metadata(), encoded_metadata)

        with pytest.raises(ValueError):
            self.uploader.metadata = {'foo, ': 'bar'}
            self.uploader.encode_metadata()
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def test_get_attribute_list(self):
        """
        Test that the attribute names of a managed object can be retrieved
        with proper input.
        """
        uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959'
        attribute_names = [
            'Cryptographic Length',
            'Cryptographic Algorithm',
            'State',
            'Digest',
            'Lease Time',
            'Initial Date',
            'Unique Identifier',
            'Name',
            'Cryptographic Usage Mask',
            'Object Type',
            'Contact Information',
            'Last Change Date']
        result = results.GetAttributeListResult(
            contents.ResultStatus(enums.ResultStatus.SUCCESS),
            uid=uid,
            names=attribute_names)

        with ProxyKmipClient() as client:
            client.proxy.get_attribute_list.return_value = result

            result = client.get_attribute_list(uid)
            client.proxy.get_attribute_list.assert_called_with(uid)
            self.assertIsInstance(result, list)
            six.assertCountEqual(self, attribute_names, result)
项目:django-ios-notifications    作者:nnsnodnb    | 项目源码 | 文件源码
def test_no_device_token(self):
        device_tokens = DeviceToken.objects.all()
        six.assertCountEqual(self, device_tokens, [])
项目:uberlogs    作者:odedlaz    | 项目源码 | 文件源码
def test_no_resize_when_max_items_not_reached(self):
        cd = LRUCache(max_items=2)
        cd["a"] = "test-a"
        self.assertEqual(len(cd), 1)

        cd["b"] = "test-b"
        self.assertEqual(len(cd), 2)
        six.assertCountEqual(self, cd.keys(), ["a", "b"])

        cd["c"] = "test-c"
        self.assertEqual(len(cd), 2)
        self.assertIn("c", cd)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_assertCountEqual():
    class TestAssertCountEqual(unittest.TestCase):
        def test(self):
            with self.assertRaises(AssertionError):
                six.assertCountEqual(self, (1, 2), [3, 4, 5])

            six.assertCountEqual(self, (1, 2), [2, 1])

    TestAssertCountEqual('test').test()
项目:component-builder    作者:ployst    | 项目源码 | 文件源码
def test_replace_labels(self):
        to_add, to_del = github.replace_labels(
            self.component_titles, self.current_labels)

        six.assertCountEqual(self, to_add, ['component:library'])
        six.assertCountEqual(self, to_del, ['component:tool'])
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def test_get_flavor_list(self):
        uuids = [self.flavor['uuid']]
        for i in range(1, 6):
            flavor = utils.create_test_flavor(
                uuid=uuidutils.generate_uuid(),
                name=six.text_type(i))
            uuids.append(six.text_type(flavor['uuid']))
        res = self.dbapi.flavor_get_all(self.context)
        res_uuids = [r['uuid'] for r in res]
        six.assertCountEqual(self, uuids, res_uuids)
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def test_quota_get_all(self):
        ids_project_1 = []
        ids_project_2 = []
        ids_project_all = []
        resource_names = ['servers', 'servers_type', 'test_resource']
        for i in range(0, 3):
            quota = utils.create_test_quota(project_id='project_1',
                                            resource_name=resource_names[i])
            ids_project_1.append(quota['id'])
        for i in range(3, 5):
            resource_name = resource_names[i - 3]
            quota = utils.create_test_quota(project_id='project_2',
                                            resource_name=resource_name)
            ids_project_2.append(quota['id'])
        ids_project_all.extend(ids_project_1)
        ids_project_all.extend(ids_project_2)

        # Set project_only to False
        # get all quotas from all projects
        res = self.dbapi.quota_get_all(self.context, project_only=False)
        res_ids = [r.id for r in res]
        six.assertCountEqual(self, ids_project_all, res_ids)

        # Set project_only to True
        # get quotas from current project (project_1)
        self.context.tenant = 'project_1'
        res = self.dbapi.quota_get_all(self.context, project_only=True)
        res_ids = [r.id for r in res]
        six.assertCountEqual(self, ids_project_1, res_ids)

        # Set project_only to True
        # get quotas from current project (project_2)
        self.context.tenant = 'project_2'
        res = self.dbapi.quota_get_all(self.context, project_only=True)
        res_ids = [r.id for r in res]
        six.assertCountEqual(self, ids_project_2, res_ids)
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def test_get_aggregate_list(self):
        uuids = [self.aggregate['uuid']]
        for i in range(1, 6):
            aggregate = utils.create_test_aggregate(
                uuid=uuidutils.generate_uuid(),
                name=six.text_type(i))
            uuids.append(six.text_type(aggregate['uuid']))
        res = self.dbapi.aggregate_get_all(self.context)
        res_uuids = [r['uuid'] for r in res]
        six.assertCountEqual(self, uuids, res_uuids)
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_add_role(self):
        self.rmt.add_user_role(self.user, self.rsrc_mgr)

        expected = [self.rsrc_mgr]
        actual = self.rmt.get_user_roles(self.user)

        six.assertCountEqual(self, expected, actual)
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_collection(self):
        actual_list = self.rmt.list_metadata(self.coll)
        self.assertEqual([], actual_list)

        keys_vals = {'red': 'green', 'two': 'four', 'inside': 'out'}
        self.rmt.create_metadata(self.coll, keys_vals)

        actual = self.rmt.get_metadata(self.coll, list(keys_vals.keys()))
        six.assertCountEqual(self,keys_vals, actual)

        with self.assertRaises(HTTPErrorList):
            # Should fail when trying create keys that already exist.
            self.rmt.create_metadata(self.coll, keys_vals)

        update = {'two': 'six', 'inside': 'upside-down'}
        self.rmt.update_metadata(self.coll, update)

        actual_upd = self.rmt.get_metadata(self.coll, list(update.keys()))
        six.assertCountEqual(self, update, actual_upd)

        actual_list_upd = self.rmt.list_metadata(self.coll)
        six.assertCountEqual(self, list(keys_vals.keys()), actual_list_upd)

        with self.assertRaises(HTTPErrorList):
            # Try updating a non-existent key.
            self.rmt.update_metadata(self.coll, {'foo': 'bar'})

        self.rmt.delete_metadata(self.coll, list(keys_vals.keys()))

        with self.assertRaises(HTTPErrorList):
            # Try getting keys that don't exist.
            self.rmt.get_metadata(self.coll, ['foo', 'bar'])

        actual_list_end = self.rmt.list_metadata(self.coll)
        self.assertEqual([], actual_list_end)
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_experiment(self):
        actual_list = self.rmt.list_metadata(self.exp)
        self.assertEqual([], actual_list)

        keys_vals = {'red': 'green', 'two': 'four', 'inside': 'out'}
        self.rmt.create_metadata(self.exp, keys_vals)
        actual = self.rmt.get_metadata(self.exp, list(keys_vals.keys()))
        six.assertCountEqual(self, keys_vals, actual)

        with self.assertRaises(HTTPErrorList):
            # Should fail when trying create keys that already exist.
            self.rmt.create_metadata(self.exp, keys_vals)

        update = { 'two': 'six', 'inside': 'upside-down' }
        self.rmt.update_metadata(self.exp, update)

        actual_upd = self.rmt.get_metadata(self.exp, list(update.keys()))
        six.assertCountEqual(self, update, actual_upd)

        actual_list_upd = self.rmt.list_metadata(self.exp)
        six.assertCountEqual(self, list(keys_vals.keys()), actual_list_upd)

        with self.assertRaises(HTTPErrorList):
            # Try updating a non-existent key.
            self.rmt.update_metadata(self.exp, {'foo': 'bar'})

        self.rmt.delete_metadata(self.exp, list(keys_vals.keys()))

        with self.assertRaises(HTTPErrorList):
            # Try getting keys that don't exist.
            self.rmt.get_metadata(self.exp, ['foo', 'bar'])

        actual_list_end = self.rmt.list_metadata(self.exp)
        self.assertEqual([], actual_list_end)
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_channel(self):
        actual_list = self.rmt.list_metadata(self.chan)
        self.assertEqual([], actual_list)

        keys_vals = { 'red': 'green', 'two': 'four', 'inside': 'out'}
        self.rmt.create_metadata(self.chan, keys_vals)
        actual = self.rmt.get_metadata(self.chan, list(keys_vals.keys()))
        six.assertCountEqual(self, keys_vals, actual)

        with self.assertRaises(HTTPErrorList):
            # Should fail when trying create keys that already exist.
            self.rmt.create_metadata(self.chan, keys_vals)

        update = { 'two': 'six', 'inside': 'upside-down' }
        self.rmt.update_metadata(self.chan, update)

        actual_upd = self.rmt.get_metadata(self.chan, list(update.keys()))
        six.assertCountEqual(self,update, actual_upd)

        actual_list_upd = self.rmt.list_metadata(self.chan)
        six.assertCountEqual(self,keys_vals, actual_list_upd)

        with self.assertRaises(HTTPErrorList):
            # Try updating a non-existent key.
            self.rmt.update_metadata(self.chan, {'foo': 'bar'})

        self.rmt.delete_metadata(self.chan, list(keys_vals.keys()))

        with self.assertRaises(HTTPErrorList):
            # Try getting keys that don't exist.
            self.rmt.get_metadata(self.chan, ['foo', 'bar'])

        actual_list_end = self.rmt.list_metadata(self.chan)
        self.assertEqual([], actual_list_end)
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def test_get_success(self, mock_session, mock_resp):
        expected = ['default']
        mock_resp.status_code = 200
        mock_resp.json.return_value = expected
        mock_session.prepare_request.return_value = PreparedRequest()
        mock_session.send.return_value = mock_resp

        url_prefix = 'https://api.theboss.io'
        auth = 'mytoken'
        send_opts = {}

        actual = self.prj.get_user(
            'johndoe', url_prefix, auth, mock_session, send_opts)
        six.assertCountEqual(self, expected, actual)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_assertCountEqual():
    class TestAssertCountEqual(unittest.TestCase):
        def test(self):
            with self.assertRaises(AssertionError):
                six.assertCountEqual(self, (1, 2), [3, 4, 5])

            six.assertCountEqual(self, (1, 2), [2, 1])

    TestAssertCountEqual('test').test()
项目:resolwe-bio-py    作者:genialis    | 项目源码 | 文件源码
def test_iterate_fields(self):
        result = list(iterate_fields(OUTPUT, PROCESS_OUTPUT_SCHEMA))
        # result object is iterator - we use lists to pull all elements

        expected = [
            ({
                'type': 'basic:string:',
                'name': 'id',
                'label': 'ID'
            }, {
                'k': 123,
                'id': 'abc'
            }), ({
                'type': 'basic:string:',
                'name': 'bases',
                'label': 'Number of bases'
            }, {
                'options': {
                    'k': 123,
                    'id': 'abc'
                },
                'bases': '75',
                'fastq': {
                    'file': 'example.fastq.gz'
                }
            }), ({
                'type': 'basic:file:',
                'name': 'fastq',
                'label': 'Reads file'
            }, {
                'options': {
                    'k': 123,
                    'id': 'abc'
                },
                'bases': '75',
                'fastq': {
                    'file': 'example.fastq.gz'
                }
            }), ({
                'type': 'basic:integer:',
                'name': 'k',
                'label': 'k-mer size'
            }, {
                'k': 123,
                'id': 'abc'
            })
        ]

        six.assertCountEqual(self, result, expected)
项目:resolwe-bio-py    作者:genialis    | 项目源码 | 文件源码
def test_files(self):
        data = Data(id=123, resolwe=MagicMock())
        data._get_dir_files = MagicMock(
            side_effect=[['first_dir/file1.txt'], ['fastq_dir/file2.txt']])

        data.annotation = {
            'output.list': {'value': [{'file': "element.gz"}], 'type': 'list:basic:file:'},
            'output.dir_list': {'value': [{'dir': "first_dir"}], 'type': 'list:basic:dir:'},
            'output.fastq': {'value': {'file': "file.fastq.gz"}, 'type': 'basic:file:fastq'},
            'output.fastq_archive': {'value': {'file': "archive.gz"}, 'type': 'basic:file:'},
            'output.fastq_dir': {'value': {'dir': "fastq_dir"}, 'type': 'basic:dir:'},
            'input.fastq_url': {'value': {'file': "blah"}, 'type': 'basic:url:'},
            'input.blah': {'value': "blah.gz", 'type': 'basic:file:'}
        }

        file_list = data.files()
        six.assertCountEqual(self, file_list, [
            'element.gz',
            'archive.gz',
            'file.fastq.gz',
            'first_dir/file1.txt',
            'fastq_dir/file2.txt'
        ])
        file_list = data.files(file_name='element.gz')
        self.assertEqual(file_list, ['element.gz'])
        file_list = data.files(field_name='output.fastq')
        self.assertEqual(file_list, ['file.fastq.gz'])

        data.annotation = {
            'output.list': {'value': [{'no_file_field_here': "element.gz"}],
                            'type': 'list:basic:file:'},
        }
        with six.assertRaisesRegex(self, KeyError, "does not contain 'file' key."):
            data.files()

        data = Data(resolwe=MagicMock(), id=None)
        with six.assertRaisesRegex(self, ValueError, "must be saved before"):
            data.files()
项目:paleo    作者:TalwalkarLab    | 项目源码 | 文件源码
def test_block(self):
        two_towers = """{
            "name" : "test",
            "layers" : {
                "data": {
                    "parents": []
                },
                "conv1": {
                    "parents": ["data"]
                },
                "conv2": {
                    "type": "Block",
                    "parents": ["conv1"],
                    "endpoint": "concat",
                    "layers": {
                        "conv2a": {
                            "parents": []
                        },
                        "conv2b" : {
                            "parents": []
                        },
                        "concat": {
                            "parents": ["conv2a", "conv2b"]
                        }
                    }
                },
                "output": {
                    "parents" : ["conv2"]
                }
            }
        }
        """
        self.graph.load_from_string(two_towers)
        nested_list = self._to_strings(self.graph.nested_list)
        self.assertEqual(nested_list[0], 'data')
        self.assertEqual(nested_list[1], 'conv1')
        six.assertCountEqual(self, nested_list[2], (['conv2/conv2a'],
                                                    ['conv2/conv2b']))
        self.assertEqual(nested_list[3], 'conv2/concat')
        self.assertEqual(nested_list[4], 'output')
项目:six    作者:benjaminp    | 项目源码 | 文件源码
def test_assertCountEqual():
    class TestAssertCountEqual(unittest.TestCase):
        def test(self):
            with self.assertRaises(AssertionError):
                six.assertCountEqual(self, (1, 2), [3, 4, 5])

            six.assertCountEqual(self, (1, 2), [2, 1])

    TestAssertCountEqual('test').test()
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_execute_task_instances(self):
        dag_id = 'SchedulerJobTest.test_execute_task_instances'
        task_id_1 = 'dummy_task'
        task_id_2 = 'dummy_task_nonexistent_queue'
        # important that len(tasks) is less than concurrency
        # because before scheduler._execute_task_instances would only
        # check the num tasks once so if concurrency was 3,
        # we could execute arbitrarily many tasks in the second run
        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
        task1 = DummyOperator(dag=dag, task_id=task_id_1)
        task2 = DummyOperator(dag=dag, task_id=task_id_2)
        dagbag = self._make_simple_dag_bag([dag])

        scheduler = SchedulerJob(**self.default_scheduler_args)
        session = settings.Session()

        # create first dag run with 1 running and 1 queued
        dr1 = scheduler.create_dag_run(dag)
        ti1 = TI(task1, dr1.execution_date)
        ti2 = TI(task2, dr1.execution_date)
        ti1.refresh_from_db()
        ti2.refresh_from_db()
        ti1.state = State.RUNNING
        ti2.state = State.RUNNING
        session.merge(ti1)
        session.merge(ti2)
        session.commit()

        self.assertEqual(State.RUNNING, dr1.state)
        self.assertEqual(2, DAG.get_num_task_instances(dag_id, dag.task_ids,
            states=[State.RUNNING], session=session))

        # create second dag run
        dr2 = scheduler.create_dag_run(dag)
        ti3 = TI(task1, dr2.execution_date)
        ti4 = TI(task2, dr2.execution_date)
        ti3.refresh_from_db()
        ti4.refresh_from_db()
        # manually set to scheduled so we can pick them up
        ti3.state = State.SCHEDULED
        ti4.state = State.SCHEDULED
        session.merge(ti3)
        session.merge(ti4)
        session.commit()

        self.assertEqual(State.RUNNING, dr2.state)

        res = scheduler._execute_task_instances(dagbag, [State.SCHEDULED])

        # check that concurrency is respected
        ti1.refresh_from_db()
        ti2.refresh_from_db()
        ti3.refresh_from_db()
        ti4.refresh_from_db()
        self.assertEqual(3, DAG.get_num_task_instances(dag_id, dag.task_ids,
            states=[State.RUNNING, State.QUEUED], session=session))
        self.assertEqual(State.RUNNING, ti1.state)
        self.assertEqual(State.RUNNING, ti2.state)
        six.assertCountEqual(self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state])
        self.assertEqual(1, res)
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def test_listdir(self):
        # Check listing directory that doesn't exist
        with self.assertRaises(errors.ResourceNotFound):
            self.fs.listdir('foobar')

        # Check aliases for root
        self.assertEqual(self.fs.listdir('/'), [])
        self.assertEqual(self.fs.listdir('.'), [])
        self.assertEqual(self.fs.listdir('./'), [])

        # Make a few files
        self.fs.setbytes('foo', b'egg')
        self.fs.setbytes('bar', b'egg')
        self.fs.setbytes('baz', b'egg')

        # Check list works
        six.assertCountEqual(self,
                             self.fs.listdir('/'),
                             ['foo', 'bar', 'baz'])
        six.assertCountEqual(self,
                             self.fs.listdir('.'),
                             ['foo', 'bar', 'baz'])
        six.assertCountEqual(self,
                             self.fs.listdir('./'),
                             ['foo', 'bar', 'baz'])

        # Check paths are unicode strings
        for name in self.fs.listdir('/'):
            self.assertIsInstance(name, text_type)

        # Create a subdirectory
        self.fs.makedir('dir')

        # Should start empty
        self.assertEqual(self.fs.listdir('/dir'), [])

        # Write some files
        self.fs.setbytes('dir/foofoo', b'egg')
        self.fs.setbytes('dir/barbar', b'egg')

        # Check listing subdirectory
        six.assertCountEqual(self,
                             self.fs.listdir('dir'),
                             ['foofoo', 'barbar'])
        # Make sure they are unicode stringd
        for name in self.fs.listdir('dir'):
            self.assertIsInstance(name, text_type)

        self.fs.create('notadir')
        with self.assertRaises(errors.DirectoryExpected):
            self.fs.listdir('notadir')