我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用 uuid.uuid4()。
def session_id(self):
    """A unique session ID every time the user uses the workflow.

    .. versionadded:: 1.25

    The session ID persists while the user is using this workflow.
    It expires when the user runs a different workflow or closes
    Alfred.
    """
    # Return the cached value when one was already established.
    if self._session_id:
        return self._session_id
    session = os.getenv('_WF_SESSION_ID')
    if not session:
        from uuid import uuid4
        session = uuid4().hex
        # Persist for the rest of the Alfred session.
        self.setvar('_WF_SESSION_ID', session)
    self._session_id = session
    return self._session_id
def test_share_target_is_fetched_if_no_target_found(self, mock_retrieve):
    """When the share's target guid is unknown locally, it is fetched remotely."""
    # Share referencing a target guid we don't have locally.
    entity = base.Share(
        guid=str(uuid4()),
        handle=self.remote_profile.handle,
        target_guid="notexistingguid",
        target_handle=self.remote_profile2.handle,
        public=True,
    )
    # The remote fetch returns the missing target post.
    mock_retrieve.return_value = entities.PostFactory(
        guid=entity.target_guid,
        handle=self.remote_profile2.handle,
    )
    process_entity_share(entity, self.remote_profile)
    # NOTE(review): asserts on `entity.target_id` while the entity was built
    # with `target_guid` — presumably an alias on base.Share; confirm.
    mock_retrieve.assert_called_once_with(entity.target_id, sender_key_fetcher=sender_key_fetcher)
    # Both the fetched target content and the share wrapping it must exist.
    self.assertTrue(Content.objects.filter(guid=entity.target_guid, content_type=ContentType.CONTENT).exists())
    self.assertTrue(
        Content.objects.filter(
            guid=entity.guid, share_of__guid=entity.target_guid, content_type=ContentType.SHARE
        ).exists()
    )
def save(self, *args, **kwargs):
    """Persist the content, deriving content type, visibility and pin order."""
    # A content item cannot simultaneously be a reply and a share.
    if self.parent and self.share_of:
        raise ValueError("Can't be both a reply and a share!")
    self.cache_data()
    if self.parent:
        self.content_type = ContentType.REPLY
        # Ensure replies have sane values
        self.visibility = self.parent.visibility
        self.pinned = False
    elif self.share_of:
        self.content_type = ContentType.SHARE
    if not self.pk:
        # New object: assign a guid and, if pinned, append to pin order.
        if not self.guid:
            # NOTE(review): assigns a UUID object rather than str(uuid4()) as
            # used elsewhere in this module — Django coerces on save; confirm.
            self.guid = uuid4()
        if self.pinned:
            max_order = Content.objects.top_level().filter(author=self.author).aggregate(Max("order"))["order__max"]
            if max_order is not None:  # If max_order is None, there is likely to be no content yet
                self.order = max_order + 1
    self.fix_local_uploads()
    super().save(*args, **kwargs)
    self.cache_related_object_data()
def define_services(config):
    """Define the service settings for the current app.

    Unknown service names are logged as warnings and skipped; they do
    not raise.  (Docstring fixed: it previously claimed a ``ValueError``
    was raised for non-existent services, which the code never does.)

    Arguments:
      config (:py:class:`list`): The service configuration required.

    Returns:
      :py:class:`collections.OrderedDict`: Configured services, keyed
      by a random UUID hex string.
    """
    services = OrderedDict()
    for settings in config:
        name = settings['name']
        if name not in SERVICES:
            # Skip (don't fail on) services we don't know how to build.
            logger.warning('unknown service %r', name)
            continue
        services[uuid4().hex] = SERVICES[name].from_config(**settings)
    return services
def test_as_dict(self):
    """Every OffsetSpec field must round-trip through JSONOffsetSpecs.as_dict."""
    topic = str(uuid.uuid4())
    partition = random.randint(0, 1024)
    until_offset = random.randint(0, sys.maxsize)
    from_offset = random.randint(0, sys.maxsize)
    app_name = str(uuid.uuid4())
    offset_spec = OffsetSpec(
        app_name=app_name,
        topic=topic,
        partition=partition,
        from_offset=from_offset,
        until_offset=until_offset)
    spec_dict = JSONOffsetSpecs.as_dict(offset_spec)
    expected = {
        "topic": topic,
        "partition": partition,
        "app_name": app_name,
        "from_offset": from_offset,
        "until_offset": until_offset,
    }
    self.assertions_on_offset(used_value=expected, offset_value=spec_dict)
