Python sqlalchemy.orm.exc 模块,FlushError() 实例源码

我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用sqlalchemy.orm.exc.FlushError()

项目:lol-data-aggregator    作者:StickmanVentures    | 项目源码 | 文件源码
def get_summoners(self, match):
    """Store every summoner that took part in ``match``.

    Calls ``self.delay()``, fetches the match through ``self.watcher`` and,
    for each entry of its ``'participantIdentities'`` list, adds a
    ``Summoner`` to ``self.session`` and commits it on its own. A commit
    that fails with ``IntegrityError`` or ``FlushError`` is rolled back and
    reported on stdout; the loop then continues with the next participant.
    """
    self.delay()

    match_data = self.watcher.get_match(match)
    for identity in match_data['participantIdentities']:
        summoner_id = identity['player']['summonerId']
        summoner = Summoner(summoner_id)
        self.session.add(summoner)

        # One commit per summoner, so a failed commit only rolls back the
        # summoner that was just added.
        try:
            self.session.commit()
            print('New Summoner - %s' % summoner)
        except IntegrityError:
            self.session.rollback()
            print('Duplicate Summoner - %s' % summoner)
        except FlushError:
            self.session.rollback()
            print('FlushError Summoner - %s' % summoner)
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def __bulk_add_new_file_dids(files, account, dataset_meta=None, session=None):
    """
    Bulk add new file dids.

    :param files: the list of new files. Each entry is a dict providing
                  'scope', 'name' and 'bytes', and optionally 'account',
                  'md5', 'adler32' and 'meta'.
    :param account: The account owner, used for files without an 'account'.
    :param dataset_meta: Optional dict of metadata applied to every new did
                         (after, and therefore overriding, the file's 'meta').
    :param session: The database session in use.
    :returns: True if successful.
    :raises DataIdentifierAlreadyExists: if the flush reports a new instance
                                         conflicting with a persistent one.
    :raises RucioException: on any other IntegrityError, DatabaseError or
                            FlushError raised by the flush.
    """
    for file_did in files:
        new_did = models.DataIdentifier(scope=file_did['scope'], name=file_did['name'],
                                        account=file_did.get('account') or account,
                                        did_type=DIDType.FILE, bytes=file_did['bytes'],
                                        md5=file_did.get('md5'), adler32=file_did.get('adler32'),
                                        is_new=None)
        file_meta = file_did.get('meta', [])
        for key in file_meta:
            new_did.update({key: file_meta[key]})
        for key in dataset_meta or {}:
            new_did.update({key: dataset_meta[key]})

        # Defer the flush so all rows are sent to the database in one go below.
        new_did.save(session=session, flush=False)

    try:
        session.flush()
    except (IntegrityError, DatabaseError) as error:
        raise exception.RucioException(error.args)
    except FlushError as error:
        # The only way to tell a duplicate from other flush failures here is
        # the message text of the FlushError.
        if match('New instance .* with identity key .* conflicts with persistent instance', error.args[0]):
            raise exception.DataIdentifierAlreadyExists('Data Identifier already exists!')
        raise exception.RucioException(error.args)
    return True
项目:ranger    作者:openstack    | 项目源码 | 文件源码
def test_add_regions_errors(self, mock_gfbu, mock_strin):
    """Drive ``flavor_logic.add_regions`` through its failure paths.

    Every case calls ``add_regions('uuid', RegionWrapper([...]), 'transaction')``
    and checks which exception type comes out, while varying the module-level
    ``error`` flag, the ``mock_strin`` side effect, or the region itself.
    """
    global error

    def expect(raised, region):
        # Shared call shape: only the expected exception and the single
        # wrapped region differ between cases.
        self.assertRaises(raised, flavor_logic.add_regions,
                          'uuid', RegionWrapper([region]), 'transaction')

    flavor_stub = MagicMock()
    flavor_stub.flavor.regions = [Region(name='test_region')]
    mock_gfbu.return_value = flavor_stub

    # NOTE(review): `error` is presumably read by get_datamanager_mock to
    # select a failure mode -- confirm against its definition.
    error = 1
    injector.override_injected_dependency(('data_manager', get_datamanager_mock))
    expect(flavor_logic.ErrorStatus, Region(name='test_region'))

    error = 4
    injector.override_injected_dependency(('data_manager', get_datamanager_mock))

    # (side effect installed on mock_strin, exception add_regions must raise)
    side_effect_cases = (
        (exc.FlushError(), exc.FlushError),
        (exc.FlushError('conflicts with persistent instance'), flavor_logic.ErrorStatus),
        (ValueError(), ValueError),
        (ValueError('conflicts with persistent instance'), flavor_logic.ConflictError),
    )
    for side_effect, raised in side_effect_cases:
        mock_strin.side_effect = side_effect
        expect(raised, Region(name='test_region'))

    # Region-level failures: empty name, then a region of type 'group'.
    expect(flavor_logic.ErrorStatus, Region(name=''))
    expect(flavor_logic.ErrorStatus, Region(name='test_region', type='group'))
