Python uuid 模块,uuid4() 实例源码

我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 uuid.uuid4()。

项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def session_id(self):
        """Return the session ID for the current run of the workflow.

        .. versionadded:: 1.25

        The ID is cached on the instance after the first lookup. It is
        read from the ``_WF_SESSION_ID`` environment variable; when that
        variable is missing or empty, a fresh UUID4 hex string is
        generated and stored through ``self.setvar``.

        """
        # Fast path: a previous call already resolved the ID.
        if self._session_id:
            return self._session_id

        token = os.getenv('_WF_SESSION_ID')
        if not token:
            from uuid import uuid4
            token = uuid4().hex
            self.setvar('_WF_SESSION_ID', token)

        self._session_id = token
        return self._session_id
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_share_target_is_fetched_if_no_target_found(self, mock_retrieve):
        """A share whose target is unknown triggers a retrieve, then both rows exist."""
        share = base.Share(
            guid=str(uuid4()),
            handle=self.remote_profile.handle,
            target_guid="notexistingguid",
            target_handle=self.remote_profile2.handle,
            public=True,
        )
        # The mocked retrieve hands back the post that the share points at.
        mock_retrieve.return_value = entities.PostFactory(
            guid=share.target_guid,
            handle=self.remote_profile2.handle,
        )

        process_entity_share(share, self.remote_profile)

        mock_retrieve.assert_called_once_with(share.target_id, sender_key_fetcher=sender_key_fetcher)

        target_rows = Content.objects.filter(guid=share.target_guid, content_type=ContentType.CONTENT)
        self.assertTrue(target_rows.exists())

        share_rows = Content.objects.filter(
            guid=share.guid, share_of__guid=share.target_guid, content_type=ContentType.SHARE
        )
        self.assertTrue(share_rows.exists())
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        """Normalise the content type and first-save defaults, then persist.

        Raises:
            ValueError: if both ``parent`` and ``share_of`` are set.
        """
        if self.parent and self.share_of:
            raise ValueError("Can't be both a reply and a share!")
        self.cache_data()

        if self.parent:
            # Replies take their visibility from the parent and are never pinned.
            self.content_type = ContentType.REPLY
            self.visibility = self.parent.visibility
            self.pinned = False
        elif self.share_of:
            self.content_type = ContentType.SHARE

        is_new = not self.pk
        if is_new and not self.guid:
            self.guid = uuid4()
        if is_new and self.pinned:
            own_top_level = Content.objects.top_level().filter(author=self.author)
            highest = own_top_level.aggregate(Max("order"))["order__max"]
            # None means the aggregate found no order value (likely no content yet).
            if highest is not None:
                self.order = highest + 1

        self.fix_local_uploads()
        super().save(*args, **kwargs)
        self.cache_related_object_data()
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def define_services(config):
    """Define the service settings for the current app.

    Each entry in ``config`` is looked up by its ``'name'`` key in
    ``SERVICES`` and built with ``from_config(**settings)``. Entries whose
    name is not registered are logged as a warning and skipped; they do
    not raise.

    Arguments:
      config (:py:class:`list`): The service configuration required.

    Returns:
      :py:class:`collections.OrderedDict`: Configured services, keyed by
      a freshly generated UUID4 hex string, in configuration order.

    Raises:
      :py:class:`KeyError`: If a configuration entry has no ``'name'``.

    """
    services = OrderedDict()
    for settings in config:
        name = settings['name']
        if name not in SERVICES:
            # Unknown services are ignored so one bad entry does not
            # prevent the remaining services from being configured.
            logger.warning('unknown service %r', name)
            continue
        services[uuid4().hex] = SERVICES[name].from_config(**settings)
    return services
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_as_dict(self):
        """JSONOffsetSpecs.as_dict output is checked against the values used to build the spec."""
        # Random inputs are drawn in a fixed order: topic, partition,
        # until-offset, from-offset, app name.
        topic = str(uuid.uuid4())
        partition = random.randint(0, 1024)
        until_offset = random.randint(0, sys.maxsize)
        from_offset = random.randint(0, sys.maxsize)
        app_name = str(uuid.uuid4())

        spec = OffsetSpec(
            app_name=app_name,
            topic=topic,
            partition=partition,
            from_offset=from_offset,
            until_offset=until_offset,
        )
        as_dict = JSONOffsetSpecs.as_dict(spec)

        expected = {
            "topic": topic,
            "partition": partition,
            "app_name": app_name,
            "from_offset": from_offset,
            "until_offset": until_offset,
        }
        self.assertions_on_offset(used_value=expected, offset_value=as_dict)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _fake_vif(cls=osv_vif.VIFOpenVSwitch):
    """Build a VIF of type ``cls`` with one network, one subnet and one fixed IP."""
    vif_id = uuid.uuid4()
    profile = osv_objects.vif.VIFPortProfileOpenVSwitch(
        interface_id='89eccd45-43e9-43d8-b4cc-4c13db13f782',
        profile_id=str(uuid.uuid4()),
    )
    vif = cls(
        id=vif_id,
        vif_name='h_interface',
        bridge_name='bridge',
        address='3e:94:b7:31:a0:83',
        port_profile=profile,
    )
    vif.network = osv_objects.network.Network(id=uuid.uuid4(), mtu=1)

    subnet_id = uuid.uuid4()
    no_routes = osv_objects.route.RouteList(objects=[])
    subnet = osv_objects.subnet.Subnet(
        uuid=subnet_id,
        dns=['192.168.0.1'],
        cidr='192.168.0.0/24',
        gateway='192.168.0.1',
        routes=no_routes,
    )
    # Start from an empty IP list, then add the single fixed address.
    subnet.ips = osv_objects.fixed_ip.FixedIPList(objects=[])
    fixed_ip = osv_objects.fixed_ip.FixedIP(address='192.168.0.2')
    subnet.ips.objects.append(fixed_ip)

    vif.network.subnets.objects.append(subnet)
    return vif
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Create the instance info, interface names and mocked IPDB handles used by the tests."""
        super(TestDriverMixin, self).setUp()
        self.instance_info = osv_objects.instance_info.InstanceInfo(
            uuid=uuid.uuid4(), name='foo')
        self.ifname = 'c_interface'
        self.netns = '/proc/netns/1234'

        # Mock IPDB context managers
        self.ipdbs = {}
        self.m_bridge_iface = mock.Mock(__exit__=mock.Mock())
        self.m_c_iface = mock.Mock()
        self.m_h_iface = mock.Mock()

        host_ipdb = self._mock_ipdb_context_manager(None)
        self.h_ipdb, self.h_ipdb_exit = host_ipdb
        container_ipdb = self._mock_ipdb_context_manager(self.netns)
        self.c_ipdb, self.c_ipdb_exit = container_ipdb

        # ``c_ipdb.create(...)`` returns a mock whose ``__enter__`` yields ``m_create``.
        self.m_create = mock.Mock()
        create_result = mock.Mock(
            __enter__=mock.Mock(return_value=self.m_create),
            __exit__=mock.Mock())
        self.c_ipdb.create = mock.Mock(return_value=create_result)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def execute(self):
        """Insert a row with a fresh GUID into ``test_table`` and read it back.

        The method awaits ``self.app.db.execute`` and therefore has to be a
        coroutine function; with a plain ``def`` the ``await`` expressions
        are a syntax error.

        Returns:
            ``self.result``: a list of dicts built from the selected rows
            on success. On failure it is left as it was before the call
            and a ``502`` entry is appended to ``self.errors``.
        """
        try:
            name = self.params['name']
            guid = uuid.uuid4()
            query = """
            INSERT INTO test_table(name, guid) values(%(name)s, %(guid)s);
            """
            query_get = """
            SELECT id, name, guid from test_table where guid = %(guid)s;
            """
            # The insert's return value is not needed, only its effect.
            await self.app.db.execute('test_db', query, {'name': name, 'guid': guid}, 'insert')
            _data = await self.app.db.execute('test_db', query_get, {'guid': guid}, 'select')
            self.result = [dict(d) for d in _data]
        except Exception as e:
            # Best-effort: any failure is reported through ``self.errors``
            # rather than propagated to the caller.
            self.errors.append({
                'code': 502,
                'message': '{}'.format(e)
            })
        return self.result
项目:graph    作者:noxern    | 项目源码 | 文件源码
def slack(text: hug.types.text):
    """Returns JSON containing an attachment with an image url for the Slack integration.

    When ``text`` is ``'top250'`` the title is chosen at random from the
    links scraped from the IMDb top-TV chart page; otherwise ``text``
    itself is used as the title.
    """
    if text == 'top250':
        chart_response = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
        chart_page = html.fromstring(chart_response.text)
        links = chart_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
        title = random.choice(links).text
    else:
        title = text

    # A fresh UUID is appended to the URL as a query parameter on every call.
    image_url = GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}'
    return {
        'response_type': 'in_channel',
        'attachments': [{'image_url': image_url}],
    }
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def test_unregistered_event(self):
        """Posting an ``UnregisteredEvent`` to the organization webhook returns 204."""
        project = self.project  # force creation
        organization = project.organization
        endpoint = '/plugins/github/organizations/{}/webhook/'.format(organization.id)

        webhook_secret = 'b3002c3e321d4b7880360d397db2ccfd'
        OrganizationOption.objects.set_value(
            organization=organization,
            key='github:webhook_secret',
            value=webhook_secret,
        )

        request_kwargs = dict(
            path=endpoint,
            data=PUSH_EVENT_EXAMPLE,
            content_type='application/json',
            HTTP_X_GITHUB_EVENT='UnregisteredEvent',
            HTTP_X_HUB_SIGNATURE='sha1=98196e70369945ffa6b248cf70f7dc5e46dff241',
            HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()),
        )
        response = self.client.post(**request_kwargs)

        assert response.status_code == 204
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def test_invalid_signature_event(self):
        """Posting a push event with the signature used here is answered with 401."""
        project = self.project  # force creation
        organization = project.organization
        endpoint = '/plugins/github/organizations/{}/webhook/'.format(organization.id)

        webhook_secret = '2d7565c3537847b789d6995dca8d9f84'
        OrganizationOption.objects.set_value(
            organization=organization,
            key='github:webhook_secret',
            value=webhook_secret,
        )

        request_kwargs = dict(
            path=endpoint,
            data=PUSH_EVENT_EXAMPLE,
            content_type='application/json',
            HTTP_X_GITHUB_EVENT='push',
            HTTP_X_HUB_SIGNATURE='sha1=33521abeaaf9a57c2abf486e0ccd54d23cf36fec',
            HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()),
        )
        response = self.client.post(**request_kwargs)

        assert response.status_code == 401
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def test_simple(self):
        """An installation event returns 204 and leaves a matching ``Integration`` row."""
        request_kwargs = dict(
            path='/plugins/github/installations/webhook/',
            data=INSTALLATION_EVENT_EXAMPLE,
            content_type='application/json',
            HTTP_X_GITHUB_EVENT='installation',
            HTTP_X_HUB_SIGNATURE='sha1=348e46312df2901e8cb945616ee84ce30d9987c9',
            HTTP_X_GITHUB_DELIVERY=six.text_type(uuid4()),
        )
        response = self.client.post(**request_kwargs)

        assert response.status_code == 204

        created = Integration.objects.filter(
            provider='github_apps',
            external_id=2,
            name='octocat',
        )
        assert created.exists()
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_webhook_secret(self, organization):
        """Return the organization's GitHub webhook secret, creating it when none is stored.

        The lookup and the possible creation both happen while holding a
        per-organization lock requested with ``duration=60``.
        """
        lock_name = 'github:webhook-secret:{}'.format(organization.id)
        lock = locks.get(lock_name, duration=60)
        with lock.acquire():
            # TODO(dcramer): get_or_create would be a useful native solution
            secret = OrganizationOption.objects.get_value(
                organization=organization,
                key='github:webhook_secret',
            )
            if secret is not None:
                return secret

            # Two UUID4 hex strings concatenated: 64 hex characters.
            secret = uuid4().hex + uuid4().hex
            OrganizationOption.objects.set_value(
                organization=organization,
                key='github:webhook_secret',
                value=secret,
            )
        return secret
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_delete_upload_file(self, mock_open):
        """``delete_local_uploaded_file`` removes the file stored under the resource's path."""
        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}/{}/{}'.format(
            resource_id[0:3], resource_id[3:6], resource_id[6:]
        )

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        # try/finally so the fake filesystem is torn down even when an
        # assertion fails; otherwise the patch would stay installed.
        try:
            patcher.fs.CreateFile(path)

            assert os.path.exists(path)

            delete_local_uploaded_file(resource_id)

            assert not os.path.exists(path)
        finally:
            patcher.tearDown()
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_delete_file_not_deleted_if_resources_first(self, mock_open):
        """The file directly under ``resources`` is removed but the ``resources`` directory stays."""
        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}'.format(resource_id)

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        # try/finally so the fake filesystem is torn down even when an
        # assertion fails; otherwise the patch would stay installed.
        try:
            patcher.fs.CreateFile(path)

            assert os.path.exists(path)
            with mock.patch('ckanext.validation.utils.get_local_upload_path',
                            return_value=path):
                delete_local_uploaded_file(resource_id)

            assert not os.path.exists(path)
            assert os.path.exists('/doesnt_exist/resources')
        finally:
            patcher.tearDown()
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_delete_file_not_deleted_if_resources_second(self, mock_open):
        """The file under ``resources/data`` is removed but the ``resources`` directory stays."""
        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/data/{}'.format(resource_id)

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        # try/finally so the fake filesystem is torn down even when an
        # assertion fails; otherwise the patch would stay installed.
        try:
            patcher.fs.CreateFile(path)

            assert os.path.exists(path)
            with mock.patch('ckanext.validation.utils.get_local_upload_path',
                            return_value=path):
                delete_local_uploaded_file(resource_id)

            assert not os.path.exists(path)
            assert os.path.exists('/doesnt_exist/resources')
        finally:
            patcher.tearDown()
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_delete_passes_if_os_exeception(self, mock_open):
        """``delete_local_uploaded_file`` does not propagate an ``OSError`` raised by ``os.remove``."""
        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}/{}/{}'.format(
            resource_id[0:3], resource_id[3:6], resource_id[6:]
        )

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        # try/finally so the fake filesystem is torn down even when the
        # call under test raises; otherwise the patch would stay installed.
        try:
            patcher.fs.CreateFile(path)

            assert os.path.exists(path)
            with mock.patch('ckanext.validation.utils.os.remove',
                            side_effect=OSError):
                # The test passes as long as this call returns normally.
                delete_local_uploaded_file(resource_id)
        finally:
            patcher.tearDown()
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
async def get_ghost_replay(self, login):
        """Save the best ghost replay for ``login`` and return the file's contents.

        The method awaits the Gbx call and the storage driver, so it has to
        be a coroutine function; with a plain ``def`` the ``await`` and
        ``async with`` statements are a syntax error.

        Returns:
            The data read from the saved replay file, or ``None`` when the
            ``SaveBestGhostsReplay`` call fails.

        Raises:
            DedimaniaException: when the saved replay file cannot be found
                or cannot be read because of permissions.
        """
        replay_name = 'dedimania_{}.Replay.Gbx'.format(uuid.uuid4().hex)
        try:
            await self.instance.gbx('SaveBestGhostsReplay', login, replay_name)
        except Exception:
            # Best-effort: no replay could be saved. ``Exception`` instead of
            # a bare ``except`` so interrupts and exits are not swallowed.
            return None
        try:
            async with self.instance.storage.open('UserData/Replays/{}'.format(replay_name)) as ghost_file:
                return await ghost_file.read()
        except FileNotFoundError as e:
            # Note the space after "fetch": the two literals are concatenated.
            message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch ' \
                      'the driven replay!'
            logger.error('Please make sure we can access the dedicated files. Configure your storage driver correctly! '
                         '{}'.format(str(e)))
            await self.instance.chat(message)
            raise DedimaniaException('Can\'t access replay file') from e
        except PermissionError as e:
            message = '$f00Error: Dedimania requires you to have file access on the server. We can\'t fetch ' \
                      'the driven replay because of an permission problem!'
            logger.error('We can\'t read files in the dedicated folder, your permissions don\'t allow us to read it! '
                         '{}'.format(str(e)))
            await self.instance.chat(message)
            raise DedimaniaException('Can\'t access files due to permission problems') from e
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def sync_rings_request(self, broker_token, builders_only=False):
        """Request for peers to sync rings from leader.

        NOTE: this action must only be performed by the cluster leader.

        :param broker_token: token to identify sync request.
        :param builders_only: if True, tell peers to sync builders only (not
                              rings).
        :returns: request built from ``self.template()`` with a fresh
                  trigger, the broker token, a timestamp and this unit's
                  hostname filled in.
        :raises SwiftProxyCharmException: if called on a unit that is not
                                          the elected leader.
        """
        if not is_elected_leader(SWIFT_HA_RES):
            errmsg = "Leader function called by non-leader"
            raise SwiftProxyCharmException(errmsg)

        rq = self.template()
        # A new UUID on every request, stored under the 'trigger' key.
        rq['trigger'] = str(uuid.uuid4())

        if builders_only:
            rq['sync-only-builders'] = 1

        rq['broker-token'] = broker_token
        rq['broker-timestamp'] = "{:f}".format(time.time())
        rq['builder-broker'] = self._hostname
        return rq
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def notify_leader_changed(self, token):
        """Build the request that tells peers the leader has changed.

        ``token`` must be the one associated with the sync we claim was
        interrupted; the restored leader re-uses it once it receives this
        notification.

        NOTE: this action must only be performed by the cluster leader that
              has relinquished its leader status as part of the current hook
              context.
        """
        if is_elected_leader(SWIFT_HA_RES):
            request = self.template()
            request['trigger'] = str(uuid.uuid4())
            request[self.KEY_NOTIFY_LEADER_CHANGED] = token
            return request

        # Reached only when this unit is not the elected leader.
        errmsg = "Leader function called by non-leader"
        raise SwiftProxyCharmException(errmsg)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def notify_storage_rings_available():
    """Notify peer swift-storage relations that they should synchronise ring
    and builder files.

    Sets ``swift_hash``, ``rings_url`` and a fresh ``trigger`` on every
    ``swift-storage`` relation. When called from a unit that is not the
    elected leader, a warning is logged and nothing is set.

    Note that this should only be called from the leader unit.
    """
    if not is_elected_leader(SWIFT_HA_RES):
        log("Ring availability storage-relation broadcast requested by "
            "non-leader - skipping", level=WARNING)
        return

    hostname = get_hostaddr()
    hostname = format_ipv6_addr(hostname) or hostname
    path = os.path.basename(get_www_dir())
    rings_url = 'http://{}/{}'.format(hostname, path)
    # Pass the trigger as a string, like the cluster RPC request builders
    # do, so relation data never carries a raw UUID object.
    trigger = str(uuid.uuid4())
    # Notify storage nodes that there is a new ring to fetch.
    log("Notifying storage nodes that new rings are ready for sync.",
        level=INFO)
    for relid in relation_ids('swift-storage'):
        relation_set(relation_id=relid, swift_hash=get_swift_hash(),
                     rings_url=rings_url, trigger=trigger)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_cluster_rpc_stop_proxy_ack(self, mock_uuid):
        """``stop_proxy_ack`` returns the expected request using only template keys."""
        mock_uuid.uuid4.return_value = 'token2'
        rpc = swift_utils.SwiftProxyClusterRPC()
        request = rpc.stop_proxy_ack(echo_token='token1', echo_peers_only='1')

        expected = {
            'trigger': 'token2',
            'broker-token': None,
            'builder-broker': None,
            'broker-timestamp': None,
            'peers-only': '1',
            'leader-changed-notification': None,
            'resync-request': None,
            'stop-proxy-service': None,
            'stop-proxy-service-ack': 'token1',
            'sync-only-builders': None,
        }
        self.assertEqual(expected, request)

        allowed_keys = set(rpc.template())
        self.assertTrue(set(request.keys()) <= allowed_keys)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_cluster_rpc_sync_request(self, mock_uuid, mock_time):
        """``sync_rings_request`` returns the expected request using only template keys."""
        mock_time.time = mock.Mock(return_value=float(1.234))
        mock_uuid.uuid4.return_value = 'token2'
        rpc = swift_utils.SwiftProxyClusterRPC()
        request = rpc.sync_rings_request('token1')

        expected = {
            'trigger': 'token2',
            'broker-token': 'token1',
            'broker-timestamp': '1.234000',
            'builder-broker': '1.2.3.4',
            'peers-only': None,
            'leader-changed-notification': None,
            'resync-request': None,
            'stop-proxy-service': None,
            'stop-proxy-service-ack': None,
            'sync-only-builders': None,
        }
        self.assertEqual(expected, request)

        allowed_keys = set(rpc.template())
        self.assertTrue(set(request.keys()) <= allowed_keys)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_cluster_rpc_notify_leader_changed(self, mock_uuid):
        """``notify_leader_changed`` returns the expected request using only template keys."""
        fake_uuid = 'e4b67426-6cc0-4aa3-829d-227999cd0a75'
        mock_uuid.uuid4.return_value = fake_uuid
        rpc = swift_utils.SwiftProxyClusterRPC()
        request = rpc.notify_leader_changed('token1')

        expected = {
            'trigger': fake_uuid,
            'broker-token': None,
            'builder-broker': None,
            'broker-timestamp': None,
            'peers-only': None,
            'leader-changed-notification': 'token1',
            'stop-proxy-service': None,
            'stop-proxy-service-ack': None,
            'resync-request': None,
            'sync-only-builders': None,
        }
        self.assertEqual(expected, request)

        allowed_keys = set(rpc.template().keys())
        self.assertTrue(set(request.keys()) <= allowed_keys)
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
async def test_disable_enable_user(event_loop):
    """Disabling and re-enabling a user is visible on the object and on a fresh fetch.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    """
    async with base.CleanController() as controller:
        username = 'test-disable{}'.format(uuid.uuid4())
        user = await controller.add_user(username)

        await user.disable()
        assert not user.enabled
        assert user.disabled

        fresh = await controller.get_user(username)  # fetch fresh copy
        assert not fresh.enabled
        assert fresh.disabled

        await user.enable()
        assert user.enabled
        assert not user.disabled

        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.enabled
        assert not fresh.disabled
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
async def test_change_user_password(event_loop):
    """After ``set_password`` a new connection with that password can be opened.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    """
    async with base.CleanController() as controller:
        username = 'test-password{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.set_password('password')
        # Check that we can connect with the new password.
        new_connection = None
        try:
            kwargs = controller.connection().connect_params()
            kwargs['username'] = username
            kwargs['password'] = 'password'
            new_connection = await Connection.connect(**kwargs)
        except JujuAPIError as e:
            # Chain the API error so its details appear in the test failure.
            raise AssertionError('Unable to connect with new password') from e
        finally:
            if new_connection:
                await new_connection.close()
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
async def test_grant_revoke(event_loop):
    """Granting and revoking access is visible on the object and on a fresh fetch.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    """
    async with base.CleanController() as controller:
        username = 'test-grant{}'.format(uuid.uuid4())
        user = await controller.add_user(username)
        await user.grant('superuser')
        assert user.access == 'superuser'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'superuser'
        await user.grant('login')
        assert user.access == 'login'
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == 'login'
        await user.revoke()
        # Compare by value: ``is ''`` tests object identity, which is not
        # guaranteed for equal strings.
        assert user.access == ''
        fresh = await controller.get_user(username)  # fetch fresh copy
        assert fresh.access == ''
