Python uuid 模块,uuid1() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 uuid.uuid1()。

项目:serverless-pynamodb-rest-example    作者:helveticafire    | 项目源码 | 文件源码
def test_create(self):
        """Round-trip a todo item: save it, read it back by key and by scan, then delete it."""
        todo_text = 'text'
        todo_id = str(uuid.uuid1())
        t1 = TodoModel(todo_id=todo_id,
                       text=todo_text,
                       created_at=datetime.now())
        t1.save()
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(t1.text, todo_text)
        self.assertEqual(t1.checked, False)

        # Read the item back through its hash key.
        t2 = TodoModel.get(hash_key=todo_id)
        self.assertEqual(t2.text, todo_text)

        # The scan is expected to return exactly the item saved above.
        r = list(TodoModel.scan())
        self.assertEqual(len(r), 1)
        self.assertEqual(r[0].text, todo_text)
        self.assertEqual(r[0].todo_id, todo_id)
        t2.delete()
项目:netmiko_tools    作者:ktbyers    | 项目源码 | 文件源码
def test_obtain_netmiko_filename():
    """Check the generated file name and that the netmiko directories get created."""
    create_dir_test = True
    expected_name = '/home/gituser/.netmiko/tmp/test_device.txt'
    assert obtain_netmiko_filename('test_device') == expected_name

    if not create_dir_test:
        return

    # Each run parks the existing base directory under a unique junk folder.
    junk_dir_base = '/home/gituser/JUNK/netmiko'
    junk_dir = '{}/{}'.format(junk_dir_base, str(uuid.uuid1()))
    base_dir, full_dir = find_netmiko_dir()
    print(base_dir)

    # Move base_dir out of the way (only when both directories exist) ...
    if os.path.isdir(base_dir) and os.path.isdir(junk_dir_base):
        shutil.move(src=base_dir, dst=junk_dir)
    for directory in (base_dir, full_dir):
        assert not os.path.exists(directory)

    # ... then make sure the helpers recreate it.
    obtain_netmiko_filename('test_device')
    for directory in (base_dir, full_dir):
        ensure_dir_exists(directory)
    for directory in (base_dir, full_dir):
        assert os.path.exists(directory)
项目:instagram_private_api    作者:ping    | 项目源码 | 文件源码
def generate_uuid(cls, return_hex=False, seed=None):
        """
        Produce a UUID as a string.

        :param return_hex: When true, return the 32-character hex form
            instead of the dashed canonical form
        :param seed: Optional text. When truthy, the UUID is built from the
            MD5 hex digest of the UTF-8 encoded seed, so equal seeds give
            equal UUIDs; otherwise ``uuid.uuid1()`` is used
        :return: The UUID string
        """
        if not seed:
            generated = uuid.uuid1()
        else:
            digest = hashlib.md5(seed.encode('utf-8')).hexdigest()
            generated = uuid.UUID(digest)
        return generated.hex if return_hex else str(generated)
项目:weather    作者:awolfly9    | 项目源码 | 文件源码
def __init__(self):
        """Set up the credentials, endpoint and default request fields.

        ``parameters`` holds the common signing fields, including a fresh
        uuid1 nonce and the current UTC timestamp; ``user_params`` holds the
        ``SingleSendSms`` action fields with ``ParamString`` and ``RecNum``
        left empty.
        """
        # NOTE(review): the access key and secret are hard-coded in source;
        # consider loading them from configuration instead.
        self.access_key_id = 'LTAIjHdzLIPJXaIZ'
        self.access_key_secret = '6cVfaC47jGhxUAmW3nt14kktGeqvSu'
        self.server_address = 'https://sms.aliyuncs.com'

        nonce = str(uuid.uuid1())
        utc_now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        self.parameters = dict(
            Format='JSON',
            Version='2016-09-27',
            AccessKeyId=self.access_key_id,
            SignatureVersion='1.0',
            SignatureMethod='HMAC-SHA1',
            SignatureNonce=nonce,
            Timestamp=utc_now,
        )

        self.user_params = dict(
            Action='SingleSendSms',
            ParamString='',
            RecNum='',
            SignName='?????',
            TemplateCode='SMS_39185168',
        )
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def download(dest_path, url):
    """Fetch ``url`` into ``dest_path`` unless a file with that name already exists.

    The local file name is the last ``/``-separated segment of ``url`` after
    ``unquote_plus``. Responses whose content-type mentions ``html`` are not
    written to disk.

    :param dest_path: directory to store the file in
    :param url: address to download
    :return: real path of the local file, or ``None`` when no file exists
        afterwards or an error occurred (errors are logged, not raised)
    """
    try:
        file_name = url.split('/')[-1]
        path = os.path.realpath(os.path.join(dest_path, unquote_plus(file_name)))
        if not os.path.exists(path):
            response = urlopen(url)
            try:
                content_type = response.headers['content-type'].split('/')
                # NOTE(review): HTML responses were never saved by the previous
                # code either (it only picked an unused name for them);
                # confirm that skipping them is intended.
                if 'html' not in content_type:
                    with open(path, 'wb') as local_file:
                        local_file.write(response.read())
            finally:
                # Release the connection even when reading or writing fails.
                response.close()

        if os.path.exists(path):
            return path
        logger.info("Wasn't able to find the file....!")
        return None
    except Exception as error:
        logger.error('download error %s', error)
        return None
