我们从 Python 开源项目中提取了以下 47 个代码示例,用于说明如何使用 mock.patch.dict()。
def test_get_jira_user_multi(self):
    """
    Check that _get_jira_user yields None when the JIRA user search
    returns more than one matching user.
    """
    matches = [self.mock_jira_user_1, self.mock_jira_user_2]
    self.mock_auth.search_users = Mock(return_value=matches)

    # One `with` statement; the patches are still entered in the same order.
    with patch('ambassador.transport.Transport.get_key_cert',
               return_value=self.mock_cert), \
            patch('platforms.jira.handlers.jira.JIRA',
                  return_value=self.mock_auth), \
            patch.dict('platforms.jira.handlers.settings.JIRA',
                       self.mock_settings):
        handler = IssueAPI(endpoint=self.endpoint, user=self.user)
        self.assertEqual(handler._get_jira_user(), None)
def test_trims_path(self, virtual_env):  # pylint: disable=no-self-use
    """
    sanitized_env() must return a PATH without the virtualenv directory,
    keeping every other entry in its original order.
    """
    venv_path = '/Y1w4sD/DELETE/THIS/ISUy0r'
    entries_before = [
        '/0ho8Ke/hL9DsW/ymH81W',
        '/L51pua',
        '/Vngp3V/G2m7Ih/05m7qW/LyrYkK/l5NuwA/oq1DPp',
    ]
    entries_after = [
        '/Zpdhu4/bjvuqt',
        '/STGtcb/FhAnWH/HwTvOr/gngGiB',
        '/Zizj4D/szncsv/O5wO6X/joFHVT',
    ]
    fake_environ = {
        'VIRTUAL_ENV': venv_path,
        'PATH': ':'.join(entries_before + [venv_path] + entries_after),
    }

    # clear=True: only the two variables above exist while the block runs.
    with patch.dict('os.environ', values=fake_environ, clear=True):
        env = virtual_env.sanitized_env()
        assert venv_path not in env['PATH']
        assert env['PATH'] == ':'.join(entries_before + entries_after)
def test_disqus_sso_payload_auth_user(self, mock_b64encode, mock_hmac):
    """Check the Disqus SSO payload produced for an existing user."""
    user = UserFactory.create()
    public_key = 'public'
    secret_key = 'secret'
    disqus_config = {'DISQUS_PUBLIC_KEY': public_key,
                     'DISQUS_SECRET_KEY': secret_key}
    # The mocked b64encode hands back the JSON string it is expected to receive.
    payload = json.dumps({'id': user.id,
                          'username': user.name,
                          'email': user.email_addr})
    mock_b64encode.return_value = payload

    with patch.dict(self.flask_app.config, disqus_config):
        _message, timestamp, sig, pub_key = util.get_disqus_sso_payload(user)
        mock_b64encode.assert_called_with(payload)
        signed_text = '%s %s' % (payload, timestamp)
        mock_hmac.assert_called_with(secret_key, signed_text, hashlib.sha1)
        assert timestamp
        assert sig
        assert pub_key == public_key
def test_disqus_sso_payload_anon_user(self, mock_b64encode, mock_hmac):
    """Check the Disqus SSO payload produced when no user is given."""
    public_key = 'public'
    secret_key = 'secret'
    disqus_config = {'DISQUS_PUBLIC_KEY': public_key,
                     'DISQUS_SECRET_KEY': secret_key}
    # With user=None the encoded payload is expected to be an empty JSON object.
    payload = json.dumps({})
    mock_b64encode.return_value = payload

    with patch.dict(self.flask_app.config, disqus_config):
        _message, timestamp, sig, pub_key = util.get_disqus_sso_payload(None)
        mock_b64encode.assert_called_with(payload)
        signed_text = '%s %s' % (payload, timestamp)
        mock_hmac.assert_called_with(secret_key, signed_text, hashlib.sha1)
        assert timestamp
        assert sig
        assert pub_key == public_key
def test_handle_content_type_json_error(self, mocklast, mockjsonify,
                                        mockrender, mockrequest):
    """
    For a JSON request carrying an error code, handle_content_type must
    return the jsonified payload together with that code.
    """
    # Back the mocked request headers with a real dict.
    headers = {'Content-Type': 'application/json'}
    mockrequest.headers.__getitem__.side_effect = headers.__getitem__
    mockrequest.headers.get.side_effect = headers.get
    mockrequest.headers.__iter__.side_effect = headers.__iter__
    mockjsonify.side_effect = myjsonify

    payload = dict(template='example.html', code=404,
                   description="Not found")
    res, code = util.handle_content_type(payload)

    assert res.get('template') == 'example.html', "template key should exist"
    assert mockjsonify.called, "jsonify should be called"
    assert res.get('code') == 404, "Error code should exist"
    assert code == 404, "Error code should exist"
    assert res.get('description') is not None, "Error description should exist"