def _fake_vif(cls=osv_vif.VIFOpenVSwitch):
    """Build a minimal os-vif VIF object for tests.

    The VIF carries an OVS port profile plus one network with a single
    subnet and one fixed IP (192.168.0.2).
    """
    vif = cls(
        id=uuid.uuid4(),
        vif_name='h_interface',
        bridge_name='bridge',
        address='3e:94:b7:31:a0:83',
        port_profile=osv_objects.vif.VIFPortProfileOpenVSwitch(
            interface_id='89eccd45-43e9-43d8-b4cc-4c13db13f782',
            # profile_id is a string; `id` above is a UUID object.
            profile_id=str(uuid.uuid4()),
        ),
    )
    # MTU of 1 is intentionally tiny — test fixture, not realistic.
    vif.network = osv_objects.network.Network(id=uuid.uuid4(), mtu=1)
    subnet = osv_objects.subnet.Subnet(
        uuid=uuid.uuid4(),
        dns=['192.168.0.1'],
        cidr='192.168.0.0/24',
        gateway='192.168.0.1',
        routes=osv_objects.route.RouteList(objects=[]),
    )
    subnet.ips = osv_objects.fixed_ip.FixedIPList(objects=[])
    subnet.ips.objects.append(
        osv_objects.fixed_ip.FixedIP(address='192.168.0.2'))
    vif.network.subnets.objects.append(subnet)
    return vif
def setUp(self):
    """Common driver-test fixtures: instance info, ifnames and mocked IPDBs."""
    super(TestDriverMixin, self).setUp()
    self.instance_info = osv_objects.instance_info.InstanceInfo(
        uuid=uuid.uuid4(), name='foo')
    self.ifname = 'c_interface'
    self.netns = '/proc/netns/1234'
    # Mock IPDB context managers
    self.ipdbs = {}
    self.m_bridge_iface = mock.Mock(__exit__=mock.Mock())
    self.m_c_iface = mock.Mock()
    self.m_h_iface = mock.Mock()
    # Host IPDB (no netns) and container IPDB (inside self.netns).
    self.h_ipdb, self.h_ipdb_exit = self._mock_ipdb_context_manager(None)
    self.c_ipdb, self.c_ipdb_exit = self._mock_ipdb_context_manager(
        self.netns)
    # Entering c_ipdb.create(...) must yield m_create, mirroring pyroute2.
    self.m_create = mock.Mock()
    self.c_ipdb.create = mock.Mock(
        return_value=mock.Mock(
            __enter__=mock.Mock(return_value=self.m_create),
            __exit__=mock.Mock()))
async def execute(self):
    """Insert a row into test_table and read it back by its generated guid.

    On success ``self.result`` holds the selected rows as dicts; on any
    failure a 502-style error is appended to ``self.errors`` instead of
    raising.  Returns ``self.result`` either way.

    Fixed: the function body awaits coroutines, so it must be declared
    ``async def`` (``await`` inside a plain ``def`` is a SyntaxError).
    """
    try:
        name = self.params['name']
        guid = uuid.uuid4()
        query = """ INSERT INTO test_table(name, guid) values(%(name)s, %(guid)s); """
        query_get = """ SELECT id, name, guid from test_table where guid = %(guid)s; """
        ins = await self.app.db.execute('test_db', query, {'name': name, 'guid': guid}, 'insert')
        _data = await self.app.db.execute('test_db', query_get, {'guid': guid}, 'select')
        self.result = [dict(d) for d in _data]
    except Exception as e:
        # Broad catch is deliberate: surface DB problems as a 502 error entry.
        self.errors.append({
            'code': 502,
            'message': '{}'.format(e)
        })
    return self.result
def slack(text: hug.types.text):
    """Returns JSON containing an attachment with an image url for the Slack integration"""
    title = text
    if text == 'top250':
        # Pick a random show from IMDb's Top 250 TV chart.
        chart_response = requests.get(IMDB_URL + '/chart/toptv',
                                      headers={'Accept-Language': 'en'})
        chart_page = html.fromstring(chart_response.text)
        links = chart_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
        title = random.choice(links).text
    # The uuid query param busts Slack's image cache.
    return {
        'response_type': 'in_channel',
        'attachments': [
            {'image_url': GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}'}
        ],
    }