项目:tie2misp    作者:DCSO    | 项目源码 | 文件源码
def __init__(self, organisation_name, organisation_uuid, threat_level_id, published, info, date):
        """Validate the mandatory fields and initialise the event attributes.

        :param organisation_name: name stored in the ``Orgc`` dict (required)
        :param organisation_uuid: UUID stored in the ``Orgc`` dict (required)
        :param threat_level_id: threat level (required, must be truthy)
        :param published: stored unchanged as the published flag
        :param info: free text (required); prefixed with ``date`` as ``YYYYMMDD``
        :param date: object with ``strftime``, used for the info prefix
        :raises ValueError: when a required value is missing or falsy
        """
        import time  # local import: only needed for the epoch conversion below

        if not organisation_name or not organisation_uuid:
            raise ValueError('Organisation Name and UUID must be set')

        if not threat_level_id:
            raise ValueError('Threat Level must be set')

        if not info:
            raise ValueError('Info must be set')

        dt = datetime.datetime.now()
        # strftime("%s") is a glibc-only extension and fails on other
        # platforms; mktime yields the same local-time epoch seconds portably.
        epoch_seconds = str(int(time.mktime(dt.timetuple())))

        self.__Info = date.strftime("%Y%m%d ") + info
        self.__PublishTimestamp = epoch_seconds
        self.__Timestamp = epoch_seconds
        self.__Analysis = 2
        self.__Attribute = list()
        self.__Tags = list()
        self.__Published = published
        self.__Orgc = {'name': organisation_name, 'uuid': organisation_uuid}
        self.__Threat_Level_ID = threat_level_id
        self.__UUID = uuid.uuid1()
        self.__Date = dt.strftime("%Y-%m-%d")

    # Getter
项目:xmuBKXK_captcha    作者:smartjinyu    | 项目源码 | 文件源码
def savePositive(imgs, processed, rawImg, captcha):
    """
    Store the raw image of a captcha that Tesseract recognized correctly.

    Only ``rawImg`` is written; ``imgs`` and ``processed`` are accepted but
    not used here.
    :param imgs: a list of four processed images, one digit per image
    :param processed: the full processed image
    :param rawImg: the unprocessed image; saved as JPEG
    :param captcha: recognized captcha text, used as the file-name prefix
    :return:
    """
    unique_suffix = str(uuid.uuid1())
    target = ''.join([savingDir, '/rawData/', captcha, '_', unique_suffix, '.jpg'])
    rawImg.save(target, 'JPEG')
项目:Complete-Bunq-API-Python-Wrapper    作者:PJUllrich    | 项目源码 | 文件源码
def headers(self):
        """Build the HTTP headers for a request.

        A new uuid1 string is used as the client request id. The
        authentication header carries the session token when there is one,
        otherwise the installation token; it is left out when both are None.
        """
        request_id = str(uuid.uuid1())
        user_agent = '%s/%s' % (self.__agent_name, self.__agent_version)
        result = {
            'Cache-Control': 'no-cache',
            'User-Agent': user_agent,
            'X-Bunq-Client-Request-Id': request_id,
            'X-Bunq-Geolocation': '0 0 0 0 NL',
            'X-Bunq-Language': 'en_US',
            'X-Bunq-Region': 'en_US',
        }

        token = self.session_token
        if token is None:
            token = self.installation_token
        if token is not None:
            result['X-Bunq-Client-Authentication'] = token
        return result
项目:DCRM    作者:82Flex    | 项目源码 | 文件源码
def write_to_package_job(control, path, callback_version_id):
    """
    Rewrite the control part of a .deb package on a temporary copy.

    Called when any field of the package's control part has been edited.

    :param control: New Control Dict
    :type control: dict
    :param path: Original Package Path, relative to ``settings.MEDIA_ROOT``
    :type path: str
    :param callback_version_id: Callback Version ID, for callback query
    :type callback_version_id: int
    """
    # Work on a uniquely named copy under TEMP_ROOT, not on the original.
    source = os.path.join(settings.MEDIA_ROOT, path)
    scratch = os.path.join(settings.TEMP_ROOT, str(uuid.uuid1()) + '.deb')
    shutil.copyfile(source, scratch)

    # Load the copy, swap in the new control data and persist it.
    package = DebianPackage(scratch)
    package.control = control
    package.save()

    # Hand the resulting package path to the version that asked for it.
    version = Version.objects.get(id=callback_version_id)
    version.write_callback(package.path)
项目:DCRM    作者:82Flex    | 项目源码 | 文件源码
def handle_uploaded_file(request):
    """
    Store the uploaded ``package`` file under ``settings.TEMP_ROOT`` and
    pass it to ``handle_uploaded_package``.

    :param request: Django Request
    :type request: HttpRequest
    :return: the job returned by ``queue.enqueue`` when
        ``settings.ENABLE_REDIS`` is True, otherwise the direct result of
        ``handle_uploaded_package``
    """
    f = request.FILES['package']
    temp_root = settings.TEMP_ROOT
    if not os.path.exists(temp_root):
        mkdir_p(temp_root)
    # A uuid1-based name keeps concurrent uploads from overwriting each other.
    package_temp_path = os.path.join(temp_root, str(uuid.uuid1()) + '.deb')
    with open(package_temp_path, 'wb+') as destination:
        for chunk in f.chunks():
            destination.write(chunk)
    # 0o755 is valid on Python 2.6+ and 3; the old ``0755`` spelling is a
    # SyntaxError on Python 3.
    os.chmod(package_temp_path, 0o755)
    if settings.ENABLE_REDIS is True:
        queue = django_rq.get_queue('high')
        return queue.enqueue(handle_uploaded_package, package_temp_path)
    else:
        return handle_uploaded_package(package_temp_path)