def test_handle_content_type_json_form(self, mocklast, mockcsrf, mockjsonify,
                                       mockrender, mockrequest):
    """
    For a JSON request that includes a form, handle_content_type must
    return the form serialised with its data, errors and a CSRF token.
    """
    # Back the mocked request headers with a real dict.
    fake_d = {'Content-Type': 'application/json'}
    mockrequest.headers.__getitem__.side_effect = fake_d.__getitem__
    mockrequest.headers.get.side_effect = fake_d.get
    mockrequest.headers.__iter__.side_effect = fake_d.__iter__
    mockjsonify.side_effect = myjsonify
    mockcsrf.return_value = "yourcsrf"
    form = MagicMock(spec=Form, data=dict(foo=1), errors=None)

    res = util.handle_content_type(dict(template='example.html', form=form))

    err_msg = "template key should exist"
    assert res.get('template') == 'example.html', err_msg
    err_msg = "jsonify should be called"
    assert mockjsonify.called, err_msg
    err_msg = "Form should exist"
    assert res.get('form'), err_msg
    err_msg = "Form should have a csrf key/value"
    assert res.get('form').get('csrf') == 'yourcsrf', err_msg
    err_msg = "There should be the keys of the form"
    keys = ['foo', 'errors', 'csrf']
    # list.sort() sorts in place and returns None, so comparing the results
    # of two .sort() calls is always None == None. sorted() returns the
    # ordered lists, which makes this assertion actually check the keys.
    assert sorted(res.get('form').keys()) == sorted(keys), err_msg
def test_handle_content_type_html(self, mockjsonify, mockrender, mockrequest):
    """
    For an HTML request, handle_content_type must render the template
    and must not call jsonify.
    """
    # Back the mocked request headers with a real dict.
    headers = {'Content-Type': 'text/html'}
    mockrequest.headers.__getitem__.side_effect = headers.__getitem__
    mockrequest.headers.get.side_effect = headers.get
    mockrequest.headers.__iter__.side_effect = headers.__iter__
    mockjsonify.side_effect = myjsonify
    mockrender.side_effect = myrender

    pagination = util.Pagination(page=1, per_page=5, total_count=10)
    payload = dict(template='example.html', pagination=pagination)
    template, data = util.handle_content_type(payload)

    assert template == 'example.html', "Template should be rendered"
    assert data.get('template') is None, "Template key should not exist"
    assert mockjsonify.called is False, "jsonify should not be called"
    assert mockrender.called is True, "render_template should be called"
def test_handle_content_type_html_error(self, mockjsonify, mockrender,
                                        mockrequest):
    """
    For an HTML request carrying an error code, handle_content_type must
    render the template and return the code separately.
    """
    # Back the mocked request headers with a real dict.
    headers = {'Content-Type': 'text/html'}
    mockrequest.headers.__getitem__.side_effect = headers.__getitem__
    mockrequest.headers.get.side_effect = headers.get
    mockrequest.headers.__iter__.side_effect = headers.__iter__
    mockjsonify.side_effect = myjsonify
    mockrender.side_effect = myrender

    rendered, code = util.handle_content_type(
        dict(template='example.html', code=404))
    # The first element of the result is indexed as (template, data).
    template = rendered[0]
    data = rendered[1]

    assert template == 'example.html', "Template should be rendered"
    assert data.get('template') is None, "Template key should not exist"
    assert mockjsonify.called is False, "jsonify should not be called"
    assert mockrender.called is True, "render_template should be called"
    assert code == 404, "There should be an error"
    assert data.get('code') is None, "There should not be code key"
def test_UnicodeWriter(self):
    """Test UnicodeWriter class works.

    Writes one CSV row (with a dict in the last column) through
    util.UnicodeWriter and checks every item read back from the file
    appears in the original CSV text.
    """
    fake_csv = ['one, two, three, {"i": 1}']
    err_msg = "It should be the same CSV content"
    # The context manager closes the temporary file even when an assertion
    # below fails; previously the handle was never closed explicitly.
    with tempfile.NamedTemporaryFile() as tmp:
        uw = util.UnicodeWriter(tmp)
        for row in csv.reader(fake_csv):
            # change it for a dict
            row[3] = dict(i=1)
            uw.writerow(row)
        tmp.seek(0)
        # Re-open by name while the temp file still exists.
        with open(tmp.name, 'rb') as f:
            reader = csv.reader(f)
            for row in reader:
                for item in row:
                    assert item in fake_csv[0], err_msg
def test_register_json_errors_get(self):
    """Check the JSON validation errors returned by /account/register."""
    with patch.dict(self.flask_app.config, {'WTF_CSRF_ENABLED': True}):
        csrf_token = self.get_csrf('/account/register')
        userdict = {'fullname': 'a', 'name': 'name',
                    'email_addr': None, 'password': 'p'}
        res = self.app.post('/account/register',
                            data=json.dumps(userdict),
                            content_type='application/json',
                            headers={'X-CSRFToken': csrf_token})
        # The output should have a mime-type: application/json
        errors = json.loads(res.data).get('form').get('errors')
        assert res.mimetype == 'application/json', res.data
        assert errors.get('email_addr'), \
            "There should be an error with the email"
        assert errors.get('fullname'), \
            "There should be an error with fullname"
        assert errors.get('password'), \
            "There should be an error with password"
        assert errors.get('name') is None, \
            "There should NOT be an error with name"