def test_unregistered_event(self): project = self.project # force creation url = '/plugins/github/organizations/{}/webhook/'.format( project.organization.id, ) secret = 'b3002c3e321d4b7880360d397db2ccfd' OrganizationOption.objects.set_value( organization=project.organization, key='github:webhook_secret', value=secret, ) response = self.client.post( path=url, data=PUSH_EVENT_EXAMPLE, content_type='application/json', HTTP_X_GITHUB_EVENT='UnregisteredEvent', HTTP_X_HUB_SIGNATURE='sha1=98196e70369945ffa6b248cf70f7dc5e46dff241', HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()) ) assert response.status_code == 204
def test_invalid_signature_event(self): project = self.project # force creation url = '/plugins/github/organizations/{}/webhook/'.format( project.organization.id, ) secret = '2d7565c3537847b789d6995dca8d9f84' OrganizationOption.objects.set_value( organization=project.organization, key='github:webhook_secret', value=secret, ) response = self.client.post( path=url, data=PUSH_EVENT_EXAMPLE, content_type='application/json', HTTP_X_GITHUB_EVENT='push', HTTP_X_HUB_SIGNATURE='sha1=33521abeaaf9a57c2abf486e0ccd54d23cf36fec', HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()) ) assert response.status_code == 401
def test_simple(self):
    """An `installation` webhook creates the matching github_apps Integration."""
    url = '/plugins/github/installations/webhook/'
    response = self.client.post(
        path=url,
        data=INSTALLATION_EVENT_EXAMPLE,
        content_type='application/json',
        HTTP_X_GITHUB_EVENT='installation',
        HTTP_X_HUB_SIGNATURE='sha1=348e46312df2901e8cb945616ee84ce30d9987c9',
        HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4())
    )
    assert response.status_code == 204
    # The installation payload describes app id 2 owned by "octocat".
    assert Integration.objects.filter(
        provider='github_apps',
        external_id=2,
        name='octocat',
    ).exists()
def get_webhook_secret(self, organization):
    """Fetch — or lazily create — the org-wide GitHub webhook secret.

    Creation happens under a distributed lock so concurrent requests
    don't race to generate different secrets.
    """
    secret_lock = locks.get('github:webhook-secret:{}'.format(organization.id),
                            duration=60)
    with secret_lock.acquire():
        # TODO(dcramer): get_or_create would be a useful native solution
        secret = OrganizationOption.objects.get_value(
            organization=organization,
            key='github:webhook_secret',
        )
        if secret is None:
            # 64 hex characters of randomness.
            secret = uuid4().hex + uuid4().hex
            OrganizationOption.objects.set_value(
                organization=organization,
                key='github:webhook_secret',
                value=secret,
            )
    return secret
def test_delete_upload_file(self, mock_open):
    """An upload stored under the 3/3/rest directory layout is removed."""
    resource_id = str(uuid.uuid4())
    upload_path = '/doesnt_exist/resources/{}/{}/{}'.format(
        resource_id[:3], resource_id[3:6], resource_id[6:])
    fs_patcher = fake_filesystem_unittest.Patcher()
    fs_patcher.setUp()
    fs_patcher.fs.CreateFile(upload_path)
    assert os.path.exists(upload_path)
    delete_local_uploaded_file(resource_id)
    assert not os.path.exists(upload_path)
    fs_patcher.tearDown()
def test_delete_file_not_deleted_if_resources_first(self, mock_open):
    """Deleting a file directly under /resources must keep the parent dir."""
    resource_id = str(uuid.uuid4())
    upload_path = '/doesnt_exist/resources/{}'.format(resource_id)
    fs_patcher = fake_filesystem_unittest.Patcher()
    fs_patcher.setUp()
    fs_patcher.fs.CreateFile(upload_path)
    assert os.path.exists(upload_path)
    with mock.patch('ckanext.validation.utils.get_local_upload_path',
                    return_value=upload_path):
        delete_local_uploaded_file(resource_id)
    assert not os.path.exists(upload_path)
    # Only the file goes; the resources directory itself survives.
    assert os.path.exists('/doesnt_exist/resources')
    fs_patcher.tearDown()
def test_delete_file_not_deleted_if_resources_second(self, mock_open):
    """Deleting a file one level below /resources keeps the parent dirs."""
    resource_id = str(uuid.uuid4())
    path = '/doesnt_exist/resources/data/{}'.format(resource_id)
    patcher = fake_filesystem_unittest.Patcher()
    patcher.setUp()
    patcher.fs.CreateFile(path)
    assert os.path.exists(path)
    with mock.patch('ckanext.validation.utils.get_local_upload_path',
                    return_value=path):
        delete_local_uploaded_file(resource_id)
    assert not os.path.exists(path)
    # The resources tree itself must not be pruned away.
    assert os.path.exists('/doesnt_exist/resources')
    patcher.tearDown()
