我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 unittest.mock 模块。
def tests_event_registration(self):
    """Check that event-group callbacks are accepted or rejected as expected."""
    controller = self.abode.events
    self.assertIsNotNone(controller)

    handler = Mock()

    # A known event group is accepted.
    accepted = controller.add_event_callback(TIMELINE.ALARM_GROUP, handler)
    self.assertTrue(accepted)

    # A missing event group yields a falsy result instead of raising.
    rejected = controller.add_event_callback(None, handler)
    self.assertFalse(rejected)

    # An unknown event group raises.
    with self.assertRaises(abodepy.AbodeException):
        controller.add_event_callback("lol", handler)
def tests_timeline_registration(self):
    """Check that timeline-event callbacks are accepted or rejected as expected."""
    controller = self.abode.events
    self.assertIsNotNone(controller)

    handler = Mock()

    # A known timeline event is accepted.
    accepted = controller.add_timeline_callback(TIMELINE.CAPTURE_IMAGE, handler)
    self.assertTrue(accepted)

    # A missing timeline event yields a falsy result instead of raising.
    rejected = controller.add_timeline_callback(None, handler)
    self.assertFalse(rejected)

    # An unknown event given as a string raises.
    with self.assertRaises(abodepy.AbodeException):
        controller.add_timeline_callback("lol", handler)

    # An unknown event given as a dict raises as well.
    with self.assertRaises(abodepy.AbodeException):
        controller.add_timeline_callback({"lol": "lol"}, handler)
def tests_multi_events_callback(self):
    """Check that a callback registered for several event groups gets invoked."""
    controller = self.abode.events
    self.assertIsNotNone(controller)

    handler = Mock()
    registered = controller.add_event_callback(
        [TIMELINE.ALARM_GROUP, TIMELINE.CAPTURE_GROUP], handler)
    self.assertTrue(registered)

    # Push the IR camera's timeline event through the private update hook.
    # pylint: disable=protected-access
    payload = json.loads(IRCAMERA.timeline_event())
    controller._on_timeline_update(payload)

    handler.assert_called_with(payload)
def tests_multi_timeline_callback(self):
    """Check that a callback registered for several timeline events gets invoked."""
    controller = self.abode.events
    self.assertIsNotNone(controller)

    handler = Mock()
    registered = controller.add_timeline_callback(
        [TIMELINE.CAPTURE_IMAGE, TIMELINE.OPENED], handler)
    self.assertTrue(registered)

    # Push the IR camera's timeline event through the private update hook.
    # pylint: disable=protected-access
    payload = json.loads(IRCAMERA.timeline_event())
    controller._on_timeline_update(payload)

    handler.assert_called_with(payload)
def tests_automations_callback(self):
    """Check that an automation update is forwarded to a registered callback."""
    controller = self.abode.events
    self.assertIsNotNone(controller)

    handler = Mock()
    registered = controller.add_event_callback(
        TIMELINE.AUTOMATION_EDIT_GROUP, handler)
    self.assertTrue(registered)

    # Feed a raw automation payload through the private update hook.
    # pylint: disable=protected-access
    raw_update = '{}'
    controller._on_automation_update(raw_update)

    # The callback receives the payload exactly as it was passed in.
    handler.assert_called_with(raw_update)
def test_subscription_recording_processor_for_request(self):
    """Check that process_request swaps the subscription id in the URI and body."""
    target_id = str(uuid.uuid4())
    processor = SubscriptionRecordingProcessor(target_id)

    uri_templates = (
        'https://management.azure.com/subscriptions/{}/providers/Microsoft.ContainerRegistry/'
        'checkNameAvailability?api-version=2017-03-01',
        'https://graph.windows.net/{}/applications?api-version=1.6',
    )

    for uri_template in uri_templates:
        # Each request starts out carrying its own random subscription id.
        original_id = str(uuid.uuid4())
        request = mock.Mock()
        request.uri = uri_template.format(original_id)
        request.body = self._mock_subscription_request_body(original_id)

        processor.process_request(request)

        self.assertEqual(request.uri, uri_template.format(target_id))
        self.assertEqual(request.body,
                         self._mock_subscription_request_body(target_id))
def setUp(self):
    """Silence logging and start every patch listed in ``self.patch_these``.

    ``patch_these`` is optional. Each entry is either a patch target string
    or a list/tuple whose first item is the target and whose optional second
    item is the mock's return value. A callable second item is called once
    and its result is used as the return value. The started mocks are stored
    in ``self.patched``, keyed by target.
    """
    # disable logging while testing
    logging.disable(logging.CRITICAL)
    self.patched = {}

    for entry in getattr(self, 'patch_these', ()):
        # Only indexable sequences can carry a return value; a set cannot
        # be indexed, so it is not treated as a (target, retval) pair.
        is_pair = isinstance(entry, (list, tuple))
        namespace = entry[0] if is_pair else entry

        patcher = mock.patch(namespace)
        mocked = patcher.start()
        mocked.reset_mock()
        self.patched[namespace] = mocked

        # Index 1 only exists when the entry has at least two items.
        if is_pair and len(entry) > 1:
            retval = entry[1]
            if callable(retval):
                retval = retval()
            mocked.return_value = retval