def test_register_confirmation_validates_email(self, fake_signer):
    """
    Visiting the confirmation URL must mark the email as valid and reset
    the confirmation_email_sent flag.
    """
    self.register()
    account = db.session.query(User).get(1)
    account.valid_email = False
    account.confirmation_email_sent = True
    db.session.commit()

    # The mocked signer returns this user's own data for any key.
    fake_signer.loads.return_value = dict(fullname=account.fullname,
                                          name=account.name,
                                          email_addr=account.email_addr)
    self.app.get('/account/register/confirmation?key=valid-key')

    account = db.session.query(User).get(1)
    assert account is not None
    assert account.valid_email, "Email has not been validated"
    assert account.confirmation_email_sent is False, \
        "Confirmation email flag has not been restored"
def test_confirm_account_newsletter(self, fake_signer, url_for, newsletter):
    """
    After confirming the email, url_for must be called for the newsletter
    subscription page when the user should be asked, else for home.
    """
    confirmation_url = '/account/register/confirmation?key=valid-key'

    newsletter.ask_user_to_subscribe.return_value = True
    self.register()
    account = db.session.query(User).get(1)
    account.valid_email = False
    db.session.commit()
    fake_signer.loads.return_value = dict(fullname=account.fullname,
                                          name=account.name,
                                          email_addr=account.email_addr)

    self.app.get(confirmation_url)
    url_for.assert_called_with('account.newsletter_subscribe', next=None)

    newsletter.ask_user_to_subscribe.return_value = False
    self.app.get(confirmation_url)
    url_for.assert_called_with('home.home')
def test_account_upload_avatar(self):
    """Uploading an avatar must store avatar, container and avatar_url."""
    import io

    owner = UserFactory.create()
    update_url = '/account/%s/update?api_key=%s' % (owner.name, owner.api_key)
    form_data = dict(btn='Upload',
                     avatar=(io.BytesIO(b'test'), 'test_file.jpg'),
                     id=owner.id,
                     x1=0, y1=0, x2=100, y2=100)

    res = self.app.post(update_url,
                        follow_redirects=True,
                        content_type="multipart/form-data",
                        data=form_data)
    assert res.status_code == 200

    # Re-read the user to inspect what the upload stored in `info`.
    stored = user_repo.get(owner.id)
    assert stored.info['avatar'] is not None
    assert stored.info['container'] is not None
    expected_url = '/uploads/%s/%s' % (stored.info['container'],
                                       stored.info['avatar'])
    assert stored.info['avatar_url'] == expected_url
def test_update_project_json_as_user(self):
    """A user who does not own the project gets 403 on GET and POST."""
    # Created only for its side effect and never referenced again.
    # NOTE(review): presumably the first user created becomes admin - confirm.
    admin = UserFactory.create()
    owner = UserFactory.create()
    other_user = UserFactory.create()
    project = ProjectFactory.create(owner=owner)
    update_url = '/project/%s/update?api_key=%s' % (project.short_name,
                                                    other_user.api_key)

    get_body = json.loads(self.app_get_json(update_url).data)
    assert get_body['code'] == 403, get_body

    changes = {'description': 'foobar'}
    post_body = json.loads(self.app_post_json(update_url, data=changes).data)
    assert post_body['code'] == 403, post_body
def test_26_tutorial_signed_user(self):
    """Tutorial is shown once on newtask, then only on the tutorial URL."""
    tutorial_text = "some help"
    newtask_url = '/project/test-app/newtask'

    self.create()
    project1 = db.session.query(Project).get(1)
    project1.info = dict(tutorial=tutorial_text, task_presenter="presenter")
    db.session.commit()
    self.register()

    # First time accessing the project should redirect me to the tutorial
    res = self.app.get(newtask_url, follow_redirects=True)
    assert tutorial_text in res.data, \
        "There should be some tutorial for the project"

    # Second time should give me a task, and not the tutorial
    res = self.app.get(newtask_url, follow_redirects=True)
    assert tutorial_text not in res.data

    # Check if the tutorial can be accessed directly
    res = self.app.get('/project/test-app/tutorial', follow_redirects=True)
    assert tutorial_text in res.data, \
        "There should be some tutorial for the project"
def test_26_tutorial_signed_user_json(self):
    """
    Signed-in user: tutorial is shown once on newtask, and the JSON
    tutorial endpoint returns owner, project, template and title.
    """
    tutorial_text = "some help"
    newtask_url = '/project/test-app/newtask'

    self.create()
    project1 = db.session.query(Project).get(1)
    project1.info = dict(tutorial=tutorial_text, task_presenter="presenter")
    db.session.commit()
    self.register()

    # First time accessing the project should redirect me to the tutorial
    res = self.app.get(newtask_url, follow_redirects=True)
    assert tutorial_text in res.data, \
        "There should be some tutorial for the project"

    # Second time should give me a task, and not the tutorial
    res = self.app.get(newtask_url, follow_redirects=True)
    assert tutorial_text not in res.data

    # Check if the tutorial can be accessed directly
    res = self.app_get_json('/project/test-app/tutorial')
    data = json.loads(res.data)
    for key in ('owner', 'project', 'template', 'title'):
        assert key in data, 'key missing'
    assert 'My New Project' in data['title'], 'project tutorial missing'