def test_delete_passes_if_os_exeception(self, mock_open):
    """delete_local_uploaded_file must swallow OSError, not propagate it."""
    resource_id = str(uuid.uuid4())
    path = '/doesnt_exist/resources/{}/{}/{}'.format(
        resource_id[0:3], resource_id[3:6], resource_id[6:]
    )
    patcher = fake_filesystem_unittest.Patcher()
    patcher.setUp()
    patcher.fs.CreateFile(path)
    assert os.path.exists(path)
    # Force os.remove to fail; the call below must not raise.
    with mock.patch('ckanext.validation.utils.os.remove',
                    side_effect=OSError):
        delete_local_uploaded_file(resource_id)
    patcher.tearDown()
async def get_ghost_replay(self, login):
    """Save and read back the best ghost replay for ``login``.

    Returns the replay file contents, or ``None`` when the dedicated
    server could not save the replay.  Raises :class:`DedimaniaException`
    when the saved file cannot be read back.

    Fixes: declared ``async`` (the body awaits); bare ``except:`` narrowed
    to ``except Exception`` so KeyboardInterrupt/SystemExit pass through;
    added the missing space in both chat messages ("fetchthe" -> "fetch the").
    """
    replay_name = 'dedimania_{}.Replay.Gbx'.format(uuid.uuid4().hex)
    try:
        await self.instance.gbx('SaveBestGhostsReplay', login, replay_name)
    except Exception:
        # The dedicated server refused or failed to save the replay.
        return None
    try:
        async with self.instance.storage.open('UserData/Replays/{}'.format(replay_name)) as ghost_file:
            return await ghost_file.read()
    except FileNotFoundError as e:
        message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch ' \
                  'the driven replay!'
        logger.error('Please make sure we can access the dedicated files. Configure your storage driver correctly! '
                     '{}'.format(str(e)))
        await self.instance.chat(message)
        raise DedimaniaException('Can\'t access replay file')
    except PermissionError as e:
        message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch ' \
                  'the driven replay because of an permission problem!'
        logger.error('We can\'t read files in the dedicated folder, your permissions don\'t allow us to read it! '
                     '{}'.format(str(e)))
        await self.instance.chat(message)
        raise DedimaniaException('Can\'t access files due to permission problems')
def sync_rings_request(self, broker_token, builders_only=False):
    """Request for peers to sync rings from leader.

    NOTE: this action must only be performed by the cluster leader.

    :param broker_token: token to identify sync request.
    :param builders_only: if True, tell peers to sync builders only
                          (not rings).

    Docstring fixed: it previously said "if False" for builders_only,
    contradicting the code below.
    """
    if not is_elected_leader(SWIFT_HA_RES):
        errmsg = "Leader function called by non-leader"
        raise SwiftProxyCharmException(errmsg)
    rq = self.template()
    rq['trigger'] = str(uuid.uuid4())
    if builders_only:
        rq['sync-only-builders'] = 1
    rq['broker-token'] = broker_token
    rq['broker-timestamp'] = "{:f}".format(time.time())
    rq['builder-broker'] = self._hostname
    return rq
def notify_leader_changed(self, token):
    """Notify peers that leader has changed.

    The token passed in must be that associated with the sync we claim
    to have been interrupted. It will be re-used by the restored leader
    once it receives this notification.

    NOTE: this action must only be performed by the cluster leader that
    has relinquished it's leader status as part of the current hook
    context.

    :param token: interrupted-sync token to hand back to the new leader.
    :returns: RPC request dict based on the standard template.
    """
    if not is_elected_leader(SWIFT_HA_RES):
        errmsg = "Leader function called by non-leader"
        raise SwiftProxyCharmException(errmsg)
    rq = self.template()
    # Fresh trigger token identifies this broadcast.
    rq['trigger'] = str(uuid.uuid4())
    rq[self.KEY_NOTIFY_LEADER_CHANGED] = token
    return rq
def notify_storage_rings_available():
    """Notify peer swift-storage relations that they should synchronise
    ring and builder files.

    Note that this should only be called from the leader unit.
    """
    if not is_elected_leader(SWIFT_HA_RES):
        log("Ring availability storage-relation broadcast requested by "
            "non-leader - skipping", level=WARNING)
        return
    hostname = get_hostaddr()
    hostname = format_ipv6_addr(hostname) or hostname
    path = os.path.basename(get_www_dir())
    rings_url = 'http://{}/{}'.format(hostname, path)
    # str() for consistency with the other RPC triggers in this module,
    # which all publish string tokens on the relation.
    trigger = str(uuid.uuid4())
    # Notify storage nodes that there is a new ring to fetch.
    log("Notifying storage nodes that new rings are ready for sync.",
        level=INFO)
    for relid in relation_ids('swift-storage'):
        relation_set(relation_id=relid,
                     swift_hash=get_swift_hash(),
                     rings_url=rings_url,
                     trigger=trigger)