def test_reset_kill_reconnect(self):
    """Check that the client can run again after a disconnect and reconnect."""
    aprs = AprsClient(aprs_user='testuser', aprs_filter='')
    aprs.connect()

    # The callback disconnects as soon as it is invoked.
    on_message = mock.MagicMock(side_effect=lambda raw_msg: aprs.disconnect())

    # .run() should be allowed to execute after .connect()
    self.assertFalse(aprs._kill)
    aprs.run(callback=on_message, autoreconnect=True)

    # After .disconnect(), the kill flag is set and one message was handled.
    self.assertTrue(aprs._kill)
    self.assertEqual(on_message.call_count, 1)

    # After reconnecting, .run() should be able to run again.
    on_message.reset_mock()
    aprs.connect()
    aprs.run(callback=on_message, autoreconnect=True)
    self.assertEqual(on_message.call_count, 1)
def test_normal_execution(self, mock_isfile, mock_glob):
    """Check the subprocess calls issued by a normal fslreorient2std run."""
    # Canned answers for the patched helpers.
    mock_isfile.side_effect = [True]
    mock_glob.return_value = ["/my/path/mock_output"]

    fslreorient2std(**self.kwargs)

    # First the tool is located, then it is run on the input/output pair.
    locate_call = mock.call(
        ["which", "fslreorient2std"], env={}, stderr=-1, stdout=-1)
    reorient_call = mock.call(
        ["fslreorient2std",
         self.kwargs["input_image"],
         self.kwargs["output_image"]],
        cwd=None, env={}, stderr=-1, stdout=-1)

    self.assertEqual([locate_call, reorient_call],
                     self.mock_popen.call_args_list)
    self.assertEqual(len(self.mock_env.call_args_list), 1)
def mock_ioctl(fd, command, msg):
    """Stand-in for ioctl that answers SMBus read requests from ``test_buffer``."""
    print("Mocking ioctl")
    assert fd == MOCK_FD
    assert command is not None

    # Only SMBus read operations are reproduced; anything else is a no-op.
    if not (command == I2C_SMBUS and msg.read_write == I2C_SMBUS_READ):
        return

    base = msg.command
    if msg.size == I2C_SMBUS_BYTE_DATA:
        msg.data.contents.byte = test_buffer[base]
    elif msg.size == I2C_SMBUS_WORD_DATA:
        # The byte at base + 1 is the high byte of the word.
        msg.data.contents.word = test_buffer[base + 1] * 256 + test_buffer[base]
    elif msg.size == I2C_SMBUS_I2C_BLOCK_DATA:
        # The requested length is read from the byte field; block data
        # is written starting at index 1.
        for k in range(msg.data.contents.byte):
            msg.data.contents.block[k + 1] = test_buffer[base + k]
# Override open, close and ioctl with our mock functions
def test_checkOtp(self, mock):
    """Check that the mocked ``checkOtp`` returns the canned reply.

    NOTE(review): ``mock`` is presumably injected by a patch decorator that
    is not visible here -- confirm which target it replaces.
    """
    reply = {"requestId": "123", "status": "0000", "statusMessage": "Success",
             "credentialId": "", "credentialType": "STANDARD_OTP",
             "authContext": {"params": {"Key": "authLevel.level", "Value": 1}}}

    mock.SymantecUserServices.checkOtp.return_value = Mock()
    mock.checkOtp.return_value.hash.return_value = reply

    # A method that was not configured must not produce the canned reply.
    response = symantec_package.lib.userService.SymantecUserServices.checkOut("checkOutyou")
    self.assertNotEqual(response.hash(), reply)

    response = symantec_package.lib.userService.SymantecUserServices.checkOtp("PARAMS")
    body = response.hash()

    # assertEqual reports both operands on failure, unlike assertTrue(a == b).
    self.assertEqual(body, reply)
    self.assertEqual(body["status"], "0000")
    self.assertEqual(body['requestId'], "123")
    self.assertEqual(body['statusMessage'], "Success")
    self.assertEqual(body['credentialId'], "")
    self.assertEqual(body['credentialType'], "STANDARD_OTP")
    self.assertEqual(body['authContext']['params']['Key'], "authLevel.level")
    self.assertEqual(body['authContext']['params']['Value'], 1)