def test_27_tutorial_anonymous_user_json(self):
    """
    Anonymous user: tutorial is shown once on newtask, and the JSON
    tutorial endpoint returns owner, project, template and title.
    """
    tutorial_text = "some help"
    newtask_url = '/project/test-app/newtask'

    self.create()
    project = db.session.query(Project).get(1)
    project.info = dict(tutorial=tutorial_text, task_presenter="presenter")
    db.session.commit()

    # First time accessing the project should redirect me to the tutorial
    res = self.app.get(newtask_url, follow_redirects=True)
    assert tutorial_text in res.data, \
        "There should be some tutorial for the project"

    # Second time should give me a task, and not the tutorial
    res = self.app.get(newtask_url, follow_redirects=True)
    assert tutorial_text not in res.data

    # Check if the tutorial can be accessed directly
    res = self.app_get_json('/project/test-app/tutorial')
    data = json.loads(res.data)
    for key in ('owner', 'project', 'template', 'title'):
        assert key in data, 'key missing'
    assert 'My New Project' in data['title'], 'project tutorial missing'
def test_anon(self):
    """
    Request the Disqus SSO endpoint without an api_key and check the
    response is 200 with remote_auth_s3 and api_key present.
    """
    sso_url = 'api/disqus/sso'
    public_key = 'public'
    secret_key = 'secret'
    disqus_config = {'DISQUS_PUBLIC_KEY': public_key,
                     'DISQUS_SECRET_KEY': secret_key}

    with patch.dict(self.flask_app.config, disqus_config):
        res = self.app.get(sso_url)
        body = json.loads(res.data)
        assert res.status_code == 200, res.status_code
        assert body['remote_auth_s3'] is not None, body
        assert body['api_key'] is not None, body
def test_get_jobs_only_on_sunday_variant(self, mock_datetime):
    """
    With today mocked as Sunday, every weekly-stats job yielded for a pro
    owner's project must have the expected shape.
    """
    owner = UserFactory.create(pro=True)
    project = ProjectFactory(owner=owner)
    task = TaskFactory.create(project=project)
    TaskRunFactory.create(project=project, task=task)

    # Make datetime.today().strftime(...) report 'Sunday'.
    fake_today = MagicMock()
    fake_today.strftime.return_value = 'Sunday'
    mock_datetime.today.return_value = fake_today

    # Nothing is asserted if no job is yielded.
    for job in get_weekly_stats_update_projects():
        assert type(job) == dict, type(job)
        assert job['name'] == send_weekly_stats_project
        assert job['args'] == [project.id]
        assert job['kwargs'] == {}
        assert job['timeout'] == self.flask_app.config.get('TIMEOUT')
        assert job['queue'] == 'low'
def test_get_jobs_no_pro_feature_for_everyone(self, mock_datetime):
    """
    With today mocked as Sunday, exactly one weekly-stats job must be
    produced for a non-pro owner's project.
    """
    owner = UserFactory.create(pro=False)
    project = ProjectFactory(owner=owner)
    task = TaskFactory.create(project=project)
    TaskRunFactory.create(project=project, task=task)

    # Make datetime.today().strftime(...) report 'Sunday'.
    fake_today = MagicMock()
    fake_today.strftime.return_value = 'Sunday'
    mock_datetime.today.return_value = fake_today

    jobs = list(get_weekly_stats_update_projects())
    assert len(jobs) == 1
    for job in jobs:
        assert type(job) == dict, type(job)
        assert job['name'] == send_weekly_stats_project
        assert job['args'] == [project.id]
        assert job['kwargs'] == {}
        assert job['timeout'] == self.flask_app.config.get('TIMEOUT')
        assert job['queue'] == 'low'
def test_get_jobs_only_on_featured_variant(self, mock_datetime):
    """
    With today mocked as Sunday, every weekly-stats job yielded for a
    featured project of a non-pro owner must have the expected shape.
    """
    owner = UserFactory.create(pro=False)
    project = ProjectFactory(owner=owner, featured=True)
    task = TaskFactory.create(project=project)
    TaskRunFactory.create(project=project, task=task)

    # Make datetime.today().strftime(...) report 'Sunday'.
    fake_today = MagicMock()
    fake_today.strftime.return_value = 'Sunday'
    mock_datetime.today.return_value = fake_today

    # Nothing is asserted if no job is yielded.
    for job in get_weekly_stats_update_projects():
        assert type(job) == dict, type(job)
        assert job['name'] == send_weekly_stats_project
        assert job['args'] == [project.id]
        assert job['kwargs'] == {}
        assert job['timeout'] == self.flask_app.config.get('TIMEOUT')
        assert job['queue'] == 'low'