项目:normandy-e2e-tests    作者:18epedersen    | 项目源码 | 文件源码
def save_recipe(self, conf):
        """Save a recipe under a unique uuid1-based name.

        Returns a tuple of the recipe page (after waiting for its request
        button), the recipe name, and the notification texts.
        """
        filters = conf.get('recipe', 'recipe_additional_filters')
        action = conf.get('recipe', 'recipe_action')
        recipe_name = str(uuid.uuid1().hex)

        # Fill in the name once the field becomes clickable.
        name_locator = self.LOCATORS.name
        name_field = self.wait.until(EC.element_to_be_clickable(name_locator))
        name_field.clear()
        name_field.send_keys(recipe_name)

        filter_box = self.find_element(*self.LOCATORS.filter_textbox)
        filter_box.send_keys(filters)
        self.action_configuration(conf, action)

        save_locator = self.LOCATORS.save
        self.wait.until(EC.element_to_be_clickable(save_locator)).click()

        messages_list = self.message_alert_helper()
        recipe_page = Recipe(self.selenium, self.base_url)
        return recipe_page.wait_for_request_button(), recipe_name, messages_list # noqa
项目:incubator-milagro-mfa-server    作者:apache    | 项目源码 | 文件源码
def __init__(self, storage, expire_time, **kwargs):
        '''Create an item with a fresh uuid1 hex id and register it in storage.

        expire_time may be a datetime (stored as its ISO string) or a value
        that is stored as given; the docstring it replaces asked for ISO
        format. Extra keyword arguments are passed to _update_item.
        '''
        self.__fields = ["_id", "_active", "_expires"]
        self.__storage = storage

        self._id = uuid.uuid1().hex

        given_as_datetime = isinstance(expire_time, datetime.datetime)
        self._expires = expire_time.isoformat() if given_as_datetime else expire_time

        self._update_item(**kwargs)

        # Parsed form of _expires; stays None when no expiry is set.
        self._expiration_datetime = None
        if self._expires:
            self._expiration_datetime = Time.ISOtoDateTime(self._expires)

        self.__storage.update_index(self)
项目:shirleytoolate    作者:trobanga    | 项目源码 | 文件源码
def on_save(self, button):
        """
        Save and exit.

        Wraps the edited text in a one-event calendar, sends it to the
        server, then leaves the urwid main loop by raising ExitMainLoop.
        """
        vcal = icalendar.Calendar()
        event = icalendar.Event()
        # The uid is the calendar name followed by a uuid1.
        event['uid'] = self.calendar + str(uuid.uuid1())
        event['summary'] = self.msg.edit_text
        vcal.add_component(event)

        logging.debug("EventWidget:on_save:vcal: {}".format(vcal.to_ical()))
        reply = self.server.add_event(vcal.to_ical(), self.calendar)
        logging.debug("EventWidget:on_save:add_event: {}".format(reply))
        raise urwid.ExitMainLoop()
项目:fabric8-analytics-stack-analysis    作者:fabric8-analytics    | 项目源码 | 文件源码
def submit_kronos_evaluation():
    """Submit an evaluation job and return its response as JSON.

    When ``app.scoring_status`` is falsy, a failure description is returned
    instead. Otherwise the response of ``submit_evaluation_job`` is returned
    with the generated result id added under ``evaluation_S3_result_id``.
    """
    app.logger.info("Submitting the evaluation job")

    if not app.scoring_status:
        return flask.jsonify({
            "status_description": "Failed to load model, Kronos Region not available"})

    result_id = str(uuid1())
    training_data_url = request.get_json().get("training_data_url")
    job_response = submit_evaluation_job(
        input_bootstrap_file='/uranus_bootstrap_action.sh',
        input_src_code_file='/tmp/testing.zip',
        training_url=training_data_url,
        result_id=result_id)
    job_response["evaluation_S3_result_id"] = result_id
    return flask.jsonify(job_response)
项目:logistic-regression-sgd-mapreduce    作者:elsevierlabs-os    | 项目源码 | 文件源码
def main(separator='\t'):
    """Average the per-feature sums read from stdin and print the model as JSON.

    Each stdin line is ``feature<separator>value``. The line whose feature
    equals the models key (env ``N_MODELS_KEY``, default ``MODEL``) carries
    the number of models; every other value is divided by it. ``MU`` and
    ``ETA`` are read from the environment as floats, ``T`` as-is.

    :param separator: field separator of the input lines
    :raises KeyError: when the models-count line is missing from the input
    """
    run_id = str(uuid.uuid1())
    date_created = datetime.datetime.utcnow().isoformat() + 'Z'
    # os.environ.get replaces dict.has_key, which no longer exists in Python 3.
    mu = float(os.environ.get('MU', 0.002))
    eta = float(os.environ.get('ETA', 0.5))
    n_models_key = os.environ.get('N_MODELS_KEY', 'MODEL')
    # NOTE(review): T is a string when taken from the environment but the
    # int 1 by default; left unchanged, confirm what consumers expect.
    T = os.environ.get('T', 1)

    sums = {}
    for line in sys.stdin:
        feature, sigma = line.strip().split(separator)
        sums[feature] = float(sigma)

    # Take the model count out first so it is not averaged with the features.
    n_models = sums.pop(n_models_key)
    parameters = {feature: total / n_models for feature, total in sums.items()}

    # print() with a single argument behaves the same on Python 2 and 3.
    print(json.dumps({
        "id": run_id,
        "date_created": date_created,
        "models": n_models,
        "mu": mu,
        "eta": eta,
        "T": T,
        "parameters": parameters
        }))