def test_poll_in_Push(self, mock):
    """Check that the mocked push-then-poll call returns the canned reply.

    NOTE(review): ``mock`` is presumably injected by a patch decorator that
    is not visible here -- confirm which target it replaces.
    """
    reply = {"requestId": "ac123", "status": "6040",
             "statusMessage": "Mobile push request sent",
             "pushDetail": {"pushCredentialId": "133709001", "pushSent": True},
             "transactionId": "RealTransactionId",
             "authContext": {"params": {"Key": "authLevel.level", "Value": 10}}}

    mock.SymantecServices.authenticateUserWithPushThenPolling.return_value = Mock()
    mock.authenticateUserWithPushThenPolling.return_value.hash.return_value = reply

    # A method that was not configured must not produce the canned reply.
    response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithNothing()
    self.assertNotEqual(response.hash(), reply)

    response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithPushThenPolling("Parameters Here!")
    body = response.hash()

    # assertEqual reports both operands on failure, unlike assertTrue(a == b).
    self.assertEqual(body, reply)
    self.assertEqual(body["status"], "6040")
    self.assertEqual(body['requestId'], "ac123")
    self.assertEqual(body['statusMessage'], "Mobile push request sent")
    self.assertEqual(body["pushDetail"]['pushCredentialId'], "133709001")
    self.assertIs(body["pushDetail"]['pushSent'], True)
    self.assertEqual(body['transactionId'], "RealTransactionId")
    self.assertEqual(body['authContext']['params']['Key'], "authLevel.level")
    self.assertEqual(body['authContext']['params']['Value'], 10)
def test_mock_create_user(self, mock_managementservices):
    """Check that the mocked ``createUser`` returns the canned reply.

    NOTE(review): ``mock_managementservices`` is presumably injected by a
    patch decorator that is not visible here -- confirm the patch target.
    """
    reply = {'requestId': 'create_123', 'status': '0000', 'statusMessage': 'Success'}

    # Configure the mock so the returned object has a `json()` method
    # that yields the canned reply.
    mock_managementservices.createUser.return_value = Mock()
    mock_managementservices.createUser.return_value.json.return_value = reply

    response = symantec_package.lib.managementService.SymantecManagementServices.createUser("create_123", "new_user3")
    print(response.json())

    # assertEqual reports both operands on failure, unlike assertTrue(a == b).
    self.assertEqual(response.json(), reply)
    self.assertEqual(response.json()["status"], "0000")
def test_mock_add_STANDARDOTP_credential(self, mock_managementservices):
    """Check that the mocked ``addCredentialOtp`` returns the canned reply.

    NOTE(review): ``mock_managementservices`` is presumably injected by a
    patch decorator that is not visible here -- confirm the patch target.
    """
    reply = {'statusMessage': "Success", 'requestId': 'add_otp_cred', 'status': '0000'}

    # Configure the mock so the returned object has a `json()` method
    # that yields the canned reply.
    mock_managementservices.addCredentialOtp.return_value = Mock()
    mock_managementservices.addCredentialOtp.return_value.json.return_value = reply

    response = symantec_package.lib.managementService.SymantecManagementServices.addCredentialOtp(
        "add_otp_cred", "new_user3", "", "STANDARD_OTP",
        "678066")  # change with what's on your device
    print(response.json())

    # assertEqual reports both operands on failure, unlike assertTrue(a == b).
    self.assertEqual(response.json(), reply)
    self.assertEqual(response.json()["status"], "0000")
def test_mock_update_STANDARDOTP_credential(self, mock_managementservices):
    """Check that the mocked ``updateCredential`` returns the canned reply.

    NOTE(review): ``mock_managementservices`` is presumably injected by a
    patch decorator that is not visible here -- confirm the patch target.
    """
    reply = {'statusMessage': 'Success', 'requestId': 'update_123', 'status': '0000'}

    # Configure the mock so the returned object has a `json()` method
    # that yields the canned reply.
    mock_managementservices.updateCredential.return_value = Mock()
    mock_managementservices.updateCredential.return_value.json.return_value = reply

    response = symantec_package.lib.managementService.SymantecManagementServices.updateCredential(
        "update_123", "gabe_phone", "", "STANDARD_OTP", "My personal cell phone")
    print(response.json())

    # assertEqual reports both operands on failure, unlike assertTrue(a == b).
    self.assertEqual(response.json(), reply)
    self.assertEqual(response.json()["status"], "0000")
def test_mock_setTemporaryPasswordSMSDelivery(self, mock_managementservices):
    """Check that the mocked ``setTemporaryPasswordSMSDelivery`` returns the canned reply.

    NOTE(review): ``mock_managementservices`` is presumably injected by a
    patch decorator that is not visible here -- confirm the patch target.
    """
    reply = {'status': '0000', 'requestId': 'setTempPWD', 'statusMessage': 'Success',
             'temporaryPassword': '998241'}

    # Configure the mock so the returned object has a `json()` method
    # that yields the canned reply.
    mock_managementservices.setTemporaryPasswordSMSDelivery.return_value = Mock()
    mock_managementservices.setTemporaryPasswordSMSDelivery.return_value.json.return_value = reply

    response = symantec_package.lib.managementService.SymantecManagementServices.setTemporaryPasswordSMSDelivery(
        "setTempPWD", "gabe_phone", "12313608781", "17879481605")
    print(response.json())

    # assertEqual reports both operands on failure, unlike assertTrue(a == b).
    self.assertEqual(response.json(), reply)
    self.assertEqual(response.json()["status"], "0000")