def test_autoimport_jobs_with_autoimporter(self):
    """
    get_autoimport_jobs must yield one job for a pro owner's project
    whose info contains an autoimporter.
    """
    owner = UserFactory.create(pro=True)
    project = ProjectFactory.create(owner=owner,
                                    info=dict(autoimporter='foobar'))

    # Drain the generator so the number of jobs can be checked.
    jobs = list(get_autoimport_jobs())

    assert len(jobs) == 1, "There should be 1 job."
    job = jobs[0]
    assert job['args'][0] == project.id, "It sould be the same project."
    assert job['args'][1] == True, \
        "It sould be created as an auto import job."
    assert job['kwargs'] == 'foobar', "There sould be the kwargs."
def test_autoimport_jobs_without_pro_when_for_everyone(self):
    """
    get_autoimport_jobs must yield one job for a project created with the
    factory's default owner and an autoimporter in its info.
    """
    project = ProjectFactory.create(info=dict(autoimporter='foobar'))

    # Drain the generator so the number of jobs can be checked.
    jobs = list(get_autoimport_jobs())

    assert len(jobs) == 1, "There should be 1 job."
    job = jobs[0]
    assert job['args'][0] == project.id, "It sould be the same project."
    assert job['args'][1] == True, \
        "It sould be created as an auto import job."
    assert job['kwargs'] == 'foobar', "There sould be the kwargs."
def test_rackspace_uploader_init(self, Mock):
    """
    RackspaceUploader.init_app must return the container, record the
    container name and pick up ALLOWED_EXTENSIONS from the app config.
    """
    new_extensions = ['pdf', 'doe']
    config_override = {'ALLOWED_EXTENSIONS': new_extensions}

    # cloudfiles is patched twice; the innermost patch (mycf) is the one
    # in effect while the body runs.
    with patch('pybossa.uploader.rackspace.pyrax.cloudfiles',
               return_value=cloudfiles_mock), \
            patch.dict(self.flask_app.config, config_override), \
            patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
        mycf.get_container.return_value = True
        uploader = RackspaceUploader()
        res = uploader.init_app(self.flask_app, cont_name='mycontainer')

        assert res is True, "It should return the container."
        assert uploader.cont_name == 'mycontainer', \
            "The container name should be updated."
        for ext in new_extensions:
            err_msg = "The .%s extension should be allowed" % ext
            assert ext in uploader.allowed_extensions, err_msg
def test_local_uploader_init(self):
    """
    A LocalUploader initialised under a patched config must add the extra
    ALLOWED_EXTENSIONS and use the configured UPLOAD_FOLDER.
    """
    baseline = LocalUploader()
    baseline.init_app(self.flask_app)

    new_extensions = ['pdf', 'doe']
    new_upload_folder = '/tmp/'
    ext_override = {'ALLOWED_EXTENSIONS': new_extensions}
    folder_override = {'UPLOAD_FOLDER': new_upload_folder}

    with patch.dict(self.flask_app.config, ext_override), \
            patch.dict(self.flask_app.config, folder_override):
        new_uploader = LocalUploader()
        new_uploader.init_app(self.flask_app)

        expected_extensions = set.union(baseline.allowed_extensions,
                                        new_extensions)
        assert expected_extensions == new_uploader.allowed_extensions, \
            "The new uploader should support two extra extensions"
        assert os.path.isdir(new_uploader.upload_folder) is True, \
            "Upload folder /tmp should be existing"
        assert new_uploader.upload_folder == '/tmp/', \
            "Upload folder by default is /tmp/"
def test_stdout_chunk(self, get_stdout, get_info_for_rj):
    """stdout_chunk(10, offset=0) must request bytes 0-10 of the stdout."""
    job_id = '1234-stdout-chunk'
    get_info_for_rj.return_value = {'stdout_size': 10}
    get_stdout.return_value = "line1\nline2\nline3\nline4\nline5\nline6"

    running_job = pygenie.jobs.RunningJob(job_id,
                                          info={'status': 'SUCCEEDED'})
    running_job.stdout_chunk(10, offset=0)

    expected_calls = [call(job_id, headers={'Range': 'bytes=0-10'})]
    assert_equals(expected_calls, get_stdout.call_args_list)
def test_stdout_chunk_zero_size(self, get_stdout, get_info_for_rj):
    """
    When the reported stdout_size is 0, stdout_chunk must return None
    without fetching the stdout.
    """
    get_info_for_rj.return_value = {'stdout_size': 0}
    get_stdout.return_value = "line1\nline2\nline3\nline4\nline5\nline6"

    running_job = pygenie.jobs.RunningJob('1234-stdout-chunk',
                                          info={'status': 'SUCCEEDED'})
    chunk = running_job.stdout_chunk(10, offset=0)

    assert_equals([], get_stdout.call_args_list)
    assert_equals(chunk, None)