项目:makiki    作者:faith0811    | 项目源码 | 文件源码
def __init__(
            self, module_name, func_name, args=None, kwargs=None,
            countdown=0, send_after_commit=False,
            apply_queue='queue', extra_celery_kwargs=None,
    ):
        """Describe a deferred call to ``module_name.func_name``.

        The module is imported right away and must expose ``func_name``,
        otherwise ValueError is raised. Missing ``args``/``kwargs``/
        ``extra_celery_kwargs`` become empty containers, a negative
        ``countdown`` becomes 0, and the task gets a uuid1 hex id.
        """
        target_module = importlib.import_module(module_name)
        if not hasattr(target_module, func_name):
            raise ValueError('Invalid API Endpoint is provided.')

        self.task_id = uuid.uuid1().hex
        self.module_name = module_name
        self.func_name = func_name
        self.args = () if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        if countdown >= 0:
            self.countdown = countdown
        else:
            self.countdown = 0
        self.send_after_commit = bool(send_after_commit)
        self.extra_celery_kwargs = {} if extra_celery_kwargs is None else extra_celery_kwargs
        self.apply_queue = apply_queue
项目:flowder    作者:amir-khakshour    | 项目源码 | 文件源码
def process_in_message(self, message):
        """Turn an incoming message into a scheduled fetch job.

        The job gets a uuid1 hex id and the message's ``fetch_uri``; all
        remaining fields are encoded into ``settings``. The job is handed to
        the scheduler and a ``request_received`` signal is sent.

        :param message: mapping that must contain ``fetch_uri``
        :raises InvalidAMQPMessage: when ``fetch_uri`` is missing
        :raises EncodingError: when the remaining fields cannot be encoded
        """
        if 'fetch_uri' not in message:
            raise InvalidAMQPMessage('Given message has no fetch_uri value.')

        # Work on a copy so the caller's message keeps its fetch_uri.
        message_copy = message.copy()
        jobid = uuid.uuid1().hex
        _message = dict()
        _message['job_id'] = jobid
        _message['fetch_uri'] = message_copy.pop('fetch_uri')

        # Encode additional fields to send back to client on fetch
        try:
            _message['settings'] = amqp_message_encode(message_copy)
        except Exception:
            # Was a bare ``except:``, which also turned KeyboardInterrupt and
            # SystemExit into an EncodingError.
            # @TODO define a more specific exception handling here
            raise EncodingError("Can't encode message info. %s" % message)

        self.scheduler.schedule(_message)
        self.signal_manager.send_catch_log(signal=signals.request_received, jobid=jobid)
项目:noopy    作者:acuros    | 项目源码 | 文件源码
def add_permision(self):
        """Allow API Gateway to invoke this Lambda function.

        Adds a ``lambda:InvokeFunction`` permission, with a uuid1 statement
        id, for every path of this API. A ``ClientError`` from the call is
        ignored.
        """
        region = self.client._client_config.region_name
        source_arn = 'arn:aws:execute-api:{}:{}:{}/*/*/*'.format(
            region, settings.ACCOUNT_ID, self.api_id)
        permission = dict(
            FunctionName=self.function_arn,
            StatementId=str(uuid.uuid1()),
            Action='lambda:InvokeFunction',
            Principal='apigateway.amazonaws.com',
            SourceArn=source_arn,
        )
        lambda_client = boto3.client('lambda')
        try:
            lambda_client.add_permission(**permission)
        except ClientError:
            # Best effort: failures to add the permission are not reported.
            pass
项目:miracle    作者:mozilla    | 项目源码 | 文件源码
def upload(processor, data):
    """Store ``data`` as gzipped compact JSON in the processor's bucket.

    The object name is ``v2/sessions/<year>/<month>/<user>/<uuid1 hex>.json.gz``
    where the date is today's and the user comes from ``data['user']``.

    :return: True on success; False after a ``ClientError``, which is
        reported through ``processor.raven``
    """
    user_token = data['user']
    today = date.today()
    compact_json = json.dumps(data, separators=(',', ':'))
    payload = gzip.compress(compact_json.encode('utf-8'), 7)
    key_parts = (today.year, today.month, user_token, uuid.uuid1().hex)
    name = 'v2/sessions/%s/%s/%s/%s.json.gz' % key_parts
    try:
        processor.bucket.put(
            name, payload,
            ContentEncoding='gzip',
            ContentType='application/json')
    except ClientError:  # pragma: no cover
        processor.raven.captureException()
        return False
    else:
        return True
项目:shelfdb    作者:nitipit    | 项目源码 | 文件源码
def insert(self, entry):
        """Store ``entry`` under a newly generated **uuid1** key.

        Args:
            ``entry`` (dict): Entry to be inserted.

        Return:
            ``string``: The UUID the entry was stored under.

        Raises:
            ``Exception``: If ``entry`` is not a dict.
        """

        # The id is created inside the body on purpose: a default argument
        # such as id_=str(uuid.uuid1()) would be evaluated only once and
        # give every call the same value.
        new_id = str(uuid.uuid1())
        if not isinstance(entry, dict):
            raise Exception('Entry is not a dict object')
        self._shelf[new_id] = entry
        return new_id
项目:StructEngPy    作者:zhuoju36    | 项目源码 | 文件源码
def __init__(self,mat:Material.material,A,J,I33,I22,W33,W22,name=None):
        """
        Store the section properties.

        mat: material
        A: area
        J: Torsional constant
        I33,I22: moments of inertia
        W33,W22: Bending modulus
        name: optional identifier; a new uuid1 is used when None
        """
        self.__mat=mat
        self.__A=A
        self.__J=J
        self.__I33=I33
        self.__I22=I22
        self.__W33=W33
        self.__W22=W22
        # Identity test: ``== None`` would call a custom __eq__ on the name.
        self.__name=uuid.uuid1() if name is None else name