def request_callback(self, request):
    """Answer *request* from a gzip-pickled local file, recording it on first use.

    The file location comes from ``self.build_path``. When no file exists
    yet, ``self.real_request`` is called and its result is stored for later
    runs. Returns the ``(status, headers, body)`` entries of the response.
    """
    logger.debug('Mock request {} {}'.format(request.method, request.url))
    cache_file = self.build_path(request.method, request.url)

    if not os.path.exists(cache_file):
        # Nothing recorded yet: perform the real request and persist it.
        logger.info('Building mock file {}'.format(cache_file))
        response = self.real_request(request)
        with gzip.open(cache_file, 'wb') as handle:
            # Protocol 2 keeps the recorded file readable from Python 2.
            handle.write(pickle.dumps(response, protocol=2))
    else:
        # Replay the previously recorded response.
        logger.info('Using mock file {}'.format(cache_file))
        with gzip.open(cache_file, 'rb') as handle:
            response = pickle.load(handle)

    return response['status'], response['headers'], response['body']
def test_mkdir_p(self, path_mocker_stopall):
    """Check that mkdir_p creates the directory via Path and logs the verification."""
    mocker = path_mocker_stopall
    target_dir = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

    # Stand-ins for the logger, the existence check and the Path class.
    fake_info = mocker.MagicMock(name="mock_logger_info")
    fake_dir_exists = mocker.MagicMock(name="mock_dir_exists")
    fake_path_cls = mocker.MagicMock(name="mock_path")
    fake_dir_exists.return_value = True

    mocker.patch.object(scarlett_os.subprocess.logging.Logger, 'info', fake_info)
    mocker.patch.object(scarlett_os.internal.path, 'dir_exists', fake_dir_exists)
    mocker.patch.object(scarlett_os.internal.path, 'Path', fake_path_cls)

    # Exercise the function under test.
    s_path.mkdir_p(target_dir)

    assert fake_info.call_count == 1
    fake_path_cls.assert_called_once_with(target_dir)
    fake_path_cls().mkdir.assert_any_call(parents=True, exist_ok=True)
    fake_info.assert_any_call(
        "Verify mkdir_p ran: {}".format(fake_dir_exists.return_value))
def test_dir_exists_false(self, path_mocker_stopall):
    """Check that dir_exists logs an error when the path is not a directory."""
    mocker = path_mocker_stopall
    target_dir = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

    fake_error = mocker.MagicMock(name="mock_logger_error")
    fake_path_cls = mocker.MagicMock(name="mock_path")

    mocker.patch.object(scarlett_os.subprocess.logging.Logger, 'error', fake_error)
    mocker.patch.object(scarlett_os.internal.path, 'Path', fake_path_cls)

    # Every Path(...) call yields this instance, which reports "not a directory".
    fake_path = fake_path_cls()
    fake_path.is_dir.return_value = False

    # Exercise the function under test.
    s_path.dir_exists(target_dir)

    assert fake_error.call_count == 1
    assert fake_path.is_dir.call_count == 2
    fake_error.assert_any_call("This is not a dir: {}".format(target_dir))
def test_dir_exists_true(self, path_mocker_stopall):
    """Check that dir_exists logs no error when is_dir is left at its mock default."""
    mocker = path_mocker_stopall
    target_dir = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

    fake_error = mocker.MagicMock(name="mock_logger_error")
    fake_path_cls = mocker.MagicMock(name="mock_path")

    mocker.patch.object(scarlett_os.subprocess.logging.Logger, 'error', fake_error)
    mocker.patch.object(scarlett_os.internal.path, 'Path', fake_path_cls)

    # is_dir is intentionally left unconfigured on this instance.
    fake_path = fake_path_cls()

    # Exercise the function under test.
    s_path.dir_exists(target_dir)

    assert fake_error.call_count == 0
    assert fake_path.is_dir.call_count == 2
    fake_error.assert_not_called()
