我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用enum.unique()。
def job_root(instance, filename=''): """ Return the path of `filename` stored at the root folder of his job `instance`. Parameters ---------- instance : AJob The model instance associated filename : str Original filename Returns -------- str Path to filename which is unique for a job """ return os.path.join(root_job(instance), filename)
def job_data(instance, filename=''): """ Return the path of `filename` stored in a subfolder of the root folder of his job `instance`. Parameters ---------- instance : AJob or ADataFile The model instance associated filename : str Original filename Returns -------- str Path to filename which is unique for a job """ head = root_job(instance.job) if isinstance(instance, ADataFile) else root_job(instance) tail = _user_path(instance.upload_to_data, filename) or os.path.join('data', filename) return os.path.join(head, tail)
def job_results(instance, filename=''): """ Return the path of `filename` stored in a subfolder of the root folder of his job `instance`. Parameters ---------- instance : AJob The model instance associated filename : str Original filename Returns -------- str Path to filename which is unique for a job """ tail = _user_path(instance.upload_to_results, filename) or os.path.join('results', filename) return os.path.join(root_job(instance), tail)
def ensure_index(cls): collection = cls.collection() collection.create_index( [ ("execution_id", generic.SORT_ASC), ("task_type", generic.SORT_ASC) ], name="index_execution_id", unique=True ) collection.create_index( [ ("time.created", generic.SORT_ASC), ("time.started", generic.SORT_ASC), ("time.completed", generic.SORT_ASC), ("time.cancelled", generic.SORT_ASC), ("time.failed", generic.SORT_ASC) ], name="index_time" ) collection.create_index( TTL_FIELDNAME, expireAfterSeconds=0, name="index_task_ttl" )
def get_statuses_by_dir(path, file_statuses): """Sort the file statuses into which directory at the current level they are under. Only keep unique statuses, and return a dictionary of sets for each directory rooted at the given path.""" statuses_by_dir = defaultdict(set) prefix = path + '/' len_prefix = len(prefix) for name, status in file_statuses.items(): if not name.startswith(prefix): continue dirname = name[:name.find('/', len_prefix)] statuses_by_dir[dirname].add(status) return statuses_by_dir # import bprofile # @bprofile.BProfile('test.png')
def func_11(): Months = Enum('Moths', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec') ) for name, member in Months.__members__.items(): print(name, '=>', member, ',', member.value) # value??????????int??????1?? # ?????????????????Enum????????? # unique??????????????????
def create_linter( cls: t.Type['AssignmentLinter'], assignment_id: int, name: str, config: str, ) -> 'AssignmentLinter': """Create a new instance of this class for a given :class:`Assignment` with a given :py:class:`.linters.Linter` :param assignment_id: The id of the assignment :param name: Name of the linter :returns: The created AssignmentLinter """ id = str(uuid.uuid4()) # Find a unique id. while db.session.query( AssignmentLinter.query.filter(cls.id == id).exists() ).scalar(): # pragma: no cover id = str(uuid.uuid4()) self = cls(id=id, assignment_id=assignment_id, name=name) self.config = config self.tests = [] for work in Assignment.query.get(assignment_id ).get_all_latest_submissions(): self.tests.append(LinterInstance(work, self)) return self
def __init__(self, work: Work, tester: AssignmentLinter) -> None: # Find a unique id id = str(uuid.uuid4()) while db.session.query( LinterInstance.query.filter(LinterInstance.id == id).exists() ).scalar(): # pragma: no cover id = str(uuid.uuid4()) self.id = id self.work = work self.tester = tester
def __init__(self, key, delta): """Initialize Digest Frequency. :param key: unique identifier, used in database :param delta: timedelta covered by the frequency, or None (e.g., 7 days) """ self.key = key self.delta = delta
def __init__(self, name, type_, class_): self.key = name.lower() self.name = name self.type = type_ self.class_ = class_ & _CLASS_MASK self.unique = (class_ & _CLASS_UNIQUE) != 0
def to_string(self, hdr, other): """String representation with additional information""" result = "%s[%s,%s" % (hdr, self.get_type(self.type), self.get_class_(self.class_)) if self.unique: result += "-unique," else: result += "," result += self.name if other is not None: result += ",%s]" % other else: result += "]" return result
def write_record(self, record, now): """Writes a record (answer, authoritative answer, additional) to the packet""" if self.state == self.State.finished: return 1 start_data_length, start_size = len(self.data), self.size self.write_name(record.name) self.write_short(record.type) if record.unique and self.multicast: self.write_short(record.class_ | _CLASS_UNIQUE) else: self.write_short(record.class_) if now == 0: self.write_int(record.ttl) else: self.write_int(record.get_remaining_ttl(now)) index = len(self.data) # Adjust size for the short we will write before this record self.size += 2 record.write(self) self.size -= 2 length = sum((len(d) for d in self.data[index:])) # Here is the short we adjusted for self.insert_short(index, length) # if we go over, then rollback and quit if self.size > _MAX_MSG_ABSOLUTE: while len(self.data) > start_data_length: self.data.pop() self.size = start_size self.state = self.State.finished return 1 return 0
def register_service(self, info, ttl=_DNS_TTL, allow_name_change=False): """Registers service information to the network with a default TTL of 60 seconds. Zeroconf will then respond to requests for information for that service. The name of the service may be changed if needed to make it unique on the network.""" self.check_service(info, allow_name_change) self.services[info.name.lower()] = info if info.type in self.servicetypes: self.servicetypes[info.type] += 1 else: self.servicetypes[info.type] = 1 now = current_time_millis() next_time = now i = 0 while i < 3: if now < next_time: self.wait(next_time - now) now = current_time_millis() continue out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA) out.add_answer_at_time( DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, ttl, info.name), 0) out.add_answer_at_time( DNSService(info.name, _TYPE_SRV, _CLASS_IN, ttl, info.priority, info.weight, info.port, info.server), 0) out.add_answer_at_time( DNSText(info.name, _TYPE_TXT, _CLASS_IN, ttl, info.text), 0) if info.address: out.add_answer_at_time( DNSAddress(info.server, _TYPE_A, _CLASS_IN, ttl, info.address), 0) self.send(out) i += 1 next_time += _REGISTER_TIME
def const_key(obj): # Python implmentation of the C function _PyCode_ConstantKey() # of Python 3.6 obj_type = type(obj) # Note: check obj_type == test_type rather than isinstance(obj, test_type) # to not merge instance of subtypes if (obj is None or obj is Ellipsis or obj_type in {int, bool, bytes, str, types.CodeType}): return (obj_type, obj) if obj_type == float: # all we need is to make the tuple different in either the 0.0 # or -0.0 case from all others, just to avoid the "coercion". if obj == 0.0 and math.copysign(1.0, obj) < 0: return (obj_type, obj, None) else: return (obj_type, obj) if obj_type == complex: # For the complex case we must make complex(x, 0.) # different from complex(x, -0.) and complex(0., y) # different from complex(-0., y), for any x and y. # All four complex zeros must be distinguished. real_negzero = (obj.real == 0.0 and math.copysign(1.0, obj.real) < 0.0) imag_negzero = (obj.imag == 0.0 and math.copysign(1.0, obj.imag) < 0.0) # use True, False and None singleton as tags for the real and imag # sign, to make tuples different if real_negzero and imag_negzero: return (obj_type, obj, True) elif imag_negzero: return (obj_type, obj, False) elif real_negzero: return (obj_type, obj, None) else: return (obj_type, obj) if type(obj) == tuple: key = tuple(const_key(item) for item in obj) return (obj_type, obj, key) if type(obj) == frozenset: key = frozenset(const_key(item) for item in obj) return (obj_type, obj, key) # for other types, use the object identifier as an unique identifier # to ensure that they are seen as unequal. return (obj_type, obj, id(obj))
def check_service(self, info, allow_name_change): """Checks the network for a unique service name, modifying the ServiceInfo passed in if it is not unique.""" # This is kind of funky because of the subtype based tests # need to make subtypes a first class citizen service_name = service_type_name(info.name) if not info.type.endswith(service_name): raise BadTypeInNameException instance_name = info.name[:-len(service_name) - 1] next_instance_number = 2 now = current_time_millis() next_time = now i = 0 while i < 3: # check for a name conflict while self.cache.current_entry_with_name_and_alias( info.type, info.name): if not allow_name_change: raise NonUniqueNameException # change the name and look for a conflict info.name = '%s-%s.%s' % ( instance_name, next_instance_number, info.type) next_instance_number += 1 service_type_name(info.name) next_time = now i = 0 if now < next_time: self.wait(next_time - now) now = current_time_millis() continue out = DNSOutgoing(_FLAGS_QR_QUERY | _FLAGS_AA) self.debug = out out.add_question(DNSQuestion(info.type, _TYPE_PTR, _CLASS_IN)) out.add_authorative_answer(DNSPointer( info.type, _TYPE_PTR, _CLASS_IN, _DNS_TTL, info.name)) self.send(out) i += 1 next_time += _CHECK_TIME