项目:StructEngPy    作者:zhuoju36    | 项目源码 | 文件源码
def __init__(self,origin, pt1, pt2, name=None):
        """
        Build local axes from three points.

        x is the unit vector origin->pt1, z the unit normal of the plane
        through the three points (vec1 x vec2), and y is z x x.

        origin: 3x1 vector
        pt1: 3x1 vector
        pt2: 3x1 vector
        name: optional identifier; a new uuid1 is used when None

        Raises Exception when the points are collinear or coincide.
        """
        self.__origin=origin
        vec1 = np.array([pt1[0] - origin[0] , pt1[1] - origin[1] , pt1[2] - origin[2]])
        vec2 = np.array([pt2[0] - origin[0] , pt2[1] - origin[1] , pt2[2] - origin[2]])
        cos = np.dot(vec1, vec2)/np.linalg.norm(vec1)/np.linalg.norm(vec2)
        # An exact ``== 1`` test misses collinear inputs whose cosine rounds
        # to 0.9999999999999999 and lets NaN (zero-length vector) through;
        # this form rejects both.
        if not abs(cos) < 1.0 - 1e-12:
            raise Exception("Three points should not in a line!!")
        self.__x = vec1/np.linalg.norm(vec1)
        z = np.cross(vec1, vec2)
        self.__z = z/np.linalg.norm(z)
        self.__y = np.cross(self.z, self.x)
        self.__name=uuid.uuid1() if name is None else name
项目:StructEngPy    作者:zhuoju36    | 项目源码 | 文件源码
def __init__(self,x,y,z,name=None):
        """
        Create a node at (x, y, z).

        The local coordinate system is built from the node position and the
        two points offset by +1 in x and +1 in y. Restraints start all
        False, loads and displacements all zero (six entries each).

        name: optional identifier; a new uuid1 is used when None
        """
        self.__x=x
        self.__y=y
        self.__z=z
        o=[x,y,z]
        pt1=[x+1,y,z]
        pt2=[x,y+1,z]
        self.__local_csys=CoordinateSystem.cartisian(o,pt1,pt2)
        self.__restraint=[False]*6
        self.__load=[0]*6
        self.__disp=[0]*6
        # Identity test: ``== None`` would call a custom __eq__ on the name.
        self.__name=uuid.uuid1() if name is None else name
        self.__hid=None #hidden id

        #results
        self.__res_disp=None
        self.__res_force=None
项目:py-restfmclient    作者:pcdummy    | 项目源码 | 文件源码
def setUp(self):
        """Prepare a client in a non-local timezone plus a sample date and UUID."""
        super(TestCase, self).setUp()

        # Yakutsk is used so conversions against the local zone are real
        # conversions; fall back to Vienna when Yakutsk is the local zone.
        self.other_timezone = pytz.timezone('Asia/Yakutsk')
        if get_localzone() == self.other_timezone:
            self.other_timezone = pytz.timezone('Europe/Vienna')

        rest_client = Client(None, 'http://localhost').rest_client
        self.client = rest_client
        self.client.timezone = self.other_timezone

        # A fixed UTC moment, moved to Vienna time and then to the local zone.
        utc_moment = datetime.datetime(1986, 3, 6, 10, 28, 47, tzinfo=pytz.UTC)
        vienna_moment = utc_moment.astimezone(pytz.timezone('Europe/Vienna'))
        self.a_date = vienna_moment.astimezone(get_localzone())

        self.a_uuid = uuid.uuid1()
项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def _make_context_header(
        self,
        switches=None,
        correlation_id=None,
        context_extra=None,
    ):
        """Build the context dict for a request.

        :param switches: extra switches to merge in
        :param correlation_id: id to use; when None an id already in
            ``self.context`` is kept, else a new uuid1 hex is generated
        :param context_extra: extra entries to add; its ``switches`` entry,
            if any, is merged with the other switches. The dict passed in
            is not modified.
        :return: a new context dict
        """
        # Copy the underlying context object, if it was provided
        context = dict(self.context.items()) if self.context else {}
        # Either add on, reuse or generate a correlation ID
        if correlation_id is not None:
            context['correlation_id'] = correlation_id
        elif 'correlation_id' not in context:
            context['correlation_id'] = six.u(uuid.uuid1().hex)
        # Work on a copy: popping 'switches' straight from the argument used
        # to strip that key from the caller's dict.
        extra = dict(context_extra) if context_extra else {}
        # Switches can come from three different places, so merge them
        # and ensure that they are unique
        switches = set(switches or [])
        switches |= set(extra.pop('switches', []))
        context['switches'] = list(set(context.get('switches', [])) | switches)
        # Add any extra stuff
        context.update(extra)
        return context
项目:pyucwa    作者:tonybaloney    | 项目源码 | 文件源码
def oauth_post_request(uri, oauth_token, origin, msg):
    """POST ``msg`` as JSON to ``uri`` with bearer-token authentication.

    :param uri: target URL
    :param oauth_token: token placed in the ``Authorization`` header
    :param origin: value of the ``Origin`` header; also the ``Referer`` base
    :param msg: JSON-serialisable request body
    :return: decoded JSON response, or ``{}`` when the response body is empty
    :raises requests.HTTPError: on an error status (``raise_for_status``)
    """
    # Header values must be strings; a raw UUID object is not a valid value.
    uid = str(uuid.uuid1())
    headers = {
        'Authorization': 'Bearer %s' % oauth_token,
        'Origin': origin,
        'Client-Request-Id': 'WebSDK/%s' % uid,
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'X-MS-Correlation-Id': uid,
        'X-Ms-Namespace': 'internal',
        'X-Ms-SDK-Instance': USER_AGENT,
        'Referer': origin + '/'
    }
    # NOTE(review): verify=False turns off TLS certificate verification;
    # kept as-is for compatibility, confirm it is really required.
    response = requests.post(uri, data=json.dumps(msg), headers=headers, verify=False)
    response.raise_for_status()
    if response.text != '':
        return response.json()
    else:
        return {}