def test_mkdir_if_does_not_exist_false(self, path_mocker_stopall):
    """Check that a missing directory is created and True is returned."""
    mocker = path_mocker_stopall
    target_dir = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

    fake_mkdir_p = mocker.MagicMock(name="mock_mkdir_p")
    # The existence check reports that the directory is absent.
    fake_dir_exists = mocker.MagicMock(name="mock_dir_exists", return_value=False)

    mocker.patch.object(scarlett_os.internal.path, 'mkdir_p', fake_mkdir_p)
    mocker.patch.object(scarlett_os.internal.path, 'dir_exists', fake_dir_exists)

    # Exercise the function under test.
    outcome = s_path.mkdir_if_does_not_exist(target_dir)

    assert fake_mkdir_p.call_count == 1
    assert fake_dir_exists.call_count == 1
    assert outcome == True
def test_mkdir_if_does_not_exist_true(self, path_mocker_stopall):
    """Check that an existing directory is left alone and False is returned."""
    mocker = path_mocker_stopall
    target_dir = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

    fake_mkdir_p = mocker.MagicMock(name="mock_mkdir_p")
    # The existence check reports that the directory is already there.
    fake_dir_exists = mocker.MagicMock(name="mock_dir_exists", return_value=True)

    mocker.patch.object(scarlett_os.internal.path, 'mkdir_p', fake_mkdir_p)
    mocker.patch.object(scarlett_os.internal.path, 'dir_exists', fake_dir_exists)

    # Exercise the function under test.
    outcome = s_path.mkdir_if_does_not_exist(target_dir)

    assert fake_mkdir_p.call_count == 0
    assert fake_dir_exists.call_count == 1
    assert outcome == False
def test_fname_exists_false(self, path_mocker_stopall):
    """Check that fname_exists returns False when Path.exists() says so."""
    mocker = path_mocker_stopall
    target_file = '/home/pi/dev/bossjones-github/scarlett_os/_debug/generator.dot'

    fake_path_cls = mocker.MagicMock(name="mock_path")
    mocker.patch.object(scarlett_os.internal.path, 'Path', fake_path_cls)

    # Every Path(...) call yields this instance, which reports "missing".
    fake_path = fake_path_cls()
    fake_path.exists.return_value = False

    # Exercise the function under test.
    outcome = s_path.fname_exists(target_file)

    assert fake_path.exists.call_count == 1
    fake_path.exists.assert_called_once_with()
    fake_path_cls.assert_any_call(target_file)
    assert outcome == False
def test_dir_isWritable(self, path_mocker_stopall):
    """Check that isWritable consults os.path.isdir and os.access for a directory."""
    mocker = path_mocker_stopall
    location = 'file:///tmp'

    # Both OS-level checks answer "yes".
    fake_access = mocker.MagicMock(name="mock_os_access")
    fake_isdir = mocker.MagicMock(name="mock_os_path_isdir")
    fake_isdir.return_value = True
    fake_access.return_value = True

    mocker.patch.object(scarlett_os.internal.path.os, 'access', fake_access)
    mocker.patch.object(scarlett_os.internal.path.os.path, 'isdir', fake_isdir)

    # Exercise the function under test.
    outcome = s_path.isWritable(location)

    fake_isdir.assert_called_once_with('file:///tmp')
    fake_access.assert_called_once_with('file:///tmp', os.W_OK)
    assert outcome == True
def test_unicode_decode_error_isWritable(self, path_mocker_stopall):
    """Check that isWritable shows the unicode error dialog on UnicodeDecodeError."""
    mocker = path_mocker_stopall
    location = b'file:///tmp/fake_file'

    fake_isdir = mocker.MagicMock(name="mock_os_path_isdir")
    fake_access = mocker.MagicMock(name="mock_os_access")
    fake_dialog = mocker.MagicMock(name="mock_unicode_error_dialog")
    fake_error = mocker.MagicMock(name="mock_logger_error")

    # The directory check blows up with a decoding error.
    fake_isdir.side_effect = UnicodeDecodeError('', b'', 1, 0, '')

    mocker.patch.object(scarlett_os.internal.path.os.path, 'isdir', fake_isdir)
    mocker.patch.object(scarlett_os.internal.path.os, 'access', fake_access)
    mocker.patch.object(scarlett_os.internal.path, 'unicode_error_dialog', fake_dialog)
    mocker.patch.object(scarlett_os.subprocess.logging.Logger, 'error', fake_error)

    # Exercise the function under test.
    s_path.isWritable(location)

    assert fake_dialog.call_count == 1
def test_unicode_error_dialog(self, path_mocker_stopall):
    """Check that unicode_error_dialog logs the locale warning exactly once."""
    mocker = path_mocker_stopall

    fake_error = mocker.MagicMock(name="mock_logger_error")
    mocker.patch.object(scarlett_os.subprocess.logging.Logger, 'error', fake_error)

    # Exercise the function under test.
    s_path.unicode_error_dialog()

    assert fake_error.call_count == 1

    # The message is passed through the translation function, as in the test's original form.
    expected_message = _(
        "The system's locale that you are using is not UTF-8 capable. "
        "Unicode support is required for Python3 software like Pitivi. "
        "Please correct your system settings; if you try to use Pitivi "
        "with a broken locale, weird bugs will happen.")
    fake_error.assert_any_call(expected_message)