项目:lol-data-aggregator    作者:StickmanVentures    | 项目源码 | 文件源码
def get_teams(self, summoners):
    """Fetch the teams of ``summoners`` and store the ones with 5+ members.

    Calls ``self.delay()`` once, then asks ``self.watcher`` for the teams,
    retrying for as long as the call raises ``LoLException``. Each returned
    team whose roster has at least five members is added to ``self.session``
    together with one ``TeamRosterMember`` per roster entry, and committed
    on its own. A commit that fails with ``IntegrityError`` or ``FlushError``
    is rolled back and reported on stdout.
    """
    self.delay()

    # Retry without limit (and without waiting) until the lookup succeeds.
    while True:
        try:
            teams = self.watcher.get_teams_for_summoners(summoners)
        except LoLException:
            continue
        break

    for _key, team_entries in teams.items():
        for entry in team_entries:
            try:
                roster_members = entry['roster']['memberList']
            except KeyError:
                # LoL api is stupid and sometimes forgets to put a key
                # in its api return when it should put null.
                continue

            # Only keep 5x5 teams.
            if len(roster_members) < 5:
                continue

            team = Team(entry['fullId'])
            self.session.add(team)

            # Record every member of the 5x5 team against the team id.
            for member in roster_members:
                self.session.add(TeamRosterMember(team.id, member['playerId']))

            try:
                self.session.commit()
                print('New Team - %s' % team)
            except IntegrityError:
                self.session.rollback()
                print('Duplicate Team - %s' % team)
            except FlushError:
                self.session.rollback()
                print('FlushError Team - %s' % team)
项目:snovault    作者:ENCODE-DCC    | 项目源码 | 文件源码
def test_keys(session):
    """Check Key rows can be added and that a repeated name/value is rejected.

    Two keys are stored for a first resource (count goes 1, then 2). A third
    key reusing the first key's name and value, but pointing at a second
    resource, must make ``session.flush()`` raise ``FlushError``.
    """
    from sqlalchemy.orm.exc import FlushError
    from snovault.storage import (
        Key,
        Resource,
    )

    resource_name = 'testdata'
    first_props = {'foo': 'bar'}
    first_resource = Resource('test_item', {resource_name: first_props})
    session.add(first_resource)
    session.flush()
    first_resource = session.query(Resource).one()

    key_name = 'foo'
    # NOTE(review): the Key objects are deliberately kept in locals, as in
    # the original; presumably the expected FlushError depends on the first
    # key still being tracked by the session -- confirm before inlining.
    first_key = Key(rid=first_resource.rid, name=key_name,
                    value=first_props[key_name])
    session.add(first_key)
    session.flush()
    assert session.query(Key).count() == 1

    second_key_name = 'foofoo'
    second_key_value = 'barbar'
    second_key = Key(rid=first_resource.rid, name=second_key_name,
                     value=second_key_value)
    session.add(second_key)
    session.flush()
    assert session.query(Key).count() == 2

    second_props = {'foofoo': 'barbar'}
    second_resource = Resource('test_item', {resource_name: second_props})
    session.add(second_resource)
    session.flush()

    # Same name/value as first_key, different resource: flush must fail.
    clashing_key = Key(rid=second_resource.rid, name=key_name,
                       value=first_props[key_name])
    session.add(clashing_key)
    with pytest.raises(FlushError):
        session.flush()