项目:pyucwa    作者:tonybaloney    | 项目源码 | 文件源码
def oauth_post_text_request(uri, oauth_token, origin, data):
    """POST ``data`` as plain text to ``uri`` with bearer-token authentication.

    :param uri: target URL
    :param oauth_token: token placed in the ``Authorization`` header
    :param origin: value of the ``Origin`` header; also the ``Referer`` base
    :param data: request body, sent with ``Content-Type: text/plain``
    :return: decoded JSON response, or ``{}`` when the response body is empty
    :raises requests.HTTPError: on an error status (``raise_for_status``)
    """
    # Header values must be strings; a raw UUID object is not a valid value.
    uid = str(uuid.uuid1())
    headers = {
        'Authorization': 'Bearer %s' % oauth_token,
        'Origin': origin,
        'Client-Request-Id': 'WebSDK/%s' % uid,
        'Accept': 'application/json',
        'Content-Type': 'text/plain',
        'X-MS-Correlation-Id': uid,
        'X-Ms-Namespace': 'internal',
        'X-Ms-SDK-Instance': USER_AGENT,
        'Referer': origin + '/'
    }
    # NOTE(review): verify=False turns off TLS certificate verification;
    # kept as-is for compatibility, confirm it is really required.
    response = requests.post(uri, data=data, headers=headers, verify=False)
    response.raise_for_status()
    if response.text != '':
        return response.json()
    else:
        return {}
项目:pyucwa    作者:tonybaloney    | 项目源码 | 文件源码
def oauth_stream_request(uri, oauth_token, origin):
    """GET ``uri`` as a streamed response with bearer-token authentication.

    :param uri: target URL
    :param oauth_token: token placed in the ``Authorization`` header
    :param origin: value of the ``Origin`` header; also the ``Referer`` base
    :return: the ``requests`` response object, opened with ``stream=True``
    :raises requests.HTTPError: on an error status (``raise_for_status``)
    """
    # Header values must be strings; a raw UUID object is not a valid value.
    uid = str(uuid.uuid1())
    headers = {
        'Authorization': 'Bearer %s' % oauth_token,
        'Origin': origin,
        'Client-Request-Id': 'WebSDK/%s' % uid,
        'Accept': 'application/json',
        'X-Requested-With': 'XMLHttpRequest',
        'X-MS-Correlation-Id': uid,
        'X-Ms-Namespace': 'internal',
        'X-Ms-SDK-Instance': USER_AGENT,
        'Referer': origin + '/'
    }
    # NOTE(review): verify=False turns off TLS certificate verification;
    # kept as-is for compatibility, confirm it is really required.
    response = requests.get(uri, headers=headers, stream=True, verify=False)
    response.raise_for_status()
    return response
项目:pyucwa    作者:tonybaloney    | 项目源码 | 文件源码
def oauth_request(uri, oauth_token, origin):
    """GET ``uri`` with bearer-token authentication and return the JSON reply.

    :param uri: target URL
    :param oauth_token: token placed in the ``Authorization`` header
    :param origin: value of the ``Origin`` header; also the ``Referer`` base
    :return: decoded JSON response
    :raises requests.HTTPError: on an error status (``raise_for_status``)
    """
    # Header values must be strings; a raw UUID object is not a valid value.
    uid = str(uuid.uuid1())
    headers = {
        'Authorization': 'Bearer %s' % oauth_token,
        'Origin': origin,
        'Client-Request-Id': 'WebSDK/%s' % uid,
        'Accept': 'application/json',
        'X-Requested-With': 'XMLHttpRequest',
        'X-MS-Correlation-Id': uid,
        'X-Ms-Namespace': 'internal',
        'X-Ms-SDK-Instance': USER_AGENT,
        'Referer': origin + '/'
    }
    # NOTE(review): verify=False turns off TLS certificate verification;
    # kept as-is for compatibility, confirm it is really required.
    response = requests.get(uri, headers=headers, verify=False)
    response.raise_for_status()
    return response.json()
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def __init__(self, api_version=1, request_id=None):
        """Start an empty request.

        :param api_version: stored as ``self.api_version``
        :param request_id: identifier to reuse; when falsy, a new uuid1
            string is generated
        """
        self.api_version = api_version
        self.request_id = request_id or str(uuid.uuid1())
        self.ops = []
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def add_package(self, owner, package, template, example):
        """Insert a contract row with a uuid1 id, unless it exists, and return ``check_package``'s result."""
        statement = "INSERT INTO public.contracts (id, owner, package, template, example) \
            VALUES (?, ?, ?, ?, ?) IF NOT EXISTS"
        values = (uuid.uuid1(), owner, package, template, example)
        self.prepare_execute_return(statement, values)
        return self.check_package(owner, package)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def __init__(self, api_version=1, request_id=None):
        """Start an empty request.

        :param api_version: stored as ``self.api_version``
        :param request_id: identifier to reuse; when falsy, a new uuid1
            string is generated
        """
        self.api_version = api_version
        self.request_id = request_id or str(uuid.uuid1())
        self.ops = []
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def __init__(self, api_version=1, request_id=None):
        """Start an empty request.

        :param api_version: stored as ``self.api_version``
        :param request_id: identifier to reuse; when falsy, a new uuid1
            string is generated
        """
        self.api_version = api_version
        self.request_id = request_id or str(uuid.uuid1())
        self.ops = []