def invoke_3play_callback(self, state='complete'):
    """
    POST to the 3PlayMedia callback handler with this test's video
    parameters and return the response.

    Arguments:
        state(str): status value sent in the form body
    """
    # `build_url` strips `/`, putting it back and add necessary query params.
    callback_path = build_url(
        self.url,
        edx_video_id=self.video.studio_id,
        org=self.org,
        lang_code=self.video_source_language
    )
    form_body = urllib.urlencode(dict(file_id=self.file_id, status=state))
    return self.client.post(
        '/{}'.format(callback_path),
        content_type='application/x-www-form-urlencoded',
        data=form_body
    )
def test_adapter_mro(context):
    """
    An adapter registered for Parent must also be applied when a Child
    instance is passed to context.update().
    """
    from pylogctx import log_adapter

    class Parent(object):
        pass

    class Child(Parent):
        pass

    @log_adapter(Parent)
    def parent_log_maker(instance):
        return {'parent': id(instance)}

    context.update(Child())
    fields = context.as_dict()
    assert 'parent' in fields
def test_adapter_manager(context):
    """
    An adapter registered for Parent must be applied when the context is
    used as a context manager with a Child instance.
    """
    from pylogctx import log_adapter

    class Parent(object):
        pass

    class Child(Parent):
        pass

    @log_adapter(Parent)
    def parent_log_maker(instance):
        return {'parent': id(instance)}

    with context(Child()):
        fields = context.as_dict()
        assert 'parent' in fields
def setup():
    """
    Create a temporary HOME, start the os.environ patchers that point to
    it, and run install(user=True).
    """
    global tmp
    tmp = tempfile.mkdtemp()

    env_override = {
        'HOME': tmp,
        # Let tests work with --user install when HOME is changed:
        'PYTHONPATH': os.pathsep.join(sys.path),
    }
    # Replace the list contents in place so the module-level `patchers`
    # object stays the same.
    patchers[:] = [patch.dict(os.environ, env_override)]
    for patcher in patchers:
        patcher.start()

    # install IPython in the temp home:
    install(user=True)
def test_format_descr_wo_notes(self):
    """
    Check that _format_description returns an empty string for alert
    pk=2 when INCLUDE_FULL_DESCRIPTION is False.
    """
    self.mock_settings['INCLUDE_FULL_DESCRIPTION'] = False

    # One `with` statement; the patches are still entered in the same order.
    with patch('ambassador.transport.Transport.get_key_cert',
               return_value=self.mock_cert), \
            patch('platforms.jira.handlers.jira.JIRA',
                  return_value=self.mock_auth), \
            patch.dict('platforms.jira.handlers.settings.JIRA',
                       self.mock_settings):
        handler = IssueAPI(endpoint=self.endpoint, user=self.user)
        alert = Alert.objects.get(pk=2)
        self.assertEqual(handler._format_description(alert), '')
def test_allowed_file(self):
    """
    Check the path get_file_path builds for an allowed attachment type.
    """
    self.msg.attach(self.image)
    attachment = get_first_attachment(self.msg)

    # uuid4() is patched to return an object with a fixed .hex value.
    fixed_uuid = Mock()
    fixed_uuid.hex = self.uuid

    with patch('sifter.mailsifter.attachments.uuid.uuid4',
               return_value=fixed_uuid), \
            patch.dict('sifter.mailsifter.attachments.settings.MAILSIFTER',
                       self.mock_mailsifter_settings):
        actual = attachments.get_file_path(attachment, self.company)
        expected = '%s/test-attachments/%s.jpeg' \
                   % (self.company.uuid.hex, self.uuid)
        self.assertEqual(actual, expected)
def test_unallowed_file(self):
    """
    Check that get_file_path returns None and logs a warning for the
    Java attachment.
    """
    self.mock_mailsifter_settings['ALLOWED_EMAIL_ATTACHMENTS'] = \
        ('application/java',)

    with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                    self.mock_mailsifter_settings):
        self.msg.attach(self.java)
        attachment = get_first_attachment(self.msg)

        with patch('sifter.mailsifter.attachments.settings',
                   return_value=self.mock_settings), \
                LogCapture() as log_capture:
            self.assertEqual(attachments.get_file_path(attachment), None)
            warning = 'The attachment %s is not an allowed file type' \
                      % self.java_file
            log_capture.check(
                ('sifter.mailsifter.attachments', 'WARNING', warning),
            )
def test_no_file_path(self):
    """
    Check that save_attachment returns None and logs a warning for the
    Java attachment.
    """
    accessor_settings = {
        'ALLOWED_EMAIL_ATTACHMENTS': ('application/java',)
    }

    with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                    accessor_settings):
        self.msg.attach(self.java)
        attachment = get_first_attachment(self.msg)

        with patch('sifter.mailsifter.attachments.settings',
                   self.mock_settings), \
                LogCapture() as log_capture:
            self.assertEqual(attachments.save_attachment(attachment), None)
            warning = 'The attachment %s is not an allowed file type' \
                      % self.java_file
            log_capture.check(
                ('sifter.mailsifter.attachments', 'WARNING', warning),
            )
