我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用sqlalchemy.exc.DataError()。
def __init__(self, request):
    """Store the request and resolve the entity named by its ``id`` match.

    When the route match contains an ``id``, the ``SimpleEntity`` with that
    id is looked up.  A lookup that finds nothing, or that fails with
    ``DataError``, raises ``HTTPNotFound``.  Without an ``id`` the entity
    stays ``None``.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing;
    # the not-found check is assumed to apply only when an id was supplied.
    self.request = request
    self.entity_id = request.matchdict.get('id')
    self.entity = None

    if not self.entity_id:
        return

    from stalker import SimpleEntity
    from sqlalchemy.exc import DataError

    try:
        lookup = SimpleEntity.query.filter(
            SimpleEntity.id == self.entity_id
        )
        self.entity = lookup.first()
    except DataError:
        # presumably a malformed id value; treated the same as "no such row"
        self.entity = None

    if self.entity:
        return

    from pyramid.exceptions import HTTPNotFound
    raise HTTPNotFound('Entity not found!')
def insert(employee: dict) -> dict:
    """
    Insert a new employee profile

    Any ``id`` key in the payload is discarded and ``registered`` is set to
    the current UTC time before the row is stored.

    :param employee: employee profile (mutated in place as described above)
    :return: ``(NoContent, 200)`` once the profile has been committed; the
        request is aborted with HTTP 400 when an employee with the same name
        already exists or the database rejects the data
    """
    logger.info('New employee profile: {profile}'.format(profile=employee))

    if Employee.query.filter(Employee.name == employee.get('name')).first():
        logger.warning(warning.ALREADY_EXISTS)
        abort(400, {'message': warning.ALREADY_EXISTS})
    else:
        try:
            employee.pop('id', None)
            employee['registered'] = datetime.utcnow()
            db.session.add(Employee(**employee))
            db.session.commit()
            logger.info('Profile saved!')
        except DataError:
            # A failed commit leaves the session in an error state; roll it
            # back so the session stays usable for later work.
            db.session.rollback()
            abort(400, {'message': warning.INVALID_DATA_TYPE})

    return NoContent, 200
def update(employee_id: int, employee: dict) -> dict:
    """
    Update employee profile by id

    :param employee_id: employee id
    :param employee: employee profile fields to apply
    :return: ``(NoContent, 201)`` once the change has been committed; the
        request is aborted with HTTP 400 when the database rejects the data
    """
    try:
        logger.info('Employee id that want to be updated: {id}'.format(id=employee_id))
        selected_employee = Employee.query.filter(
            Employee.id == employee_id
        ).first_or_404()
        selected_employee.update(**employee)
        db.session.commit()
        logger.info('Updated!')
        return NoContent, 201
    except DataError:
        # A failed commit leaves the session in an error state; roll it back
        # so the session stays usable for later work.
        db.session.rollback()
        abort(400, {'message': warning.INVALID_DATA_TYPE})
def create_or_update_time_spent(task_id, person_id, date, duration, add=False):
    """Create or update the time-spent entry for a task, person and date.

    If an entry already exists its duration is replaced by ``duration``, or
    increased by ``duration`` when ``add`` is true.  Otherwise a new entry is
    created.  Returns the serialized entry.

    Raises ``WrongDateFormatException`` when the lookup fails with
    ``DataError``.
    """
    try:
        existing = TimeSpent.get_by(
            task_id=task_id, person_id=person_id, date=date
        )
    except DataError:
        raise WrongDateFormatException

    if existing is None:
        created = TimeSpent.create(
            task_id=task_id, person_id=person_id, date=date, duration=duration
        )
        return created.serialize()

    new_duration = existing.duration + duration if add else duration
    existing.update({"duration": new_duration})
    return existing.serialize()
def login_user(cls, name, password):
    """Authenticate ``name``/``password`` and return ``cls(user)``.

    Raises ``ClientError(ClientError.LOGIN_FAIL)`` when no user matches, the
    query fails with ``DataError``, or the password hash does not match.
    Any other failure is wrapped in ``ServerError``.  The scoped session is
    always removed before returning or raising.
    """
    session = SessionManager.Session()
    try:
        user = session.query(User).filter(User.name == name).one()
        if not check_password_hash(user.password, password):
            raise ClientError(ClientError.LOGIN_FAIL)
        return cls(user)
    except (NoResultFound, DataError):
        raise ClientError(ClientError.LOGIN_FAIL)
    except ClientError:
        # Already the error callers expect; let it through unchanged.
        raise
    except Exception as error:
        # Exceptions have no ``.message`` attribute on Python 3; ``str()``
        # works on every version.
        raise ServerError(str(error))
    finally:
        SessionManager.Session.remove()
def setEmployeePosition():
    '''Store a new employee position taken from the JSON request body.

    ``name`` is required; ``description`` defaults to 'NA'.

    Example : POST /api/v1/employeepostions
    {"name" : "Cook", "description" : "This is description"}

    Returns a JSON success envelope echoing the request body, or a JSON
    error envelope with code 400 when the insert fails.
    '''
    with SessionManager(Session) as session:
        try:
            name = request.json['name']
            description = request.json.get('description', 'NA')
            sql_pos = EmployeePosition(name=name, description=description)
            session.add(sql_pos)
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # presumably a submitted value the database rejects for its column
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError', 'Value : {0} already exists'.format(name)))
        except Exception:
            # Catch-all (e.g. a missing required key).  ``Exception`` rather
            # than a bare ``except`` so SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))
def test_blogpost_title_length(self):
    """Test BLOGPOST model title length has a limit"""
    self.configure_fixtures()
    limit = 255

    # A title of exactly ``limit`` characters must be accepted...
    post = Blogpost(title='a' * limit, body="body", project=self.project)
    db.session.add(post)
    assert_not_raises(DataError, db.session.commit)

    # ...while one character more must be rejected on commit.
    post.title = 'a' * (limit + 1)
    assert_raises(DataError, db.session.commit)
def _check_if_pod_exists(pod_id):
    """Raise ``PodNotFound`` unless the current user owns a pod ``pod_id``.

    A ``DataError`` from the lookup is reported as ``PodNotFound`` as well.
    """
    owner = KubeUtils.get_current_user()
    try:
        found = Pod.filter(Pod.owner_id == owner.id, Pod.id == pod_id).first()
    except DataError:
        # pod_id is not uuid
        raise PodNotFound
    if found is None:
        raise PodNotFound
def test_invalid_date(self):
    """Updating an employee with an unparsable birthdate yields HTTP 400."""
    self.insert_employee()
    payload = {'birthdate': 'test'}
    res = self.test_app.put(
        self.url, headers=self.headers, data=json.dumps(payload)
    )
    # The former trailing ``self.assertRaises(DataError)`` was dropped: with
    # no callable and no ``with`` block it asserted nothing.  The status code
    # is the observable contract of this request.
    self.assertEqual(res.status_code, 400)
def test_invalid_date(self):
    """Creating an employee with an unparsable birthdate yields HTTP 400."""
    payload = {
        'name': 'budi2',
        'gender': Employee.Gender.MALE,
        'birthdate': 'test',
        'active': True,
    }
    res = self.test_app.post(
        '/employee', headers=self.headers, data=json.dumps(payload)
    )
    # The former trailing ``self.assertRaises(DataError)`` was dropped: with
    # no callable and no ``with`` block it asserted nothing.  The status code
    # is the observable contract of this request.
    self.assertEqual(res.status_code, 400)
def test_country_name_overflow_error(session, country_data):
    """Country 003: Verify error if country name exceeds field length."""
    overflow_fields = country_data['overflow']
    oversized_country = Countries(**overflow_fields)

    # Persisting the oversized record must fail with DataError.
    with pytest.raises(DataError):
        session.add(oversized_country)
        session.commit()
def test_competition_name_overflow_error(session, comp_data):
    """Competition 003: Verify error if competition name exceeds field length."""
    overflow_fields = comp_data['overflow']
    oversized_competition = Competitions(**overflow_fields)

    # Persisting the oversized record must fail with DataError.
    with pytest.raises(DataError):
        session.add(oversized_competition)
        session.commit()
def registerReferenceSet(self, guid, assembly_id, source_accessions=None,
                         description=None, references=None):
    """
    ReferenceSet registration for MAF occurs from an assembly config file.
    See hg19.json for example.
    ReferenceSet registration for VCF occurs from reading VCF contig tags
    in header.
    Requires assembly ids and guids to be unique.

    Returns the existing ReferenceSet matching ``assembly_id`` or ``guid``,
    or a newly created one.  ``references`` is an optional mapping of contig
    name to an object with a ``length`` attribute; each entry is registered
    against the new reference set.  ``source_accessions`` is accepted but
    not used here.

    Raises ValueError when the database rejects the new row with DataError.
    """
    # ``references`` used to default to a shared ``OrderedDict()`` instance;
    # ``None`` avoids a mutable default and is treated as "no references".
    referenceSet = self.session.query(ReferenceSet).filter(
        or_(ReferenceSet.assembly_id == assembly_id,
            ReferenceSet.guid == guid))\
        .first()

    if referenceSet is None:
        try:
            referenceSet = ReferenceSet(
                guid=guid,
                assembly_id=assembly_id,
                description=description)
            self.session.add(referenceSet)
            self.session.commit()
        except exc.DataError as e:
            self.session.rollback()
            raise ValueError("{0} : {1} ".format(str(e), guid))

        # NOTE(review): nesting reconstructed from a flattened listing;
        # references are assumed to be registered only for a newly created
        # reference set - confirm against the original file.
        if references:
            # use pyvcf like ordered dict to avoiding having to specify
            # reference order manually
            refs = sortReferences(references)
            for ref in refs:
                self.registerReference(
                    str(uuid.uuid4()), referenceSet.id, ref, refs[ref].length)

    return referenceSet
def registerReference(self, guid, reference_set_id, name, length):
    """
    Register a Reference and return it; an existing Reference with the same
    name in the same reference set is returned unchanged.
    Most often called by registerReferenceSet.
    Raises ValueError when the database rejects the new row with DataError.
    """
    # contig MT is same as contig M, in meta db we will always use M to be
    # consistent
    if name == 'MT':
        name = 'M'

    same_set_and_name = and_(
        Reference.reference_set_id == reference_set_id,
        Reference.name == name)
    existing = self.session.query(Reference).filter(same_set_and_name).first()
    if existing is not None:
        return existing

    try:
        created = Reference(
            name=name,
            reference_set_id=reference_set_id,
            length=length,
            guid=guid
        )
        self.session.add(created)
        self.session.commit()
    except exc.DataError as e:
        self.session.rollback()
        raise ValueError("{0} : {1} ".format(str(e), guid))

    return created
def registerWorkspace(self, guid, name):
    """
    Register a workspace and return it; an existing workspace with the same
    name is returned unchanged.
    Workspace name is the path to the workspace directory.
    Raises ValueError when the database rejects the new row with DataError.
    """
    # Trailing slashes are stripped so "path" and "path/" name the same
    # workspace.
    name = name.rstrip('/')

    existing = self.session.query(Workspace).filter(
        and_(Workspace.name == name)).first()
    if existing is not None:
        return existing

    try:
        created = Workspace(
            guid=guid,
            name=name
        )
        self.session.add(created)
        self.session.commit()
    except exc.DataError as e:
        self.session.rollback()
        raise ValueError("{0} : {1} ".format(str(e), guid))

    return created
def registerDBArray(self, guid, reference_set_id, workspace_id, name):
    """
    Register a DBArray and return it; an existing DBArray with the same
    reference set, workspace and name is returned unchanged.
    Raises ValueError when the database rejects the new row with DataError.
    """
    # array is a unique set of workspace, array, and reference set
    # association
    same_array = and_(
        DBArray.reference_set_id == reference_set_id,
        DBArray.workspace_id == workspace_id,
        DBArray.name == name)
    existing = self.session.query(DBArray).filter(same_array).first()
    if existing is not None:
        return existing

    try:
        created = DBArray(
            guid=guid,
            reference_set_id=reference_set_id,
            workspace_id=workspace_id,
            name=name
        )
        self.session.add(created)
        self.session.commit()
    except exc.DataError as e:
        self.session.rollback()
        raise ValueError("{0} : {1} ".format(str(e), guid))

    return created
def registerVariantSet(self, guid, reference_set_id, dataset_id=None, metadata=None):
    """
    Register a variant set and return it; an existing variant set with the
    same guid is returned unchanged.
    Raises ValueError when the reference set is not registered, or when the
    database rejects the new row with DataError.
    """
    parent = self.session.query(ReferenceSet).filter(
        ReferenceSet.id == reference_set_id).first()
    if parent is None:
        raise ValueError(
            "ReferenceSet must be registered before registering this VariantSet : {0} ".format(reference_set_id))

    existing = self.session.query(VariantSet).filter(
        VariantSet.guid == guid).first()
    if existing is not None:
        return existing

    try:
        created = VariantSet(
            guid=guid,
            reference_set_id=reference_set_id,
            dataset_id=dataset_id,
            variant_set_metadata=metadata
        )
        self.session.add(created)
        self.session.commit()
    except exc.DataError as e:
        self.session.rollback()
        raise ValueError("{0} : {1} ".format(str(e), guid))

    return created
def registerIndividual(self, guid, name, info=None):
    """
    Registration of an individual requires a guid and a name.
    Name can be None to support retrieval from registerSample.

    Returns the existing Individual matching ``guid`` or ``name``, or a
    newly created one.  Raises ValueError when the database rejects the new
    row with DataError or IntegrityError.
    """
    individual = self.session.query(Individual).filter(
        or_(Individual.guid == guid, Individual.name == name))\
        .first()

    if individual is None:
        # The previous format string repeated ``%S`` ("%S%S.%S%S%S"), which
        # renders the seconds field five times instead of a valid timestamp.
        # Computing the stamp once also keeps create/update times identical.
        now = strftime("%Y-%m-%d %H:%M:%S")
        try:
            individual = Individual(
                name=name,
                guid=guid,
                info=info,
                record_update_time=now,
                record_create_time=now
            )
            self.session.add(individual)
            self.session.commit()
        except (exc.DataError, exc.IntegrityError) as e:
            self.session.rollback()
            raise ValueError("{0} : {1} : {2} ".format(str(e), guid, name))

    return individual
def test_MetaTooBig(session, db_library, request):
    """A Meta version one character over VERSION_LIMIT fails on commit.

    Under the "sqlite" keyword the commit is expected to succeed instead and
    the row must be readable afterwards.
    """
    _library_id = db_library.id  # attribute access kept from the original
    oversized = "!" * (Meta.VERSION_LIMIT + 1)
    session.add(Meta(version=oversized))

    if "sqlite" not in request.keywords:
        with pytest.raises(DataError):
            session.commit()
        return

    session.commit()
    assert session.query(Meta).filter_by(version=oversized).one()
def test_LibraryTooBig(session, db_library, request):
    """A Library name one character over NAME_LIMIT fails on commit.

    Under the "sqlite" keyword the commit is expected to succeed instead and
    the row must be readable afterwards.
    """
    _library_id = db_library.id  # attribute access kept from the original
    oversized = "!" * (Library.NAME_LIMIT + 1)
    session.add(Library(name=oversized))

    if "sqlite" not in request.keywords:
        with pytest.raises(DataError):
            session.commit()
        return

    session.commit()
    assert session.query(Library).filter_by(name=oversized).one()
def test_ImageType(session, db_library, request, mp3audiofile):
    """Committing an Image with the type "Beyond" must fail.

    The expected error is IntegrityError under the "sqlite" keyword and
    DataError under "postgresql"; any other backend fails the assertion.
    """
    bad_image = Image(
        type="Beyond",
        size=666,
        md5="48a6de198166ad9806f1a2172f2947df",
        mime_type="img/jpg",
        description="Iceburn",
        data=b"\xde\xad\xbe\xef",
    )
    session.add(bad_image)

    if "sqlite" in request.keywords:
        expected_error = IntegrityError
    else:
        assert "postgresql" in request.keywords
        expected_error = DataError

    with pytest.raises(expected_error):
        session.commit()
def setVat():
    '''Store a new vat taken from the JSON request body.

    Both ``name`` and ``value`` are required.

    Example : POST /api/v1/vats HTTP/1.1
    {"name" : "food", "value":34}

    Result : {
        "meta": {
            "code": 200,
            "message": "Created Successfully"
        }
    }

    On failure a JSON error envelope with code 400 is returned.
    '''
    with SessionManager(Session) as session:
        try:
            name = request.json['name']
            value = request.json['value']
            vat = Vat(name=name, value=value)
            session.add(vat)
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # this exception might probably occur if the value key has a
            # value of non integer
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError', 'Value : {0} already exists'.format(name)))
        except Exception:
            # Catch-all (e.g. a missing required key).  ``Exception`` rather
            # than a bare ``except`` so SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))
def setServiceCharge():
    '''Store a new service charge taken from the JSON request body.

    Both ``name`` and ``value`` are required.

    Example : POST /api/v1/service charges HTTP/1.1
    {"name" : "Service Charge", "value":10}

    Result : {
        "meta": {
            "code": 200,
            "message": "Created Successfully"
        }
    }

    On failure a JSON error envelope with code 400 is returned.
    '''
    with SessionManager(Session) as session:
        try:
            name = request.json['name']
            value = request.json['value']
            service_charge = ServiceCharge(name=name, value=value)
            session.add(service_charge)
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # this exception might probably occur if the value key has a
            # value of non integer
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError', 'Value : {0} already exists'.format(name)))
        except Exception:
            # Catch-all (e.g. a missing required key).  ``Exception`` rather
            # than a bare ``except`` so SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))
def setPayment():
    '''Store a new payment taken from the JSON request body.

    ``p_type`` is required.

    Example : POST /api/v1/payments HTTP/1.1
    { "p_type": " onsite"}

    Result : {
        "meta": {
            "code": 200,
            "message": "Created Successfully"
        }
    }

    On failure a JSON error envelope with code 400 is returned.
    '''
    with SessionManager(Session) as session:
        try:
            p_type = request.json['p_type']
            payment = Payment(p_type=p_type)
            session.add(payment)
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # presumably a submitted value the database rejects for its column
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError', 'Value : {0} already exists'.format(p_type)))
        except Exception:
            # Catch-all (e.g. a missing required key).  ``Exception`` rather
            # than a bare ``except`` so SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))
def setDineTable():
    '''Store a new DineTable taken from the JSON request body.

    ``capacity`` and ``alias`` are required; ``status`` defaults to 'empty'.

    Example : POST /api/v1/dinetables HTTP/1.1
    { "capacity": 4, "alias" : "Table1", "status" : "empty"}

    Result : {
        "meta": {
            "code": 200,
            "message": "Created Successfully"
        }
    }

    On failure a JSON error envelope with code 400 is returned.
    '''
    with SessionManager(Session) as session:
        try:
            capacity = request.json['capacity']
            alias = request.json['alias']
            status = request.json.get('status', 'empty')
            dine_table = DineTable(capacity=capacity, alias=alias, status=status)
            session.add(dine_table)
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # this exception might probably occur if a key has a value of
            # non integer
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError', 'Value : {0} already exists'.format(alias)))
        except Exception:
            # Catch-all (e.g. a missing required key).  ``Exception`` rather
            # than a bare ``except`` so SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))
def setMembership():
    '''Store a new Membership taken from the JSON request body.

    ``m_type`` and ``discount`` are required; ``description`` defaults to
    'No Description Available'.  A discount outside ``0 <= discount < 100``
    is rejected before anything is stored.

    Example : POST /api/v1/memberships HTTP/1.1
    { "m_type": "general", "discount" : 10, "description" : "some description"}

    Result : {
        "meta": {
            "code": 200,
            "message": "Created Successfully"
        }
    }

    On failure a JSON error envelope with code 400 is returned.
    '''
    with SessionManager(Session) as session:
        try:
            m_type = request.json['m_type']
            discount = request.json['discount']
            if not 0 <= int(discount) < 100:
                return jsonify(error_envelop(
                    400, 'DataError', 'Enter the valid discount amount (0 to 100)'))
            description = request.json.get('description', 'No Description Available')
            membership = Membership(m_type=m_type, discount=discount,
                                    description=description)
            session.add(membership)
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # this exception might probably occur if a key has a value of
            # non integer
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError', 'Value : {0} already exists'.format(m_type)))
        except Exception:
            # Catch-all (e.g. a missing key or a non-numeric discount).
            # ``Exception`` rather than a bare ``except`` so
            # SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))
def updateMembership(m_id):
    '''Update the membership with id ``m_id`` from the JSON request body.

    ``m_type``, ``discount`` and ``description`` are optional; a field that
    is omitted keeps its stored value.  The resulting discount must satisfy
    ``0 <= discount < 100``.

    Example body : {"m_type" : "general", "discount" : 10}

    Result : {
        "data": { "m_type": "general", "discount" : 10 },
        "meta": {
            "code": 200,
            "message": "Updated Successfully"
        }
    }

    Returns a 400 envelope for an invalid discount or an integrity
    violation, and a 404 envelope when no membership has this id.
    '''
    with SessionManager(Session) as session:
        try:
            sql_membership = session.query(Membership).filter(
                Membership.id == m_id).one()
            payload = request.json

            # Validate the discount BEFORE touching the entity, so that a
            # rejected request leaves no modified attributes in the session.
            discount = payload.get('discount', sql_membership.discount)
            if not 0 <= int(discount) < 100:
                return jsonify(error_envelop(
                    400, 'DataError', 'Enter the valid discount amount (0 to 100)'))

            sql_membership.m_type = payload.get('m_type', sql_membership.m_type)
            sql_membership.discount = discount
            sql_membership.description = payload.get(
                'description', sql_membership.description)
            session.commit()
            return jsonify(update_envelop(200, data=request.json))
        except IntegrityError:
            # if name already exists in database
            return jsonify(error_envelop(400, 'Integrity Error', 'Name already Exists'))
        except NoResultFound:
            return jsonify(error_envelop(404, 'ValueError', 'Id : {0} not found'.format(m_id)))
    # NOTE(review): only reachable if SessionManager suppresses an exception
    # on exit - confirm against its implementation.
    return jsonify(error_envelop(100, 'UnknownError', 'UnknownError Found'))
def setCustomersByMembership(m_id):
    '''Store a new customer linked to membership ``m_id`` as a foreign key.

    ``first_name``, ``last_name``, ``gender`` and ``age`` are required in
    the JSON body; ``middle_name``, ``contact_number``, ``address`` and
    ``email`` default to 'NA'.  An age outside ``6 <= age < 99`` is rejected
    before anything is stored.

    Returns a JSON success envelope echoing the request body, or a JSON
    error envelope with code 400 on failure.
    '''
    with SessionManager(Session) as session:
        try:
            first_name = request.json['first_name']
            last_name = request.json['last_name']
            middle_name = request.json.get('middle_name', 'NA')
            contact_number = request.json.get('contact_number', 'NA')
            address = request.json.get('address', 'NA')
            gender = request.json['gender']
            age = request.json['age']
            email = request.json.get('email', 'NA')
            if not 6 <= int(age) < 99:
                return jsonify(error_envelop(
                    400, "Age Error", "Please enter the age between 6 and 99 "))
            c = Customer(first_name=first_name,
                         last_name=last_name,
                         contact_number=contact_number,
                         address=address,
                         gender=gender,
                         age=age,
                         membership_id=m_id,
                         middle_name=middle_name,
                         email=email)
            session.add(c)
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # this exception might probably occur if a key has a value of
            # non integer
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError',
                'Violates foreign key ({0}) constraint'.format(m_id)))
        except Exception:
            # Catch-all (e.g. a missing key or a non-numeric age).
            # ``Exception`` rather than a bare ``except`` so
            # SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))
def setBill():
    '''Store a new bill together with its item orders from the JSON body.

    Required keys: ``total_price``, ``customer_id``, ``dinetable_id``,
    ``employee_id``, ``payment_id``, ``vat_id``, ``service_charge_id`` and
    ``item_orders`` (a non-empty list of objects with ``item_id``,
    ``quantity`` and ``order_price``).  ``on_site`` is optional and
    ``bill_description`` defaults to 'NA'.

    The bill and all item orders are committed together.  Returns a JSON
    success envelope echoing the request body, or a JSON error envelope
    with code 400 on failure.
    '''
    with SessionManager(Session) as session:
        try:
            total_price = request.json['total_price']
            on_site = request.json.get('on_site')
            customer_id = request.json['customer_id']
            dinetable_id = request.json['dinetable_id']
            employee_id = request.json['employee_id']
            payment_id = request.json['payment_id']
            vat_id = request.json['vat_id']
            service_charge_id = request.json['service_charge_id']
            bill_description = request.json.get('bill_description', 'NA')
            item_orders = request.json['item_orders']
            if len(item_orders) < 1:
                return jsonify(error_envelop(
                    400, 'BillError', 'Please enter atleast one item!!'))

            # at least one item order is present, so create the bill object
            bill = Bill(total_price=total_price,
                        bill_description=bill_description,
                        on_site=on_site,
                        customer_id=customer_id,
                        dinetable_id=dinetable_id,
                        employee_id=employee_id,
                        payment_id=payment_id,
                        vat_id=vat_id,
                        service_charge_id=service_charge_id)
            session.add(bill)

            for item_order in item_orders:
                item_id = item_order['item_id']
                quantity = item_order['quantity']
                order_price = item_order['order_price']
                session.add(ItemOrder(item_id=item_id,
                                      quantity=quantity,
                                      order_price=order_price,
                                      bill=bill))
            session.commit()
            return jsonify(post_envelop(200, data=request.json))
        except DataError:
            # this exception might probably occur if a key has a value of
            # non integer
            return jsonify(error_envelop(400, 'DataError', 'Use the correct value'))
        except IntegrityError:
            return jsonify(error_envelop(
                400, 'IntegrityError', 'Foreign Key violations. Use the correct id'))
        except Exception:
            # Catch-all (e.g. a missing required key).  ``Exception`` rather
            # than a bare ``except`` so SystemExit/KeyboardInterrupt propagate.
            return jsonify(error_envelop(400, 'UnknownError', 'Error need to be identified'))