项目:daisy    作者:llllllllll    | 项目源码 | 文件源码
def _(node, dask, scope):
    """Add a call node to ``dask`` under a unique key and return that key.

    The function, positional arguments and keyword-argument values of
    ``node`` are each resolved through ``scope``, converting and caching
    any term that is not there yet. The new key is also recorded in
    ``scope`` for ``node`` itself.
    """
    def resolve(term):
        # Reuse a key already present in ``scope``; otherwise convert the
        # term and remember the result.
        try:
            return scope[term]
        except KeyError:
            key = _ltree_to_dask(term, dask, scope)
            scope[term] = key
            return key

    task_key = '%s-%s' % (node.func, uuid4())
    func_key = resolve(node.func)
    arg_keys = [resolve(arg) for arg in node.args]
    # Keyword arguments are stored as a list of [name, key] pairs that
    # ``dict`` turns back into a mapping.
    kwarg_pairs = [list(item) for item in valmap(resolve, node.kwargs).items()]

    dask[task_key] = (apply, func_key, arg_keys, (dict, kwarg_pairs))
    scope[node] = task_key
    return task_key
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_get_saved_query(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
    """Fetching a saved query prints every expected field label."""
    query_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    response_body = json.dumps({'saved_query': SAVED_QUERY_RESPONSE})
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=response_body)

    api.get_saved_query(query_id)
    out, _ = capsys.readouterr()

    expected_labels = ("Name:", "Logs:", "ID:", "Statement:", "Time range:", "From:", "To:")
    for label in expected_labels:
        assert label in out
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_patch_saved_query_none_fields(mocked_url, mocked_rw_apikey, mocked_account_resource_id,
                                       capsys):
    """Updating with ``name=None`` sends a body without ``name`` but with the statement."""
    query_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    response_body = json.dumps({"saved_query": SAVED_QUERY_RESPONSE})
    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL, status=200,
                           content_type='application/json',
                           body=response_body)

    api.update_saved_query(query_id, name=None, statement="new_statement")
    out, _ = capsys.readouterr()

    assert "Saved query with id %s updated" % query_id in out
    # Inspect what was actually sent in the PATCH request.
    sent = json.loads(httpretty.last_request().body)['saved_query']
    assert "name" not in sent
    assert "statement" in sent['leql']
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_rename_log(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """Renaming a log prints the new name."""
    log_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = ID_WITH_VALID_LENGTH
    mocked_ro_apikey.return_value = ID_WITH_VALID_LENGTH

    # Body served for the GET request and body served for the PUT request.
    current_log = '{"log": {"name": "test.log", "logsets_info": [], "source_type": "token"}}'
    renamed_log = '{"log": {"name": "new_test_log_name", "logsets_info": [], "source_type": "token"}}'

    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json', body=current_log)
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=200,
                           body=renamed_log, content_type='application/json')

    new_name = "new_test_log_name"
    api.rename_log(log_id, new_name)
    out, _ = capsys.readouterr()

    assert new_name in out
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_delete_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id,
                        mocked_account_resource_id, capsys):
    """Deleting an API key prints a confirmation and nothing on stderr."""
    key_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL + '/' + key_id
    mocked_owner_apikey.return_value = str(uuid.uuid4())
    mocked_owner_apikey_id.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.DELETE, MOCK_API_URL + '/' + key_id,
                           status=204,
                           content_type='application/json')

    api.delete(key_id)

    stdout, stderr = capsys.readouterr()
    assert not stderr
    assert 'Deleted api key with id: %s' % key_id in stdout
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_disable_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id,
                         mocked_account_resource_id, capsys):
    """Disabling an API key sends ``active: False`` and prints a confirmation."""
    key_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_owner_apikey.return_value = str(uuid.uuid4())
    mocked_owner_apikey_id.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL,
                           status=200,
                           content_type='application/json',
                           body=json.dumps({}))

    api.update(key_id, False)

    stdout, stderr = capsys.readouterr()
    expected_payload = {'apikey': {'active': False}}
    assert expected_payload == json.loads(httpretty.last_request().body)
    assert not stderr
    assert 'Disabled api key with id: %s' % key_id in stdout
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_enable_api_key(mocked_url, mocked_owner_apikey, mocked_owner_apikey_id,
                        mocked_account_resource_id, capsys):
    """Enabling an API key sends ``active: True`` and prints a confirmation."""
    key_id = str(uuid.uuid4())
    mocked_url.return_value = '', MOCK_API_URL
    mocked_owner_apikey.return_value = str(uuid.uuid4())
    mocked_owner_apikey_id.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())
    httpretty.register_uri(httpretty.PATCH, MOCK_API_URL,
                           status=200,
                           content_type='application/json',
                           body=json.dumps({}))

    api.update(key_id, True)

    stdout, stderr = capsys.readouterr()
    expected_payload = {'apikey': {'active': True}}
    assert expected_payload == json.loads(httpretty.last_request().body)
    assert not stderr
    assert 'Enabled api key with id: %s' % key_id in stdout
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_create_logset_from_file(mocked_url, mocked_rw_apikey, capsys):
    """Creating a logset from a params dict prints the logset name."""
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.POST, MOCK_API_URL, status=201,
                           content_type='application/json',
                           body=json.dumps(LOGSET_RESPONSE))

    logset_params = {"logset": {"name": "Test Logset"}}
    api.create_logset(params=logset_params)
    out, _ = capsys.readouterr()

    assert 'Test Logset' in out
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_create_logset_invalid_json(mocked_url, mocked_rw_apikey, capsys):
    """Creating a logset from invalid params exits with code 1 and reports the 400."""
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.POST, MOCK_API_URL, status=400,
                           content_type='application/json',
                           body='Client Error: Bad Request for url: https://rest.logentries.com/management/logsets')

    invalid_params = {
        "logset": {
            "id": "12341234-XXXX-YYYY-XXXX-12341234",
            "unknown_field": "unknown value"
        }
    }

    # Only the call expected to exit lives inside ``pytest.raises``:
    # statements placed after it in the block would never run.
    with pytest.raises(SystemExit) as exc_info:
        api.create_logset(params=invalid_params)
    out, err = capsys.readouterr()

    # The exit status is on the raised exception (``exc_info.value``).
    assert exc_info.value.code == 1
    # NOTE(review): this check never ran before, and which stream the
    # message is written to is not visible from this test - confirm and
    # narrow to ``out`` or ``err``.
    assert "Creating logset failed, status code: 400" in out + err
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_rename_logset(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """Renaming a logset prints the new name."""
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())

    # Body served for the GET request and body served for the PUT request.
    before_rename = '{"logset": {"id": "XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX","logs_info": [],"name": "old logset name"}}'
    after_rename = '{"logset": {"id": "XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX","logs_info": [],"name": "new logset name"}}'

    for method, payload in ((httpretty.GET, before_rename), (httpretty.PUT, after_rename)):
        httpretty.register_uri(method, MOCK_API_URL, status=200,
                               content_type='application/json',
                               body=payload)

    api.rename_logset('XXXXXXXX-XXXX-YYYY-XXXX-XXXXXXXX', 'new logset name')
    out, _ = capsys.readouterr()

    assert "new logset name" in out
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_add_unknown_log_to_logset(mocked_url, mocked_ro_apikey, mocked_rw_apikey, capsys):
    """Adding an unknown log to a logset exits with code 1 and reports the 400."""
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           content_type='application/json', body=json.dumps(LOGSET_RESPONSE))

    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=400,
                           content_type='application/json')

    # Only the call expected to exit lives inside ``pytest.raises``:
    # statements placed after it in the block would never run.
    with pytest.raises(SystemExit) as exc_info:
        api.add_log('XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXX', 'unknown_log')
    out, err = capsys.readouterr()

    # NOTE(review): this check never ran before, and which stream the
    # status code is written to is not visible from this test - confirm
    # and narrow to ``out`` or ``err``.
    assert "400" in out + err
    # The exit status is on the raised exception (``exc_info.value``).
    assert exc_info.value.code == 1
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_remove_log_from_logset(mocked_url, mocked_ro_apikey, mocked_rw_apikey, capsys):
    """Check that ``api.delete_log`` writes nothing to stderr on success.

    Both the GET and the PUT stub answer with HTTP 200 and
    ``LOGSET_RESPONSE`` as the JSON body.
    """
    mocked_url.return_value = '', MOCK_API_URL
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())

    httpretty.register_uri(httpretty.GET, MOCK_API_URL,
                           status=200,
                           content_type='application/json',
                           body=json.dumps(LOGSET_RESPONSE))

    # Keyword spelled ``content_type`` like every other stub in this file
    # (it was previously misspelled ``content_Type``).
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL,
                           status=200,
                           content_type='application/json',
                           body=json.dumps(LOGSET_RESPONSE))
    api.delete_log('123', str(uuid.uuid4()))
    out, err = capsys.readouterr()

    assert not err
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_delete_logset_with_log_in_another_logset(mocked_url, mocked_rw_apikey, mocked_ro_apikey, capsys):
    """Check that deleting one logset and fetching another emits no stderr.

    The DELETE stub answers 204 and the GET stub answers 200 with an empty
    JSON object; after calling ``api.delete_logset`` and ``api.get_logset``
    the captured standard error must be empty.
    """
    json_type = 'application/json'

    mocked_url.return_value = ('', MOCK_API_URL)
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_ro_apikey.return_value = str(uuid.uuid4())

    httpretty.register_uri(
        httpretty.DELETE, MOCK_API_URL,
        status=204, content_type=json_type,
    )
    httpretty.register_uri(
        httpretty.GET, MOCK_API_URL,
        status=200, content_type=json_type,
        body=json.dumps({}),
    )

    api.delete_logset('123')
    api.get_logset('456')
    _, captured_err = capsys.readouterr()

    assert not captured_err
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_delete_user_from_team(mocked_url, mocked_rw_apikey, mocked_account_resource_id, capsys):
    """Check the confirmation message printed after removing a user from a team.

    The GET stub returns ``TEAM_RESPONSE`` wrapped under the ``team`` key and
    the PUT stub answers 200; stdout must equal the exact confirmation line
    built from the user id and the team id.
    """
    team_id = str(uuid.uuid4())
    mocked_url.return_value = ('', MOCK_API_URL)
    mocked_rw_apikey.return_value = str(uuid.uuid4())
    mocked_account_resource_id.return_value = str(uuid.uuid4())

    team_payload = json.dumps({'team': TEAM_RESPONSE})
    httpretty.register_uri(httpretty.GET, MOCK_API_URL, status=200,
                           body=team_payload,
                           content_type='application/json')
    httpretty.register_uri(httpretty.PUT, MOCK_API_URL, status=200,
                           content_type='application/json')

    user_id_to_delete = str(uuid.uuid4())
    api.delete_user_from_team(team_id, user_id_to_delete)
    captured_out, _ = capsys.readouterr()

    expected_message = "Deleted user with key: '%s' from team: %s\n" % (user_id_to_delete, team_id)
    assert expected_message == captured_out