def test_cluster_rpc_stop_proxy_ack(self, mock_uuid):
    """stop_proxy_ack echoes the token/peers-only flags into the template."""
    # Freeze uuid4 so the generated trigger token is predictable.
    mock_uuid.uuid4.return_value = 'token2'
    rpc = swift_utils.SwiftProxyClusterRPC()
    rq = rpc.stop_proxy_ack(echo_token='token1', echo_peers_only='1')
    self.assertEqual({'trigger': 'token2',
                      'broker-token': None,
                      'builder-broker': None,
                      'broker-timestamp': None,
                      'peers-only': '1',
                      'leader-changed-notification': None,
                      'resync-request': None,
                      'stop-proxy-service': None,
                      'stop-proxy-service-ack': 'token1',
                      'sync-only-builders': None}, rq)
    # Every key set must come from the canonical RPC template.
    template_keys = set(rpc.template())
    self.assertTrue(set(rq.keys()).issubset(template_keys))
def test_cluster_rpc_sync_request(self, mock_uuid, mock_time):
    """sync_rings_request fills broker token/timestamp/host into the template."""
    # Freeze both time and uuid4 for deterministic output.
    mock_time.time = mock.Mock(return_value=float(1.234))
    mock_uuid.uuid4.return_value = 'token2'
    rpc = swift_utils.SwiftProxyClusterRPC()
    rq = rpc.sync_rings_request('token1')
    self.assertEqual({'trigger': 'token2',
                      'broker-token': 'token1',
                      'broker-timestamp': '1.234000',
                      'builder-broker': '1.2.3.4',
                      'peers-only': None,
                      'leader-changed-notification': None,
                      'resync-request': None,
                      'stop-proxy-service': None,
                      'stop-proxy-service-ack': None,
                      'sync-only-builders': None}, rq)
    template_keys = set(rpc.template())
    self.assertTrue(set(rq.keys()).issubset(template_keys))
def test_cluster_rpc_notify_leader_changed(self, mock_uuid):
    """notify_leader_changed publishes the token under the notification key."""
    mock_uuid.uuid4.return_value = 'e4b67426-6cc0-4aa3-829d-227999cd0a75'
    rpc = swift_utils.SwiftProxyClusterRPC()
    rq = rpc.notify_leader_changed('token1')
    self.assertEqual({'trigger': 'e4b67426-6cc0-4aa3-829d-227999cd0a75',
                      'broker-token': None,
                      'builder-broker': None,
                      'broker-timestamp': None,
                      'peers-only': None,
                      'leader-changed-notification': 'token1',
                      'stop-proxy-service': None,
                      'stop-proxy-service-ack': None,
                      'resync-request': None,
                      'sync-only-builders': None}, rq)
    template_keys = set(rpc.template().keys())
    self.assertTrue(set(rq.keys()).issubset(template_keys))
async def test_disable_enable_user(event_loop):
    """Disabling then re-enabling a user is reflected both on the live
    object and on a freshly fetched copy.

    Fixed: declared ``async def`` — the body uses ``async with``/``await``,
    which is a SyntaxError inside a plain ``def``.
    """
    async with base.CleanController() as controller:
        # Random suffix keeps the test user unique per run.
        username = 'test-disable{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.disable()
        assert not user.enabled
        assert user.disabled
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert not fresh.enabled
        assert fresh.disabled
        await user.enable()
        assert user.enabled
        assert not user.disabled
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.enabled
        assert not fresh.disabled
async def test_change_user_password(event_loop):
    """Setting a user's password allows authenticating with it.

    Fixed: declared ``async def`` — the body uses ``async with``/``await``,
    which is a SyntaxError inside a plain ``def``.
    """
    async with base.CleanController() as controller:
        username = 'test-password{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.set_password('password')
        # Check that we can connect with the new password.
        new_connection = None
        try:
            kwargs = controller.connection().connect_params()
            kwargs['username'] = username
            kwargs['password'] = 'password'
            new_connection = await Connection.connect(**kwargs)
        except JujuAPIError:
            raise AssertionError('Unable to connect with new password')
        finally:
            # Always close the probe connection if it was opened.
            if new_connection:
                await new_connection.close()