项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def acquire(self, blocking=None, blocking_timeout=None):
        """
        Try to take the lock, retrying until it succeeds or gives up.
        Returns True once the lock is acquired.

        With ``blocking`` False a single attempt is made and its outcome
        (True or False) is returned immediately.

        ``blocking_timeout`` is the maximum number of seconds to keep
        trying; after that False is returned. Both arguments default to the
        instance's own ``blocking`` / ``blocking_timeout`` when None.
        """
        pause = self.sleep
        lock_token = b(uuid.uuid1().hex)
        if blocking is None:
            blocking = self.blocking
        if blocking_timeout is None:
            blocking_timeout = self.blocking_timeout

        if blocking_timeout is None:
            deadline = None
        else:
            deadline = mod_time.time() + blocking_timeout

        while True:
            if self.do_acquire(lock_token):
                # Remember the token that now owns the lock.
                self.local.token = lock_token
                return True
            if not blocking:
                return False
            if deadline is not None and mod_time.time() > deadline:
                return False
            mod_time.sleep(pause)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def uuid1(self):
        """Placeholder that always raises ``NotImplementedError``."""
        raise NotImplementedError()
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def uuid1(self):
        """Fill ``self.output`` through libuuid's ``uuid_generate_time`` and return the output bytes."""
        generate_time = _LibUUID._libuuid.uuid_generate_time
        generate_time(self.output)
        return self._get_output_bytes()
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def uuid1(self):
        """Return the 16 raw bytes of a new ``uuid.uuid1()``."""
        generated = uuid.uuid1()
        return generated.bytes
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def uuid1(self):
        """Generate a uuid1 (a device-specific uuid) with the generator in use.

        Returns:
            bytes: 16-byte uuid, as produced by ``self._uuid_in_use.uuid1()``
        """
        generator = self._uuid_in_use
        return generator.uuid1()
项目:pythonista-scripts    作者:lukaskollmer    | 项目源码 | 文件源码
def open_url(url, handler):
    """Open ``url`` in the browser with a callback marker and request id appended.

    The new request id (a uuid1) and ``handler`` are kept in the module
    globals ``_requestID`` and ``_handler``.
    """
    global _handler, _requestID
    _requestID = uuid.uuid1()
    _handler = handler
    target = ''.join((url, 'xcallbackresponse-', str(_requestID)))
    webbrowser.open(target)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_times_from_uuid1(self):
        """A fresh uuid1 must convert back to the current time, to two decimal places."""
        node = uuid.getnode()
        reference = time.time()
        generated = uuid.uuid1(node, 0)

        # Direct conversion to a unix timestamp.
        as_unix = util.unix_time_from_uuid1(generated)
        self.assertAlmostEqual(reference, as_unix, 2)

        # Conversion to a datetime, turned back into a timestamp here.
        as_datetime = util.datetime_from_uuid1(generated)
        whole_seconds = calendar.timegm(as_datetime.timetuple())
        from_datetime = whole_seconds + as_datetime.microsecond / 1e6
        self.assertAlmostEqual(reference, from_datetime, 2)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_model_over_write(self):
        """
        Check that a derived model may redefine a primary key of its base model

        Currently this is only a problem on PyPy. Once PYTHON-504 lands this
        test should be changed to expect an error and a warning to the user

        @since 3.6.0
        @jira_ticket PYTHON-576
        @expected_result primary keys can be overwritten via inheritance

        @test_category object_mapper
        """
        class TimeModelBase(Model):
            uuid = columns.TimeUUID(primary_key=True)

        class DerivedTimeModel(TimeModelBase):
            __table_name__ = 'derived_time'
            # Same column name as in the base class, redefined on purpose.
            uuid = columns.TimeUUID(primary_key=True, partition_key=True)
            value = columns.Text(required=False)

        # Start from a clean table in case it is left over in the keyspace.
        drop_table(DerivedTimeModel)
        sync_table(DerivedTimeModel)

        first_id = uuid1()
        second_id = uuid1()
        DerivedTimeModel.create(uuid=first_id, value="first")
        DerivedTimeModel.create(uuid=second_id, value="second")
        DerivedTimeModel.objects.filter(uuid=first_id)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_composite(self):
        """Insert and select tokens for two composite key layouts."""
        # (double, boolean)
        insert, select = self.create_prepare(('double', 'boolean'))
        for row in ((3.1459, True), (1.21e9, False)):
            self.insert_select_token(insert, select, row)

        # (timeuuid, varchar, int) with a fresh uuid1 per row
        insert, select = self.create_prepare(('timeuuid', 'varchar', 'int'))
        for text, number in (('asdf', 400), ('fdsa', -1)):
            self.insert_select_token(insert, select, (uuid1(), text, number))