def test_match_with_default(self):
    """
    With a 'critical alert' subject, process() must save the data and
    must not call _process_with_default.
    """
    doc_obj = self.doc_obj
    doc_obj.data['Subject'] = 'critical alert'
    munger_config = {
        'DEFAULT_MUNGER': 'default_mail',
        'DEFAULT_MUNGER_ENABLED': True
    }

    with patch('distilleries.models.Distillery.save_data',
               return_value='id_123') as mock_save, \
            patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                       munger_config), \
            patch('sifter.mailsifter.mailchutes.models.MailChuteManager._process_with_default') \
            as mock_catch_email:
        MailChute.objects.process(doc_obj)
        self.assertIs(mock_save.called, True)
        self.assertIs(mock_catch_email.called, False)
def test_no_match_with_default(self):
    """
    With a 'nothing to see here' subject and the default munger enabled,
    process() must call _process_with_default once with the document.
    """
    doc_obj = self.doc_obj
    doc_obj.data['Subject'] = 'nothing to see here'
    munger_config = {
        'DEFAULT_MUNGER': 'default_mail',
        'DEFAULT_MUNGER_ENABLED': True
    }

    with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                    munger_config), \
            patch('sifter.mailsifter.mailchutes.models.MailChuteManager._process_with_default') \
            as mock_default_process:
        MailChute.objects.process(doc_obj)
        mock_default_process.assert_called_once_with(doc_obj)
def test_no_match_missing_munger(self):
    """
    With DEFAULT_MUNGER set to 'missing_munger', process() must log an
    error saying the default MailMunger is not configured.
    """
    doc_obj = self.doc_obj
    doc_obj.data['Subject'] = 'nothing to see here'
    munger_config = {
        'DEFAULT_MUNGER': 'missing_munger',
        'DEFAULT_MUNGER_ENABLED': True
    }

    with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                    munger_config), \
            LogCapture() as log_capture:
        expected_error = 'Default MailMunger "missing_munger" is not configured.'
        MailChute.objects.process(doc_obj)
        log_capture.check(
            ('sifter.chutes.models', 'ERROR', expected_error),
        )
def test_get_default_no_chute(self):
    """
    With DEFAULT_MUNGER set to 'dummy_munger', _default_munger must be
    None, _default_munger_enabled falsy, and an error must be logged.
    """
    munger_config = {
        'DEFAULT_MUNGER': 'dummy_munger',
        'DEFAULT_MUNGER_ENABLED': True
    }

    with patch.dict('sifter.logsifter.logchutes.models.conf.LOGSIFTER',
                    munger_config), \
            LogCapture() as log_capture:
        self.assertEqual(LogChute.objects._default_munger, None)
        self.assertFalse(LogChute.objects._default_munger_enabled)
        log_capture.check(
            ('sifter.chutes.models',
             'ERROR',
             'Default LogMunger "dummy_munger" is not configured.'),
        )
def test_raise_no_supported_snmp_backend_found_raised_if_no_snmp_libraries_are_available(
        self):
    """
    With 'pynetsnmp' mapped to None in sys.modules, importing Snmp from
    nav.Snmp must raise ImportError with the 'no backend' message.
    """
    with patch.dict(sys.modules, {'pynetsnmp': None}):
        self._patch_cleaning(sys.modules)
        # NOTE(review): the string form of pytest.raises is kept as-is;
        # `import *` cannot move into a `with` body on Python 3.
        pytest.raises(ImportError, 'import pynetsnmp')
        pytest.raises(ImportError, 'from nav.Snmp.pynetsnmp import *')
        try:
            from nav.Snmp import Snmp
            pytest.fail("Should never happen")
        except ImportError as err:
            assert str(err) == 'No supported SNMP backend was found'
def setUp(self):
    """
    Build three mock snmptrapd plugins and register them in sys.modules
    under nav.snmptrapd.handlers.*, plus one name mapped to None.
    """
    def raise_exception():
        raise Exception('boom')

    self.plugin_a = Mock(name='snmptrapd plugin a')
    self.plugin_a.strip.return_value = 'nav.snmptrapd.handlers.foo'

    self.plugin_b = Mock(name='snmptrapd plguin b')
    self.plugin_b.strip.return_value = 'nav.snmptrapd.handlers.bar'
    # Deleting the attribute makes plugin_b.initialize raise AttributeError.
    del self.plugin_b.initialize

    self.bad_plugin = Mock(name='snmptrapd plugin which is bad')
    self.bad_plugin.strip.return_value = 'nav.snmptrapd.handlers.bad_plugin'
    self.bad_plugin.initialize = raise_exception

    fake_modules = {
        'nav.snmptrapd.handlers.foo': self.plugin_a,
        'nav.snmptrapd.handlers.bar': self.plugin_b,
        'nav.snmptrapd.handlers.bad_plugin': self.bad_plugin,
        'nav.snmptrapd.handlers.non_existent': None
    }
    # Started here but not stopped in this method.
    self.patcher = patch.dict(sys.modules, fake_modules)
    self.patcher.start()