项目:figbed    作者:wwj718    | 项目源码 | 文件源码
def upload_file(upload_file_name, temp):
    # upload_file_name?????
    # ??? saveas???
    #  ?????????,??git???saveas
    #key = md5(str(time.time())+''.join(random.sample(string.letters, 12))).hexdigest()
    # key ??????
    print u"??????: ",
    pic_name = raw_input()
    uuid_6 = uuid.uuid4().get_hex()[:8] #?????
    key = pic_name+"_"+uuid_6+".png"
    copyfile(upload_file_name,join(saveas,key))
    mime_type = 'image/png'
    token = q.upload_token(bucket, key)
    ret, info = put_file(token, key, upload_file_name, mime_type=mime_type, check_crc=True)
    print 'upload qiniu result:', info
    assert ret['key'] == key
    assert ret['hash'] == etag(upload_file_name)
    os.rename(upload_file_name, upload_file_name+'.old')
    return domain+'/'+key
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def compare_component_output(self, input_path, expected_output_path):
        """Render a sketch archive twice and compare both results to a reference.

        The archive at ``input_path`` is first processed into a plain
        directory, then into a zip file that is extracted again; each time
        the resulting directory must match ``expected_output_path`` according
        to ``dircmp.is_same``. Temporary outputs live under the system temp
        directory with random UUID names and are removed afterwards.
        """
        rendering_engine = self.get_rendering_engine()
        temp_dir = tempfile.gettempdir()
        output_dir = os.path.join(temp_dir, str(uuid.uuid4()))

        # Pass 1: uncompressed output written straight to a directory.
        process_sketch_archive(zip_path=input_path, compress_zip=False,
                               output_path=output_dir, engine=rendering_engine)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        storage.clear()

        # Pass 2: compressed output, extracted into the same directory path.
        output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
        process_sketch_archive(zip_path=input_path, compress_zip=True,
                               output_path=output_zip, engine=rendering_engine)
        # Close the archive deterministically so no file handle is still
        # open when the zip file is deleted below.
        with zipfile.ZipFile(output_zip) as archive:
            archive.extractall(output_dir)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        os.remove(output_zip)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def remote_restart(rel_name, remote_service=None):
    """Publish a fresh restart trigger on the relations named ``rel_name``.

    A new random UUID is stored under ``restart-trigger``; when
    ``remote_service`` is truthy it is added under ``remote-service``. The
    settings are written to every relation id of ``rel_name`` that currently
    has related units.
    """
    trigger = {'restart-trigger': str(uuid.uuid4())}
    if remote_service:
        trigger['remote-service'] = remote_service

    for rid in relation_ids(rel_name):
        # This subordinate can be related to two separate services through
        # different subordinate relations, so only issue the restart when
        # the principal is actually connected down this relation.
        if not related_units(relid=rid):
            continue
        relation_set(relation_id=rid, relation_settings=trigger)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_uuid_epoch_stamp(self):
        """Return a ``[<uuid4>-<epoch seconds>]`` stamp string.

        Handy for test messages that need to be unique-ish.
        """
        unique_part = uuid.uuid4()
        epoch_part = time.time()
        return '[{}-{}]'.format(unique_part, epoch_part)