def __get_result_set_from_mock(mock, query):
    """Build a result-set stand-in from ``mock.points`` filtered by *query*.

    The first two 19-digit numbers found in *query* are taken as the
    exclusive lower and upper bounds for each point's ``"time"`` value. If
    *query* also contains ``"country"='<word>'``, only points tagged with
    that country are kept; points without tags or without a country tag are
    dropped rather than raising.

    Args:
        mock: object exposing ``points``, an iterable of dicts with
            ``"time"`` and ``"fields"`` keys and an optional ``"tags"`` dict.
        query: query text to extract the time range and country from.

    Returns:
        A ``Mock`` whose ``get_points()`` returns one dict per kept point:
        its fields merged with its tags, plus ``"time"`` as a UTC
        ``YYYY-MM-DDTHH:MM:SSZ`` string.

    Raises:
        AttributeError: if *query* does not contain two 19-digit numbers.
    """
    # Local import: only ``datetime`` itself is known to be in scope here.
    from datetime import timezone

    bounds = re.search(r"(\d{19}).*?(\d{19})", query)
    start, end = (int(value) for value in bounds.groups())  # extract range
    points = [point for point in mock.points if start < point["time"] < end]

    country_result = re.search(r"\"country\"=\'(\w+)\'", query)
    if country_result:
        country = country_result.group(1)
        # Tags are optional (see the merge below), so look them up defensively.
        points = [point for point in points
                  if point.get("tags", {}).get("country") == country]

    influx_points = []
    for point in points:
        row = {**point["fields"], **point.get("tags", {})}
        # Floor division by 10**9 turns the stored time into whole seconds.
        seconds = point["time"] // 1000000000
        # utcfromtimestamp() is deprecated; an aware UTC datetime formats the same.
        moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
        row["time"] = moment.strftime('%Y-%m-%dT%H:%M:%SZ')
        influx_points.append(row)

    result_wrapper = Mock()
    result_wrapper.get_points.return_value = influx_points
    return result_wrapper
def test_get_all(self):
    """Check that ``.all()`` returns a list of records and requests the list URL."""
    fixture_file = '{}/{}.json'.format(self.app, self.name)

    with patch(
        'pynetbox.lib.query.requests.get',
        return_value=Response(fixture=fixture_file),
    ) as fake_get:
        records = getattr(nb, self.name).all()

        self.assertTrue(records)
        self.assertTrue(isinstance(records, list))
        self.assertTrue(isinstance(records[0], self.ret))

        # Underscores in the endpoint name become dashes in the URL.
        expected_url = 'http://localhost:8000/api/{}/{}/'.format(
            self.app, self.name.replace('_', '-'))
        fake_get.assert_called_with(expected_url, headers=HEADERS)
def test_filter(self):
    """Check that ``.filter(pk=1)`` returns a list and adds the query string."""
    fixture_file = '{}/{}.json'.format(self.app, self.name)

    with patch(
        'pynetbox.lib.query.requests.get',
        return_value=Response(fixture=fixture_file),
    ) as fake_get:
        records = getattr(nb, self.name).filter(pk=1)

        self.assertTrue(records)
        self.assertTrue(isinstance(records, list))
        self.assertTrue(isinstance(records[0], self.ret))

        # Underscores in the endpoint name become dashes in the URL.
        expected_url = 'http://localhost:8000/api/{}/{}/?pk=1'.format(
            self.app, self.name.replace('_', '-'))
        fake_get.assert_called_with(expected_url, headers=HEADERS)
def test_get(self):
    """Check that ``.get(1)`` returns one record that converts to str and dict."""
    # The fixture file is named after the endpoint minus its last character.
    fixture_file = '{}/{}.json'.format(self.app, self.name[:-1])

    with patch(
        'pynetbox.lib.query.requests.get',
        return_value=Response(fixture=fixture_file),
    ) as fake_get:
        record = getattr(nb, self.name).get(1)

        self.assertTrue(record)
        self.assertTrue(isinstance(record, self.ret))
        self.assertTrue(isinstance(str(record), str))
        self.assertTrue(isinstance(dict(record), dict))

        # Underscores in the endpoint name become dashes in the URL.
        expected_url = 'http://localhost:8000/api/{}/{}/1/'.format(
            self.app, self.name.replace('_', '-'))
        fake_get.assert_called_with(expected_url, headers=HEADERS)