async def test_grant_revoke(event_loop):
    """Granting and revoking controller access updates both the live user
    object and a freshly fetched copy.

    Fixes: declared ``async def`` (body awaits); replaced ``access is ''``
    with ``access == ''`` — identity comparison against a string literal is
    an implementation detail and a SyntaxWarning on modern CPython.
    """
    async with base.CleanController() as controller:
        username = 'test-grant{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.grant('superuser')
        assert user.access == 'superuser'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'superuser'
        await user.grant('login')
        assert user.access == 'login'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'login'
        await user.revoke()
        assert user.access == ''
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == ''
def _(node, dask, scope):
    """Lower an apply-node of the expression tree into the dask graph.

    Emits a task ``(apply, func, args, kwargs)`` under a unique key and
    records the key in ``scope`` so repeated sub-terms are reused.
    Returns the new task's key.
    """
    def retrieve(term):
        # Memoised lookup: compile each sub-term into the graph on first use.
        try:
            return scope[term]
        except KeyError:
            scope[term] = ret = _ltree_to_dask(term, dask, scope)
            return ret
    # uuid4 guarantees a graph-unique task name.
    name = '%s-%s' % (node.func, uuid4())
    dask[name] = (
        apply,
        retrieve(node.func),
        list(map(retrieve, node.args)),
        # kwargs are encoded as a (dict, [[k, v], ...]) task for dask.
        (dict, list(map(list, valmap(retrieve, node.kwargs).items()))),
    )
    scope[node] = name
    return name
def test_get_saved_query(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
    """get_saved_query prints every section of the returned query."""
    saved_query_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps({'saved_query': SAVED_QUERY_RESPONSE}))
    api.get_saved_query(saved_query_id)
    out, err = capsys.readouterr()
    # Each printed section of the saved query must appear in stdout.
    assert "Name:" in out
    assert "Logs:" in out
    assert "ID:" in out
    assert "Statement:" in out
    assert "Time range:" in out
    assert "From:" in out
    assert "To:" in out
def test_patch_saved_query_none_fields(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
    """None-valued fields must be omitted from the PATCH payload."""
    query_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(
        httpretty.PATCH, MOCK_API_URL, status=200,
        content_type='application/json',
        body=json.dumps({"saved_query": SAVED_QUERY_RESPONSE}))
    api.update_saved_query(query_id, name=None, statement="new_statement")
    out, err = capsys.readouterr()
    assert "Saved query with id %s updated" % query_id in out
    payload = json.loads(httpretty.last_request().body)['saved_query']
    assert "name" not in payload
    assert "statement" in payload['leql']
def test_rename_log(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """rename_log GETs the current log then PUTs it back with a new name."""
    test_log_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = ID_WITH_VALID_LENGTH
    mocked_ro_apikey.return_value = ID_WITH_VALID_LENGTH
    request_body = '{"log": {"name": "test.log", "logsets_info": [], "source_type": "token"}}'
    expected_result = '{"log": {"name": "new_test_log_name", "logsets_info": [], "source_type": "token"}}'
    # GET returns the existing log; PUT answers with the renamed log.
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=request_body)
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=200,
                           body=expected_result,
                           content_type='application/json')
    new_name_for_log = "new_test_log_name"
    api.rename_log(test_log_id, new_name_for_log)
    out, err = capsys.readouterr()
    assert new_name_for_log in out
def test_delete_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id, mocked_account_resource_id, capsys):
    """Deleting an API key prints a confirmation and produces no errors."""
    key_id = str(uuid.uuid4())
    delete_url = MOCK_API_URL + '/' + key_id
    mocked_url.return_value = '', delete_url
    mocked_owner_apikey.return_value = str(uuid.uuid4())
    mocked_owner_apikey_id.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(
        httpretty.DELETE, delete_url, status=204,
        content_type='application/json')
    api.delete(key_id)
    out, err = capsys.readouterr()
    assert not err
    assert 'Deleted api key with id: %s' % key_id in out
def test_disable_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id, mocked_account_resource_id, capsys):
    """update(..., False) PATCHes {'active': False} and reports success."""
    api_key_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_owner_apikey.return_value = str(uuid.uuid4())
    mocked_owner_apikey_id.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps({}))
    api.update(api_key_id, False)
    out, err = capsys.readouterr()
    # The PATCH body must carry exactly the deactivation payload.
    assert {'apikey': {'active': False}} == json.loads(httpretty.last_request().body)
    assert not err
    assert 'Disabled api key with id: %s' % api_key_id in out
def test_enable_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id, mocked_account_resource_id, capsys):
    """update(..., True) PATCHes {'active': True} and reports success."""
    api_key_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_owner_apikey.return_value = str(uuid.uuid4())
    mocked_owner_apikey_id.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps({}))
    api.update(api_key_id, True)
    out, err = capsys.readouterr()
    # The PATCH body must carry exactly the activation payload.
    assert {'apikey': {'active': True}} == json.loads(httpretty.last_request().body)
    assert not err
    assert 'Enabled api key with id: %s' % api_key_id in out