项目:nukeuuid    作者:florianeinfalt    | 项目源码 | 文件源码
def set_uuid(nodes, **kwargs):
    r"""
    Given a list of ``nodes`` and a set of keyword arguments ``kwargs``,
    set UUID(s) on ``nodes``.

    Each keyword maps a UUID type to the UUID value to store. When both the
    type and the value are empty, a new uuid1 string is generated. The
    written attribute is disabled afterwards.

    :param nodes: Nodes; a single node is wrapped in a list
    :type nodes: list
    :param \**kwargs: UUID dictionary

    Usage:

    >>> import nukeuuid
    >>> kw = {'': 'fca7201e-b53d-4918-9ab0-bb4ec5590f3c',
              'utility': '5f2d525d-3e00-4bc5-88c4-794ad87f5699'}
    >>> nukeuuid.set_uuid(nuke.selectedNodes(), **kw)
    """
    if not isinstance(nodes, list):
        nodes = [nodes]
    for node in nodes:
        # items() works on Python 2 and 3; iteritems() is Python 2 only.
        for type_, uuid_ in kwargs.items():
            if not type_ and not uuid_:
                uuid_ = str(uuid.uuid1())
            attr = _convert_type(type_)
            _make_attr(node, attr)
            node[attr].setValue(uuid_)
            node[attr].setEnabled(False)
项目:serverless-pynamodb-rest-example    作者:helveticafire    | 项目源码 | 文件源码
def setUp(self, load_dbs=None):
        """Point TodoModel at a uniquely named table, create it if needed and load fixtures.

        :param load_dbs: optional iterable of files passed to ``TodoModel.load``
        """
        table_name = 'todo' + str(uuid.uuid1())
        environment_unset = 'ENVIRONMENT' not in os.environ
        TodoModel.setup_model(TodoModel, 'region', table_name, environment_unset)
        if not TodoModel.exists():
            TodoModel.create_table(wait=True)
        for db_file in load_dbs or ():
            TodoModel.load(db_file)
        super().setUp()
项目:serverless-pynamodb-rest-example    作者:helveticafire    | 项目源码 | 文件源码
def test_create(self):
        """Save a todo item and read it back by its id."""
        todo_text = 'text'
        todo_id = str(uuid.uuid1())
        t1 = TodoModel(todo_id=todo_id,
                       text=todo_text,
                       created_at=datetime.now())
        t1.save()
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(t1.text, todo_text)
        self.assertEqual(t1.checked, False)

        # get and check
        t2 = TodoModel.get(todo_id)
        self.assertEqual(t2.text, todo_text)
项目:serverless-pynamodb-rest-example    作者:helveticafire    | 项目源码 | 文件源码
def handle(event, context):
    """Create a todo item from the JSON in ``event['body']``.

    Returns the dict form of an error response when the table/region
    environment variables are missing, the body is not valid JSON, or the
    ``text`` property is absent or empty; otherwise the dict form of a
    "created" response holding the saved item.
    """
    try:
        table_name = os.environ[ENV_VAR_DYNAMODB_TABLE]
        region = os.environ[ENV_VAR_DYNAMODB_REGION]
    except KeyError as err:
        missing_var = HttpResponseServerError(
            error_code='ENV_VAR_NOT_SET',
            error_message='{0} is missing from environment variables'.format(str(err)))
        return missing_var.__dict__()

    environment_unset = ENV_VAR_ENVIRONMENT not in os.environ
    TodoModel.setup_model(TodoModel, region, table_name, environment_unset)

    try:
        data = json.loads(event['body'])
    except ValueError as err:
        bad_json = HttpResponseBadRequest(error_code='JSON_IRREGULAR',
                                          error_message=str(err))
        return bad_json.__dict__()

    # The body must carry a non-empty 'text' property.
    if 'text' not in data:
        logging.error('Validation Failed')
        missing_text = HttpResponseBadRequest(
            error_code='BODY_PROPERTY_MISSING',
            error_message='Could not create the todo item.')
        return missing_text.__dict__()

    if not data['text']:
        logging.error('Validation Failed - text was empty. %s', data)
        empty_text = HttpResponseBadRequest(
            error_code='VALIDATION_FAILED',
            error_message='Could not create the todo item. As text was empty.')
        return empty_text.__dict__()

    # Build the item with a fresh id and persist it.
    new_todo = TodoModel(todo_id=str(uuid.uuid1()),
                         text=data['text'],
                         checked=False,
                         created_at=datetime.now())
    new_todo.save()

    created = HttpCreatedJSONResponse(body=dict(new_todo))
    return created.__dict__()
项目:Good-Old-Kott    作者:mostafa    | 项目源码 | 文件源码
def set(self, data, **kwargs):
        """Reserve a new uuid1 hex key in ``__mem__`` and store ``data`` under it via ``__do_set__``."""
        new_key = uuid.uuid1().hex
        # Register the key with a None placeholder before the real store.
        self.__mem__[new_key] = None
        stored = self.__do_set__(new_key, data, **kwargs)
        return stored
项目:django-functest    作者:django-functest    | 项目源码 | 文件源码
def new_browser_session_test(request):
    """Render the session test page, greeting new and returning sessions differently.

    A session without ``UID`` gets a new uuid1 stored (as a string) under
    that key.
    """
    session = request.session
    if 'UID' not in session:
        uid = uuid.uuid1()
        session['UID'] = str(uid)
        message = "Hello new user"
    else:
        uid = session['UID']
        message = "Welcome back"

    template_context = {'uid': uid,
                        'message': message,
                        }
    return render(request, "django_functest/tests/new_browser_session_test.html",
                  template_context)
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def __init__(self, api_version=1, request_id=None):
        """Start an empty request.

        :param api_version: stored as ``self.api_version``
        :param request_id: identifier to reuse; when falsy, a new uuid1
            string is generated
        """
        self.api_version = api_version
        self.request_id = request_id or str(uuid.uuid1())
        self.ops = []
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def __init__(self, api_version=1, request_id=None):
        """Start an empty request.

        :param api_version: stored as ``self.api_version``
        :param request_id: identifier to reuse; when falsy, a new uuid1
            string is generated
        """
        self.api_version = api_version
        self.request_id = request_id or str(uuid.uuid1())
        self.ops = []