def test_bouncer_env(self):
    """
    Check get_config_from_env ignores unprefixed variables, reads
    BOUNCER_-prefixed ones by default, and honours a custom prefix.
    """
    # Test it doesn't leak
    with patch.dict('lambada.os.environ', {'BLAH': 'asdf'}, clear=True):
        config = lambada.get_config_from_env()
        self.assertFalse(config)

    # Test it works with default
    with patch.dict('lambada.os.environ', {'BOUNCER_BLAH': 'asdf'},
                    clear=True):
        config = lambada.get_config_from_env()
        self.assertDictEqual(config, {'blah': 'asdf'})

    # Test that we can override the prefix
    custom_env = {'STUFF_BLAH': 'asdf', 'STUFFS_BLAH': 'jkl;'}
    with patch.dict('lambada.os.environ', custom_env, clear=True):
        config = lambada.get_config_from_env('STUFF_')
        self.assertDictEqual(config, {'blah': 'asdf'})
def test_dancer_class(self):
    """
    Check Dancer's constructor attributes, its merged config property,
    and the result of calling the instance.
    """
    dancer = lambada.Dancer(self.sample_dancer, 'foo', 'bar', memory=128)

    # Confirm __init__
    self.assertEqual(dancer.name, 'foo')
    self.assertEqual(dancer.description, 'bar')
    self.assertDictEqual(dancer.override_config, {'memory': 128})

    # Validate config merge property
    expected_config = {'name': 'foo', 'description': 'bar', 'memory': 128}
    self.assertDictEqual(dancer.config, expected_config)

    self.assertEqual((1, 2), dancer(1, 2))
def test_keeptemp_via_env_variable():
    """
    Check that the file given by with_tempfile is gone after the call by
    default, and still exists when NICEMAN_TESTS_KEEPTEMP is set.

    Raises SkipTest when NICEMAN_TESTS_KEEPTEMP is already set in the
    real environment.
    """
    if os.environ.get('NICEMAN_TESTS_KEEPTEMP'):
        raise SkipTest("We have env variable set to preserve tempfiles")

    files = []

    @with_tempfile()
    def check(f):
        # Close the handle explicitly; a bare open(...).write(...) leaves
        # closing to garbage collection.
        with open(f, 'w') as fobj:
            fobj.write("LOAD")
        files.append(f)

    with patch.dict('os.environ', {}):
        check()

    with patch.dict('os.environ', {'NICEMAN_TESTS_KEEPTEMP': '1'}):
        check()

    eq_(len(files), 2)
    ok_(not exists(files[0]), msg="File %s still exists" % files[0])
    ok_(    exists(files[1]), msg="File %s not exists" % files[1])

    # The second file was deliberately kept; remove it by hand.
    rmtemp(files[-1])
def test_skip_if_no_network():
    """
    Check skip_if_no_network raises SkipTest only when
    NICEMAN_TESTS_NONETWORK is set, as a decorator and as a plain call.
    """
    no_network = {'NICEMAN_TESTS_NONETWORK': '1'}

    baseline_env = os.environ.copy()
    baseline_env.pop('NICEMAN_TESTS_NONETWORK', None)

    # we need to run under cleaned env to make sure we actually test in both conditions
    with patch('os.environ', baseline_env):
        @skip_if_no_network
        def somefunc(a1):
            return a1

        eq_(somefunc.tags, ['network'])

        with patch.dict('os.environ', no_network):
            assert_raises(SkipTest, somefunc, 1)
        with patch.dict('os.environ', {}):
            eq_(somefunc(1), 1)

        # and now if used as a function, not a decorator
        with patch.dict('os.environ', no_network):
            assert_raises(SkipTest, skip_if_no_network)
        with patch.dict('os.environ', {}):
            eq_(skip_if_no_network(), None)
def test_load_settings_from_env(tmpdir):
    """
    Check load_settings_from_env leaves FOO unset when the directory has
    no .env file, and sets FOO to 'bar' once .env holds "FOO = bar".
    """
    settings_dir = str(tmpdir)

    # patch.dict undoes any change made to os.environ inside the block.
    with patch.dict('os.environ', {}):
        load_settings_from_env(settings_dir)
        assert os.environ.get('FOO') is None

    dotenv_file = tmpdir.join(".env")
    dotenv_file.write("FOO = bar")

    with patch.dict('os.environ', {}):
        load_settings_from_env(settings_dir)
        assert os.environ.get('FOO') == 'bar'
def test_handle_content_type_json(self, mocklast, mockjsonify, mockrender,
                                  mockrequest):
    """
    For a JSON request, handle_content_type must call jsonify and keep
    the template key in the result.
    """
    # Back the mocked request headers with a real dict.
    headers = {'Content-Type': 'application/json'}
    mockrequest.headers.__getitem__.side_effect = headers.__getitem__
    mockrequest.headers.get.side_effect = headers.get
    mockrequest.headers.__iter__.side_effect = headers.__iter__
    mockjsonify.side_effect = myjsonify

    res = util.handle_content_type(dict(template='example.html'))

    assert res.get('template') == 'example.html', "template key should exist"
    assert mockjsonify.called, "jsonify should be called"