def test_create_logset_from_file(mocked_url, mocked_rw_apikey, capsys):
    """Creating a logset from explicit params echoes the new logset name."""
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    httpretty.register_uri(
        httpretty.POST, MOCK_API_URL, status=201,
        content_type='application/json',
        body=json.dumps(LOGSET_RESPONSE))
    request_params = {"logset": {"name": "Test Logset"}}
    api.create_logset(params=request_params)
    out, err = capsys.readouterr()
    assert 'Test Logset' in out
def test_create_logset_invalid_json(mocked_url, mocked_rw_apikey, capsys):
    """create_logset must exit(1) when the API rejects the payload (400).

    Fixes: ``pytest.ExceptionInfo`` exposes the raised exception on
    ``.value`` (there is no ``.code`` attribute), and ``is 1`` relied on
    CPython small-int caching; also renamed the local so it no longer
    shadows the builtin ``exit``.
    """
    with pytest.raises(SystemExit) as exc_info:
        mocked_url.return_value = '', MOCK_API_URL
        mocked_rw_apikey.return_value = str(uuid.uuid4())
        httpretty.register_uri(httpretty.POST, MOCK_API_URL, status=400,
                               content_type='application/json',
                               body='Client Error: Bad Request for url: https://rest.logentries.com/management/logsets')
        invalid_params = {
            "logset": {
                "id": "12341234-XXXX-YYYY-XXXX-12341234",
                "unknown_field": "unknown value"
            }
        }
        api.create_logset(params=invalid_params)
    out, err = capsys.readouterr()
    assert exc_info.value.code == 1
    assert "Creating logset failed, status code: 400" in out
def test_rename_logset(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """rename_logset fetches the logset and PUTs it back with the new name."""
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())
    get_body = '{"logset": {"id": "XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX","logs_info": [],"name": "old logset name"}}'
    put_body = '{"logset": {"id": "XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX","logs_info": [],"name": "new logset name"}}'
    httpretty.register_uri(
        httpretty.GET, MOCK_API_URL, status=200,
        content_type='application/json', body=get_body)
    httpretty.register_uri(
        httpretty.PUT, MOCK_API_URL, status=200,
        content_type='application/json', body=put_body)
    api.rename_logset('XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX', 'new logset name')
    out, err = capsys.readouterr()
    assert "new logset name" in out
def test_add_unknown_log_to_logset(mocked_url, mocked_ro_apikey, mocked_rw_apikey, capsys):
    """Adding an unknown log must print the 400 status and exit(1).

    Fixes: read the exit status from ``exc_info.value.code`` and compare
    with ``==`` — ``ExceptionInfo`` has no ``.code`` attribute and ``is 1``
    relied on small-int caching; also stopped shadowing builtin ``exit``.
    """
    with pytest.raises(SystemExit) as exc_info:
        mocked_url.return_value = '', MOCK_API_URL
        mocked_rw_apikey.return_value = str(uuid.uuid4())
        mocked_ro_apikey.return_value = str(uuid.uuid4())
        httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                               content_type='application/json',
                               body=json.dumps(LOGSET_RESPONSE))
        httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=400,
                               content_type='application/json')
        api.add_log('XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXX', 'unknown_log')
    out, err = capsys.readouterr()
    assert "400" in out
    assert exc_info.value.code == 1
def test_remove_log_from_logset(mocked_url, mocked_ro_apikey, mocked_rw_apikey, capsys):
    """delete_log completes without writing anything to stderr.

    Fixed typo: the PUT registration passed ``content_Type`` (capital T),
    which httpretty treats as an arbitrary extra kwarg rather than setting
    the response content type.
    """
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps(LOGSET_RESPONSE))
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps(LOGSET_RESPONSE))
    api.delete_log('123', str(uuid.uuid4()))
    out, err = capsys.readouterr()
    assert not err
def test_delete_logset_with_log_in_another_logset(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """Deleting one logset must not break fetching a different one."""
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.DELETE, MOCK_API_URL, status=204,
                           content_type='application/json')
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=json.dumps({}))
    api.delete_logset('123')
    # A subsequent fetch of another logset must still succeed quietly.
    api.get_logset('456')
    out, err = capsys.readouterr()
    assert not err