def test_delete(self):
    """Check that deleting a fetched record issues a DELETE to the record URL."""
    # The fixture file is named after the endpoint minus its last character.
    fixture_file = '{}/{}.json'.format(self.app, self.name[:-1])

    with patch(
        'pynetbox.lib.query.requests.get',
        return_value=Response(fixture=fixture_file),
    ) as fake_get, patch('pynetbox.lib.query.requests.delete') as fake_delete:
        record = getattr(nb, self.name).get(1)
        self.assertTrue(record.delete())

        # The fetch uses the plain headers.
        fake_get.assert_called_with(
            'http://localhost:8000/api/{}/{}/1/'.format(
                self.app, self.name.replace('_', '-')),
            headers=HEADERS)
        # The delete uses the authenticated headers on the same URL.
        fake_delete.assert_called_with(
            'http://localhost:8000/api/{}/{}/1/'.format(
                self.app, self.name.replace('_', '-')),
            headers=AUTH_HEADERS)
def test_create_device_bulk(self, mock):
    """Check that creating two devices in one call returns two results.

    NOTE(review): ``mock`` is presumably injected by a patch decorator that
    is not visible here -- confirm the patch target.
    """
    data = [
        {
            'name': 'test-device',
            'site': 1,
            'device_type': 1,
            'device_role': 1,
        },
        {
            'name': 'test-device1',
            'site': 1,
            'device_type': 1,
            'device_role': 1,
        },
    ]

    ret = nb.devices.create(data)

    self.assertTrue(ret)
    # assertTrue(len(ret), 2) would treat 2 as the failure message and never
    # compare the length, so compare explicitly.
    self.assertEqual(len(ret), 2)
def test_get(self):
    """Check that ``.get(1)`` returns one record and requests the record URL."""
    # The fixture file is named after the endpoint minus its last character.
    fixture_file = '{}/{}.json'.format(self.app, self.name[:-1])

    with patch(
        'pynetbox.lib.query.requests.get',
        return_value=Response(fixture=fixture_file),
    ) as fake_get:
        record = getattr(nb, self.name).get(1)

        self.assertTrue(record)
        self.assertTrue(isinstance(record, self.ret))

        # Underscores in the endpoint name become dashes in the URL.
        expected_url = 'http://localhost:8000/api/{}/{}/1/'.format(
            self.app, self.name.replace('_', '-'))
        fake_get.assert_called_with(expected_url, headers=HEADERS)
def test_update_no_active_player_makes_next_active(self):
    """Check that update() promotes the first waiting player when none is active."""
    first_ip = '192.168.0.1'

    # One player waiting, nobody active yet.
    roster = players.Players(1)
    roster.add_player(first_ip)
    self.assertEqual(None, roster.active_player())

    # Mock callbacks record how update() invokes them.
    on_start = unittest.mock.Mock()
    on_end = unittest.mock.Mock()

    # Advance by half a second.
    roster.update(0.5, on_start, on_end)

    # The waiting player was started, nobody was ended, and the active
    # player is reported together with the value 1.
    on_start.assert_called_once_with(first_ip)
    on_end.assert_not_called()
    self.assertEqual((first_ip, 1), roster.active_player())
def test_update_moves_to_next_after_playetime_elapsed(self):
    """Check that update() ends the active player and starts the next one."""
    first_ip, second_ip = '192.168.0.1', '192.168.0.2'

    # Two players queued; the first update call makes the first one active.
    roster = players.Players(1)
    roster.add_player(first_ip)
    roster.add_player(second_ip)
    on_start = unittest.mock.Mock()
    on_end = unittest.mock.Mock()
    roster.update(1, on_start, on_end)

    # Forget the calls made during setup.
    on_start.reset_mock()
    on_end.reset_mock()

    # Two more seconds use up the first player's time.
    roster.update(2, on_start, on_end)

    on_end.assert_called_once_with(first_ip)
    on_start.assert_called_once_with(second_ip)
    self.assertEqual((second_ip, 1), roster.active_player())
def test_uploading_syslog(self):
    '''Check that the syslog uploader queues exactly one SYSLOG_UPLOAD message'''
    cli_args = ["syslog_uploader", "-c", self._ndr_config_file, TEST_SYSLOG_DATA]
    with unittest.mock.patch.object(sys, 'argv', cli_args):
        ndr.tools.syslog_uploader.main()

    # Exactly one file must be waiting in the outgoing spool.
    spool_dir = self._ncc.outgoing_upload_spool
    queued = os.listdir(spool_dir)
    self.assertEqual(len(queued), 1)

    # Load the queued message, then remove the file from the spool.
    message_path = spool_dir + "/" + queued[0]
    loaded = ndr.IngestMessage.verify_and_load_message(
        self._ncc, message_path, only_accept_cn="ndr_test_suite")
    os.remove(message_path)

    self.assertEqual(loaded.message_type, ndr.IngestMessageTypes.SYSLOG_UPLOAD)

    # Converting to a syslog upload message must not raise.
    ndr.SyslogUploadMessage().from_message(loaded)