# amulet juju action helpers:
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def setUpTestData(cls):
        """Create the class-level fixtures used by the tests.

        After the parent setup and ``create_local_and_remote_user()``, this
        stores two local content objects (the second with an explicit random
        guid), one public remote content object and one extra public profile
        on the class.
        """
        super().setUpTestData()
        cls.create_local_and_remote_user()
        cls.local_content = LocalContentFactory()
        second_guid = str(uuid4())
        cls.local_content2 = LocalContentFactory(guid=second_guid)
        cls.remote_content = PublicContentFactory()
        cls.remote_profile2 = PublicProfileFactory()
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_does_not_forward_share_if_not_local_content(self, mock_rq):
        """Processing a share that targets ``remote_content`` must not call ``mock_rq``."""
        target = self.remote_content
        share = base.Share(
            guid=str(uuid4()),
            handle=self.remote_profile.handle,
            target_guid=target.guid,
            target_handle=target.author.handle,
            public=True,
        )

        process_entity_share(share, self.remote_profile)

        self.assertFalse(mock_rq.called)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_forwards_share_if_local_content(self, mock_rq):
        """Processing a share that targets ``local_content`` calls ``mock_rq`` exactly once.

        The expected call arguments are ``forward_entity``, the share entity
        and the id of the local content.
        """
        target = self.local_content
        share = base.Share(
            guid=str(uuid4()),
            handle=self.remote_profile.handle,
            target_guid=target.guid,
            target_handle=target.author.handle,
            public=True,
        )

        process_entity_share(share, self.remote_profile)

        mock_rq.assert_called_once_with(forward_entity, share, self.local_content.id)