def test_delete_user_from_team(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
    """Removing a user from a team prints the expected confirmation line."""
    team_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(
        httpretty.GET, MOCK_API_URL, status=200,
        body=json.dumps({'team': TEAM_RESPONSE}),
        content_type='application/json')
    httpretty.register_uri(
        httpretty.PUT, MOCK_API_URL, status=200,
        content_type='application/json')
    user_id = str(uuid.uuid4())
    api.delete_user_from_team(team_id, user_id)
    out, err = capsys.readouterr()
    assert "Deleted user with key: '%s' from team: %s\n" % (user_id, team_id) == out
def upload_file(upload_file_name, temp):
    """Upload a local PNG to Qiniu and return its public URL.

    NOTE(review): Python 2 only — uses ``print`` statements, ``raw_input``
    and ``uuid4().get_hex()`` (removed in Python 3; use ``.hex`` there).
    Comments below were mojibake in the original; English guesses, confirm.
    """
    # upload_file_name: local file to upload; a copy is kept under `saveas`
    # (presumably a git-tracked directory — TODO confirm).
    #key = md5(str(time.time())+''.join(random.sample(string.letters, 12))).hexdigest()
    # Ask the user for a picture name; the object key is name + short uuid.
    print u"??????: ",
    pic_name = raw_input()
    uuid_6 = uuid.uuid4().get_hex()[:8]
    key = pic_name+"_"+uuid_6+".png"
    copyfile(upload_file_name,join(saveas,key))
    mime_type = 'image/png'
    token = q.upload_token(bucket, key)
    ret, info = put_file(token, key, upload_file_name, mime_type=mime_type, check_crc=True)
    print 'upload qiniu result:', info
    # Sanity-check the upload round-trip before retiring the local file.
    assert ret['key'] == key
    assert ret['hash'] == etag(upload_file_name)
    os.rename(upload_file_name, upload_file_name+'.old')
    return domain+'/'+key
def compare_component_output(self, input_path, expected_output_path):
    """Render ``input_path`` with the current engine and diff the result
    against ``expected_output_path`` — once as a directory tree and once
    through a zip round-trip.

    Fixed: the ZipFile was never closed (leaked file handle, and on
    Windows would block the following ``os.remove``); now a context
    manager.
    """
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
    # Pass 1: render straight to a directory and compare.
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    storage.clear()
    # Pass 2: render to a zip, extract it, and compare again.
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    with zipfile.ZipFile(output_zip) as z:
        z.extractall(output_dir)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    os.remove(output_zip)
def remote_restart(rel_name, remote_service=None):
    """Publish a restart trigger on every relation of ``rel_name``.

    The fresh uuid4 value makes each broadcast distinct so remote units
    detect the change. If ``remote_service`` is given it is included so
    only that service restarts.
    """
    trigger = {
        'restart-trigger': str(uuid.uuid4()),
    }
    if remote_service:
        trigger['remote-service'] = remote_service
    for rid in relation_ids(rel_name):
        # This subordinate can be related to two separate services using
        # different subordinate relations so only issue the restart if
        # the principal is connected down the relation we think it is
        if related_units(relid=rid):
            relation_set(relation_id=rid,
                         relation_settings=trigger,
                         )
def get_uuid_epoch_stamp(self):
    """Return a stamp string built from uuid4 and the epoch time.

    Useful in generating test messages which need to be unique-ish."""
    stamp = '{}-{}'.format(uuid.uuid4(), time.time())
    return '[{}]'.format(stamp)


# amulet juju action helpers:
def setUpTestData(cls):
    """Class-level fixtures: local/remote users plus content of each origin."""
    super().setUpTestData()
    cls.create_local_and_remote_user()
    cls.local_content = LocalContentFactory()
    # Second local content gets an explicit random guid.
    cls.local_content2 = LocalContentFactory(guid=str(uuid4()))
    cls.remote_content = PublicContentFactory()
    cls.remote_profile2 = PublicProfileFactory()
def test_does_not_forward_share_if_not_local_content(self, mock_rq):
    """Shares targeting remote content must not be forwarded."""
    share = base.Share(
        guid=str(uuid4()),
        handle=self.remote_profile.handle,
        target_guid=self.remote_content.guid,
        target_handle=self.remote_content.author.handle,
        public=True,
    )
    process_entity_share(share, self.remote_profile)
    self.assertFalse(mock_rq.called)
def test_forwards_share_if_local_content(self, mock_rq):
    """Shares targeting local content are queued for federation forwarding."""
    share = base.Share(
        guid=str(uuid4()),
        handle=self.remote_profile.handle,
        target_guid=self.local_content.guid,
        target_handle=self.local_content.author.handle,
        public=True,
    )
    process_entity_share(share, self.remote_profile)
    mock_rq.assert_called_once_with(forward_entity, share, self.local_content.id)