项目:HTSOHM-dev    作者:akaija    | 项目源码 | 文件源码
def calculate_mutation_strength(run_id, generation, mutation_strength_bin):
    """Return the mutation strength for a bin, computing and saving it if new.

    Args:
        run_id (str): identification string for run.
        generation (int): iteration in bin-mutate-simulate routine.
        mutation_strength_bin (list): bin being queried; it is appended to
            ``[run_id, generation]`` to form the MutationStrength lookup key.

    Returns:
        float: the strength stored for this key if one already exists.
        Otherwise the prior strength is cloned for this generation and,
        outside interactive mode, adjusted from the fraction of children
        that landed in their parent's bin: lowered by 0.05 when that
        fraction is below 0.1 (and the result stays above 0), raised by 0.05
        when it is above 0.5 (and the result stays below 1). In interactive
        mode the new strength comes from ``set_variable`` instead. The new
        row is then committed; a FlushError or IntegrityError on commit is
        rolled back and treated as harmless.

    """
    lookup_key = [run_id, generation] + mutation_strength_bin
    mutation_strength = session.query(MutationStrength).get(lookup_key)

    if mutation_strength:
        print("Mutation strength already calculated for this bin and generation.")
        sys.stdout.flush()
        return mutation_strength.strength

    print("Calculating mutation strength...")
    mutation_strength = MutationStrength.get_prior(*lookup_key).clone()
    interactive = config['interactive_mode'] == 'on'
    if interactive:
        print('Prior mutation strength :\t{}'.format(mutation_strength.strength))
    mutation_strength.generation = generation

    try:
        fraction_in_parent_bin = calculate_percent_children_in_bin(
            run_id, generation, mutation_strength_bin)

        if interactive:
            print('\tFor adaptive mutation strength(s), DECREASE strength \n'
                  '\tby 5% if the fraction of children in parent-bin is \n'
                  '\tLESS THAN 0.1; INCREASE strength by 5% if the \n'
                  '\tfraction of children in parent-bin is GREATER THAN \n'
                  '\t0.5.')
            mutation_strength.strength = set_variable(
                'Please Enter new mutation strength :\t', 'mutation_strength.strength')
        else:
            # Step by 0.05, but only while the result stays strictly
            # inside (0, 1).
            if fraction_in_parent_bin < 0.1 and mutation_strength.strength - 0.05 > 0:
                mutation_strength.strength -= 0.05
            elif fraction_in_parent_bin > 0.5 and mutation_strength.strength + 0.05 < 1:
                mutation_strength.strength += 0.05
    except ZeroDivisionError:
        print("No prior generation materials in this bin with children.")

    try:
        session.add(mutation_strength)
        session.commit()
    except (FlushError, IntegrityError):
        print("Somebody beat us to saving a row with this generation. That's ok!")
        session.rollback()
        # it's ok b/c this calculation should always yield the exact same result!

    sys.stdout.flush()
    return mutation_strength.strength
项目:ranger    作者:openstack    | 项目源码 | 文件源码
def test_add_tenants_errors(self, mock_gfbu, mock_strin):
    """Drive ``flavor_logic.add_tenants`` through its failure paths.

    Every case calls ``add_tenants('uuid', <tenants wrapper>, 'transaction')``
    and checks which exception type comes out, while varying the module-level
    ``error`` flag, the wrapper passed in, or the ``mock_strin`` side effect.
    """
    global error

    def expect(raised, tenants_arg):
        # Shared call shape: only the expected exception and the tenants
        # argument differ between cases.
        self.assertRaises(raised,
                          flavor_logic.add_tenants, 'uuid',
                          tenants_arg,
                          'transaction')

    tenants = ['test_tenant']
    flavor_stub = MagicMock()
    flavor_stub.flavor.tenants = tenants
    mock_gfbu.return_value = flavor_stub

    # NOTE(review): `error` is presumably read by get_datamanager_mock to
    # select a failure mode -- confirm against its definition.
    error = 1
    injector.override_injected_dependency(('data_manager', get_datamanager_mock))
    expect(flavor_logic.ErrorStatus, TenantWrapper(tenants))

    # Flavor is public
    error = 5
    expect(flavor_logic.ErrorStatus, TenantWrapper(tenants))

    # A wrapper whose tenants list holds an int instead of a string.
    error = 31
    bad_wrapper = MagicMock()
    bad_wrapper.tenants = [1337]
    expect(ValueError, bad_wrapper)

    # (side effect installed on mock_strin, exception add_tenants must raise)
    side_effect_cases = (
        (exc.FlushError('conflicts with persistent instance'), flavor_logic.ConflictError),
        (exc.FlushError(''), exc.FlushError),
    )
    for side_effect, raised in side_effect_cases:
        mock_strin.side_effect = side_effect
        expect(raised, TenantWrapper(tenants))