def test_uploading_status(self):
    '''Check that the status uploader queues exactly one STATUS message'''
    cli_args = ["status_uploader", "-c", self._ndr_config_file]
    with unittest.mock.patch.object(sys, 'argv', cli_args):
        ndr.tools.status.main()

    # Exactly one file must be waiting in the outgoing spool.
    spool_dir = self._ncc.outgoing_upload_spool
    queued = os.listdir(spool_dir)
    self.assertEqual(len(queued), 1)

    # Load the queued message, then remove the file from the spool.
    message_path = spool_dir + "/" + queued[0]
    loaded = ndr.IngestMessage.verify_and_load_message(
        self._ncc, message_path, only_accept_cn="ndr_test_suite")
    os.remove(message_path)

    self.assertEqual(loaded.message_type, ndr.IngestMessageTypes.STATUS)
def test_alert_tester(self):
    '''Check that the alert tester queues exactly one TEST_ALERT message'''
    cli_args = ["alert_tester", "-c", self._ndr_config_file]
    with unittest.mock.patch.object(sys, 'argv', cli_args):
        ndr.tools.alert_tester.main()

    # Exactly one file must be waiting in the outgoing spool.
    spool_dir = self._ncc.outgoing_upload_spool
    queued = os.listdir(spool_dir)
    self.assertEqual(len(queued), 1)

    # Load the queued message, then remove the file from the spool.
    message_path = spool_dir + "/" + queued[0]
    loaded = ndr.IngestMessage.verify_and_load_message(
        self._ncc, message_path, only_accept_cn="ndr_test_suite")
    os.remove(message_path)

    self.assertEqual(loaded.message_type, ndr.IngestMessageTypes.TEST_ALERT)
def test_pcap_processing(self):
    '''Check that processing a pcap queues exactly one TRAFFIC_REPORT message'''
    cli_args = ["pcap_processor", "-k", "-c", self._ndr_config_file, TSHARK_PCAP]
    with unittest.mock.patch.object(sys, 'argv', cli_args):
        ndr.tools.pcap_to_traffic_report.main()

    # Exactly one file must be waiting in the outgoing spool.
    spool_dir = self._ncc.outgoing_upload_spool
    queued = os.listdir(spool_dir)
    self.assertEqual(len(queued), 1)

    # Load the queued message, then remove the file from the spool.
    message_path = spool_dir + "/" + queued[0]
    loaded = ndr.IngestMessage.verify_and_load_message(
        self._ncc, message_path, only_accept_cn="ndr_test_suite")
    os.remove(message_path)

    self.assertEqual(loaded.message_type, ndr.IngestMessageTypes.TRAFFIC_REPORT)
def test_recursive_query_basic_failure(self):
    """Check that recursive_query tries every parent name and returns None."""
    resolver = dns.resolver.Resolver()
    domain = dns.name.from_text('example.com.')
    record_type = 'NS'

    # Every lookup comes back empty.
    with unittest.mock.patch.object(fierce, 'query', return_value=None) as fake_query:
        result = fierce.recursive_query(resolver, domain, record_type=record_type)

    # One query per name, walking up to the empty name.
    names_tried = ('example.com.', 'com.', '')
    expected = [
        unittest.mock.call(resolver, name, record_type) for name in names_tried
    ]

    fake_query.assert_has_calls(expected)
    self.assertIsNone(result)
def test_recursive_query_long_domain_failure(self):
    """Check that recursive_query walks a long name label by label and returns None."""
    resolver = dns.resolver.Resolver()
    domain = dns.name.from_text('sd1.sd2.example.com.')
    record_type = 'NS'

    # Every lookup comes back empty.
    with unittest.mock.patch.object(fierce, 'query', return_value=None) as fake_query:
        result = fierce.recursive_query(resolver, domain, record_type=record_type)

    # One query per name, dropping the leftmost label each time.
    names_tried = (
        'sd1.sd2.example.com.',
        'sd2.example.com.',
        'example.com.',
        'com.',
        '',
    )
    expected = [
        unittest.mock.call(resolver, name, record_type) for name in names_tried
    ]

    fake_query.assert_has_calls(expected)
    self.assertIsNone(result)
def test_recursive_query_basic_success(self):
    """Check that recursive_query returns the first non-empty answer it gets."""
    resolver = dns.resolver.Resolver()
    domain = dns.name.from_text('example.com.')
    record_type = 'NS'

    # The first lookup is empty, the second one answers.
    good_response = unittest.mock.MagicMock()
    answers = [None, good_response, None]

    with unittest.mock.patch.object(fierce, 'query', side_effect=answers) as fake_query:
        result = fierce.recursive_query(resolver, domain, record_type=record_type)

    names_tried = ('example.com.', 'com.')
    expected = [
        unittest.mock.call(resolver, name, record_type) for name in names_tried
    ]

    fake_query.assert_has_calls(expected)
    self.assertEqual(result, good_response)