我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用sys.intern()。
def load_build(self):
    """Apply the state on top of the stack to the instance beneath it.

    The state object is popped from ``self.stack`` and applied to the
    object that is then on top of the stack:

    * If the instance exposes ``__setstate__``, the state is handed to it
      unchanged and nothing else happens.
    * Otherwise a 2-tuple state is split into ``(state, slotstate)``.
      ``state`` entries are written directly into the instance
      ``__dict__`` and ``slotstate`` entries are applied with ``setattr``.
    """
    stack = self.stack
    state = stack.pop()
    inst = stack[-1]
    setstate = getattr(inst, "__setstate__", None)
    # Compare against None instead of relying on truthiness: a callable
    # hook that happens to be falsy (defines __bool__ or __len__) must
    # still be honoured.
    if setstate is not None:
        setstate(state)
        return
    slotstate = None
    if isinstance(state, tuple) and len(state) == 2:
        state, slotstate = state
    if state:
        inst_dict = inst.__dict__
        intern = sys.intern
        for k, v in state.items():
            # Only exact ``str`` keys are interned; anything else is
            # stored as-is.
            if type(k) is str:
                inst_dict[intern(k)] = v
            else:
                inst_dict[k] = v
    if slotstate:
        for k, v in slotstate.items():
            setattr(inst, k, v)
def test_intern(self):
    """Exercise sys.intern(): argument checking, identity of the returned
    object for equal strings, and rejection of str subclasses."""
    global numruns
    numruns += 1
    self.assertRaises(TypeError, sys.intern)

    # The run counter is appended to build the string under test.
    fresh = "never interned before" + str(numruns)
    first_result = sys.intern(fresh)
    self.assertTrue(first_result is fresh)

    # An equal string built separately must map to the same object.
    rebuilt = fresh.swapcase().swapcase()
    second_result = sys.intern(rebuilt)
    self.assertTrue(second_result is fresh)

    # Subclasses of string can't be interned, because they
    # provide too much opportunity for insane things to happen.
    # We don't want them in the interned dict and if they aren't
    # actually interned, we don't want to create the appearance
    # that they are by allowing intern() to succeed.
    class S(str):
        def __hash__(self):
            return 123

    subclass_instance = S("abc")
    self.assertRaises(TypeError, sys.intern, subclass_instance)
def doUnsubscribe(securities):
    """Unsubscribe the given securities from the market-data session.

    Each security is removed from ``app.allSubscriptions`` (when present)
    and added to a subscription list that is passed to
    ``app.sessionForSubscriptions.unsubscribe``. If the service reports a
    restarted session, ``app.allSubscriptions`` is reset first.

    Returns a 202 JSON response with a CORS header on success, or the
    result of ``respond500`` when any exception is raised.
    """
    try:
        _, restarted = openBloombergService(app.sessionForSubscriptions, "//blp/mktdata")
        if restarted:
            app.allSubscriptions = {}

        pending = blpapi.SubscriptionList()
        for name in securities:
            # The interned security name doubles as the correlation id.
            corr_id = blpapi.CorrelationId(sys.intern(name))
            if name in app.allSubscriptions:
                del app.allSubscriptions[name]
            pending.add(name, correlationId=corr_id)

        app.sessionForSubscriptions.unsubscribe(pending)
    except Exception as e:
        handleBrokenSession(app, e)
        traceback.print_exc()
        return respond500(e)

    payload = json.dumps({"message": "OK"}).encode()
    response = Response(payload, status=202, mimetype='application/json')
    origin = request.headers.get('Origin')
    response.headers['Access-Control-Allow-Origin'] = allowCORS(origin)
    return response
def silent_intern(x):
    """
    Return ``sys.intern(x)``, falling back to ``x`` itself.

    When ``sys.intern()`` rejects the argument with a ``TypeError`` the
    original object is returned unchanged and no exception escapes.
    """
    try:
        result = sys.intern(x)
    except TypeError:
        result = x
    return result


# From Dinu C. Gherman,
# Python Cookbook, second edition, recipe 6.17, p. 277.
# Also:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/68205
# ASPN: Python Cookbook: Null Object Design Pattern

#TODO??? class Null(object):
def load_build(self):
    """Pop a state object and apply it to the instance left on the stack.

    If the instance has a ``__setstate__`` attribute (not ``None``), the
    state is delegated to it. Otherwise a 2-tuple is unpacked into
    ``(state, slotstate)``; ``state`` items are stored in the instance
    ``__dict__`` and ``slotstate`` items are set with ``setattr``.
    """
    pending = self.stack
    payload = pending.pop()
    target = pending[-1]

    restore = getattr(target, "__setstate__", None)
    if restore is not None:
        restore(payload)
        return

    slot_values = None
    if isinstance(payload, tuple) and len(payload) == 2:
        payload, slot_values = payload

    if payload:
        attrs = target.__dict__
        for name, value in payload.items():
            # Exact str keys are interned; any other key is used as-is.
            attrs[sys.intern(name) if type(name) is str else name] = value

    if slot_values:
        for name, value in slot_values.items():
            setattr(target, name, value)
def load(_, fd):
    ''' Read from file object to enable reading from arbitrary data source and position.

        Lines are stripped and parsed one by one. A line starting with '['
        opens a new section (the text between the first and last character
        is the title) after storing the previous non-empty section in
        _.sections. Any other non-empty line is split at its first '=' into
        key and value and stored according to the key constants below. The
        first empty line ends parsing.

        returns global config: a dict built from the GLOBAL entries of the
        section titled "", with lower-cased keys; the values "true"/"false"
        (case-insensitive) become booleans, other values stay strings
    '''
    title = None
    section = dd()  # this dict initializes any missing value with an empty list TODO make ordered default dict, but couldn't find any compatible solution
    for line in xreadlines(fd):
        line = line.strip()  # just in case of incorrect formatting
        if line.startswith('['):  # new section detected: first store last section
            if len(section) > 0:
                _.sections[title] = section  # OLD: and title is not None
            title = line[1:-1]
            section = dd()  # default-dictionary of (standard) type map
        elif line != '':
            try:
                idx = line.index('=')  # try to parse; raises ValueError when there is no '='
                key, value = line[:idx], line[idx+1:]  # HINT: was .lower() and .rstrip(), but when reading/writing only via this script's API there's no need for that
                if key in [TAG, FROM] and value != '':
                    section[intern(key)].append(intern(value))  # this dict allows several values per key
                elif key in [IGNORE, SKIP] and key not in section:
                    section[intern(key)] = None  # only keep key instead of default-appending empty string
                elif title == "" and key in [SKIPD, IGNORED, GLOBAL] and value != '':
                    section[intern(key)].append(intern(value))  # global dir skip or ignore pattern, or global config setting
                else:
                    warn("Encountered illegal key <%s>. Skipping entry." % key)
            except Exception:
                # Best-effort parsing: a malformed line is reported and
                # skipped. Catching Exception (not a bare except) lets
                # KeyboardInterrupt and SystemExit propagate.
                warn("Key with no value for illegal key %s" % repr(line))
        else:
            break  # an empty line terminates file
    if len(section) > 0:
        _.sections[title] = section  # last store OLD: and title is not None
    # NOTE(review): a GLOBAL entry without '=' makes split() return a single
    # element; whether wrapExc guards the unpacking below depends on its
    # implementation - confirm.
    return {
        k.lower(): v if v.lower() not in ("true", "false") else v.lower().strip() == "true"
        for k, v in (
            wrapExc(lambda: kv.split("=")[:2], lambda: (kv, None))
            for kv in _.sections.get("", {}).get(GLOBAL, [])
        )
    }  # return global config map for convenience
def post_message(user: User, text: str, timestamp: Optional[Timestamp]=None) -> None:
    """Record a new post by ``user``.

    The user name is interned, a ``Post(timestamp, user, text)`` is built
    and pushed onto the left end of both the global ``posts`` collection
    and the user's own entry in ``user_posts``.

    Args:
        user: Name of the posting user.
        text: Body of the post.
        timestamp: Time of the post; ``time()`` is used when omitted.
    """
    user = intern(user)
    # Test for None explicitly so that a legitimate falsy timestamp
    # (e.g. 0) is kept instead of being replaced by the current time.
    if timestamp is None:
        timestamp = time()
    post = Post(timestamp, user, text)
    posts.appendleft(post)
    user_posts[user].appendleft(post)
def follow(user: User, followed_user: User) -> None:
    """Register that ``user`` follows ``followed_user``.

    Both names are interned, then the relationship is recorded in both
    directions: in ``following`` for the follower and in ``followers``
    for the followed user.
    """
    follower = intern(user)
    followee = intern(followed_user)
    following[follower].add(followee)
    followers[followee].add(follower)
def set_user(user: User, displayname: str, email: str, password: str, bio: Optional[str]=None, photo: Optional[str]=None) -> None:
    """Create or replace the ``user_info`` record for ``user``.

    The user name is interned and used as the key. The password is passed
    through ``hash_password`` and only the result is stored in the
    ``UserInfo`` record.
    """
    key = intern(user)
    record = UserInfo(displayname, email, hash_password(password), bio, photo)
    user_info[key] = record
def __init__(self, opt, data_loader=None, cands=None, shared=None, **kwargs):
    """Set up the dialog data, either from ``shared`` state or from disk.

    Args:
        opt: Options mapping; ``opt['datafile']`` is read when loading.
        data_loader: Passed to ``self._load`` when not using shared data.
        cands: Optional iterable of candidate strings; stored as a set of
            interned strings (ignored when ``shared`` is given).
        shared: Optional mapping with ``'image_loader'``, ``'data'`` and
            ``'cands'`` entries to reuse instead of loading.
        **kwargs: Accepted and ignored here.
    """
    # self.data is a list of episodes
    # each episode is a tuple of entries
    # each entry is a tuple of values for the action/observation table
    if shared:
        self.image_loader = shared.get('image_loader', None)
        self.data = shared.get('data', [])
        self.cands = shared.get('cands', None)
    else:
        self.image_loader = ImageLoader(opt)
        self.data = []
        self._load(data_loader, opt['datafile'])
        # Identity test: ``== None`` would invoke a custom __eq__ on
        # whatever candidate container the caller supplies.
        self.cands = None if cands is None else {sys.intern(c) for c in cands}
    self.addedCands = []
    self.copied_cands = False
def __setitem__(self, key, value):
    """Store ``value`` under ``key``.

    Values whose type name contains ``'Tensor'`` are kept in
    ``self.tensors``. Every other value must have a type listed in
    ``self.types`` and ``key`` must already be present in ``self.idx``;
    the value then overwrites the slot in ``self.arrays`` for that type.
    Strings are interned before being stored.

    Raises:
        TypeError: if the value type is unsupported, or differs from the
            type already stored for ``key`` (delete the key first to
            change its type).
        KeyError: if ``key`` is not already in the table.
    """
    kind = type(value)
    if 'Tensor' in str(kind):
        self.tensors[key] = value
        return

    if kind not in self.types:
        raise TypeError('SharedTable does not support type ' + str(type(value)))

    if kind == str:
        value = sys.intern(value)

    if key not in self.idx:
        raise KeyError('Cannot add more keys to the shared table as '
                       'they will not be synced across processes.')

    slot, stored_kind = self.idx[key]
    if stored_kind != kind:
        template = ('Cannot change stored type for {key} from '
                    '{v1} to {v2}. You need to del the key first'
                    ' if you need to change value types.')
        raise TypeError(template.format(key=key, v1=stored_kind, v2=kind))
    self.arrays[stored_kind][slot] = value
def __init__(self: 'Property', name: Optional[str], value: _Prop_Value):
    """Create a new property instance.

    A non-``None`` name is stored interned, together with an interned
    case-folded copy; a ``None`` name leaves both name attributes
    ``None``. ``value`` is stored unchanged.
    """
    if name is not None:
        self.real_name = sys.intern(name)  # type: Optional[str]
        self._folded_name = sys.intern(name.casefold())  # type: Optional[str]
    else:
        self._folded_name = None  # type: Optional[str]
        self.real_name = None
    self.value = value  # type: _Prop_Value
def name(self, new_name):
    """Set the stored name and its case-folded counterpart.

    ``None`` clears both ``real_name`` and ``_folded_name``; any other
    value is stored interned, along with an interned ``casefold()`` copy.
    """
    if new_name is not None:
        # Intern names to help reduce duplicates in memory.
        self.real_name = sys.intern(new_name)
        self._folded_name = sys.intern(new_name.casefold())
        return
    self._folded_name = None
    self.real_name = None
def parse_parts(self, parts):
    """Split a sequence of path fragments into ``(drive, root, parsed)``.

    Fragments are scanned from last to first. Each one has ``altsep``
    replaced by ``sep`` and is split with ``self.splitroot``; its
    non-empty, non-``'.'`` components are interned and collected. The
    scan stops at the first fragment that carries a drive or a root.
    ``parsed`` is returned in path order, starting with ``drive + root``
    when either is present.
    """
    if six.PY2:
        parts = _py2_fsencode(parts)
    sep = self.sep
    altsep = self.altsep
    collected = []
    drv = root = ''
    remaining = reversed(parts)
    for fragment in remaining:
        if not fragment:
            continue
        if altsep:
            fragment = fragment.replace(altsep, sep)
        drv, root, rel = self.splitroot(fragment)
        # Splitting a string that does not contain ``sep`` yields the
        # string itself, so one pass covers both the single- and the
        # multi-component case.
        collected.extend(
            intern(piece)
            for piece in reversed(rel.split(sep))
            if piece and piece != '.'
        )
        if not (drv or root):
            continue
        if not drv:
            # If no drive is present, try to find one in the previous
            # parts. This makes the result of parsing e.g.
            # ("C:", "/", "a") reasonably intuitive.
            for fragment in remaining:
                if not fragment:
                    continue
                if altsep:
                    fragment = fragment.replace(altsep, sep)
                drv = self.splitroot(fragment)[0]
                if drv:
                    break
        break
    if drv or root:
        collected.append(drv + root)
    collected.reverse()
    return drv, root, collected
def __init__(self, kind, kids=[]):
    """Create a node of the given ``kind`` whose list content is ``kids``.

    ``kind`` is stored interned; ``kids`` is only forwarded to
    ``UserList.__init__`` and never modified here.
    """
    interned_kind = intern(kind)
    self.kind = interned_kind
    UserList.__init__(self, kids)
def internSet(self, items):
    """Return a shared ``frozenset`` of the interned ``items``.

    Every item is passed through ``sys.intern``. The resulting frozenset
    is looked up in ``self.setcache`` (keyed by the set's hash) so that
    equal sets are represented by a single object.

    Args:
        items: Iterable of strings.

    Returns:
        The cached frozenset equal to the interned items, or the newly
        built one (which is then cached).
    """
    s = frozenset(sys.intern(i) for i in items)
    h = hash(s)
    cached = self.setcache.get(h)
    # The cache is keyed by hash alone, so two different sets can share a
    # key. Only reuse the cached entry when it really is the same set.
    if cached is not None and cached == s:
        return cached
    self.setcache[h] = s
    return s
def test_prefix_preservation(self):
    # Runs three (b, a) pairs through self.check. In every pair ``a`` is
    # ``b`` with "import sys\n" prepended and ``intern`` replaced by
    # ``sys.intern``; presumably ``b`` is the input and ``a`` the
    # expected output of the fixer under test - confirm against check().
    # NOTE(review): these literals are whitespace-sensitive and look as if
    # runs of spaces/newlines were collapsed during extraction (e.g. the
    # "# test )" case) - verify against the upstream test before relying
    # on them.
    b = """x = intern( a )"""
    a = """import sys\nx = sys.intern( a )"""
    self.check(b, a)
    b = """y = intern("b" # test )"""
    a = """import sys\ny = sys.intern("b" # test )"""
    self.check(b, a)
    b = """z = intern(a+b+c.d, )"""
    a = """import sys\nz = sys.intern(a+b+c.d, )"""
    self.check(b, a)
def test(self):
    """Pass each (before, after) snippet pair to ``self.check``.

    In every pair the second snippet is the first one with
    ``import sys`` prepended and ``intern`` replaced by ``sys.intern``.
    """
    cases = (
        ("""x = intern(a)""",
         """import sys\nx = sys.intern(a)"""),
        ("""z = intern(a+b+c.d,)""",
         """import sys\nz = sys.intern(a+b+c.d,)"""),
        ("""intern("y%s" % 5).replace("y", "")""",
         """import sys\nsys.intern("y%s" % 5).replace("y", "")"""),
    )
    for before, after in cases:
        self.check(before, after)

# These should not be refactored
def test_unchanged(self):
    """Pass each ``intern`` call form below to ``self.unchanged``."""
    snippets = (
        """intern(a=1)""",
        """intern(f, g)""",
        """intern(*h)""",
        """intern(**i)""",
        """intern()""",
    )
    for snippet in snippets:
        self.unchanged(snippet)
def parse_parts(self, parts):
    """Split a sequence of path fragments into ``(drive, root, parsed)``.

    Fragments are scanned from last to first. Empty fragments are
    skipped; each remaining one has ``altsep`` replaced by ``sep`` and is
    split with ``self.splitroot``. Its non-empty, non-``'.'`` components
    are interned and collected. Scanning stops at the first fragment that
    carries a drive or a root.

    Returns:
        A tuple ``(drv, root, parsed)`` where ``parsed`` is the list of
        components in path order, starting with ``drv + root`` when
        either is present.
    """
    if _py2:
        parts = _py2_fsencode(parts)
    parsed = []
    sep = self.sep
    altsep = self.altsep
    drv = root = ''
    it = reversed(parts)
    for part in it:
        if not part:
            continue
        if altsep:
            part = part.replace(altsep, sep)
        drv, root, rel = self.splitroot(part)
        if sep in rel:
            for x in reversed(rel.split(sep)):
                if x and x != '.':
                    parsed.append(intern(x))
        else:
            if rel and rel != '.':
                parsed.append(intern(rel))
        if drv or root:
            if not drv:
                # If no drive is present, try to find one in the previous
                # parts. This makes the result of parsing e.g.
                # ("C:", "/", "a") reasonably intuitive.
                for part in it:
                    # Treat earlier parts exactly like the main loop does:
                    # skip empty ones and normalise the alternate
                    # separator, otherwise a drive written with altsep
                    # (e.g. "//server/share") is not recognised.
                    if not part:
                        continue
                    if altsep:
                        part = part.replace(altsep, sep)
                    drv = self.splitroot(part)[0]
                    if drv:
                        break
            break
    if drv or root:
        parsed.append(drv + root)
    parsed.reverse()
    return drv, root, parsed
def test_sys_intern(self):
    """
    Py2's builtin intern() has been moved to the sys module. Checks that
    ``intern`` can be imported from ``sys`` and returns a value equal to
    its argument.
    """
    from sys import intern
    if utils.PY3:
        sample = 'hello'
    else:
        # intern() requires byte-strings on Py2:
        sample = b'hello'
    self.assertEqual(intern(sample), sample)
def test_install_aliases(self):
    """
    Check that the imports below succeed after remove_hooks() followed by
    install_aliases(), and that urllib.request lists ``urlopen``.
    """
    from future.standard_library import remove_hooks, install_aliases
    remove_hooks()
    install_aliases()

    # Counter and OrderedDict are backported to Py2.6
    from collections import Counter, OrderedDict, UserDict, UserList, UserString

    # Requires Python dbm support:
    # import dbm
    # import dbm.dumb
    # import dbm.gnu
    # import dbm.ndbm

    from itertools import filterfalse, zip_longest

    # check_output is backported to Py2.6
    from subprocess import check_output, getoutput, getstatusoutput

    from sys import intern

    # test_support may not be available (e.g. on Anaconda Py2.6):
    # import test.support

    import urllib.error
    import urllib.parse
    import urllib.request
    import urllib.response
    import urllib.robotparser

    exported = dir(urllib.request)
    self.assertTrue('urlopen' in exported)
def _validate_event(self, event):
    """Raise ``ExtensionError`` unless ``event`` is a known event name.

    The name is interned and looked up in ``self._events``; nothing is
    returned when the name is known.
    """
    name = intern(event)
    if name in self._events:
        return
    raise ExtensionError('Unknown event name: %s' % name)
def __init__(self, splitter: DocumentSplitter, para_filter: Optional[ParagraphFilter], text_preprocess: Optional[TextPreprocessor], intern, require_answer=True):
    """Store the configuration objects on the instance unchanged.

    Args:
        splitter: Document splitter to use.
        para_filter: Optional paragraph filter.
        text_preprocess: Optional text preprocessor.
        intern: Stored as ``self.intern``; presumably a flag that enables
            string interning - confirm against the callers.
        require_answer: Stored as ``self.require_answer`` (default True).
    """
    self.require_answer = require_answer
    self.intern = intern
    self.text_preprocess = text_preprocess
    self.para_filter = para_filter
    self.splitter = splitter
def finalize_chunk(self, x: FilteredData):
    """Intern the strings of every element of ``x.data`` in place.

    Does nothing unless ``self.intern`` is truthy. Question ids, document
    ids and context words are interned. The question of each element is
    replaced by a tuple of interned words, and elements with the same
    question id share one tuple (the first one seen in this chunk).
    """
    if not self.intern:
        return
    shared_questions = {}
    for point in x.data:
        point.question_id = sys.intern(point.question_id)
        if point.question_id in shared_questions:
            point.question = shared_questions[point.question_id]
        else:
            words = tuple(sys.intern(w) for w in point.question)
            point.question = words
            shared_questions[point.question_id] = point.question
        point.doc_id = sys.intern(point.doc_id)
        point.context = [sys.intern(w) for w in point.context]
def intern_mutli_question(questions):
    """Intern, in place, the strings held by each question.

    For every element the ``question`` list is rebuilt from interned
    strings, and for each of its ``paragraphs`` the ``doc_id`` and the
    ``text`` list are interned the same way.
    """
    for item in questions:
        item.question = list(map(sys.intern, item.question))
        for paragraph in item.paragraphs:
            paragraph.doc_id = sys.intern(paragraph.doc_id)
            paragraph.text = list(map(sys.intern, paragraph.text))
def __init__(self, splitter: DocumentSplitter, ranker: Optional[ParagraphFilter], text_process: Optional[TextPreprocessor], intern: bool=False, require_an_answer=True):
    """Store the configuration objects on the instance unchanged.

    Args:
        splitter: Document splitter to use.
        ranker: Optional paragraph filter.
        text_process: Optional text preprocessor.
        intern: Boolean flag stored as ``self.intern`` (default False).
        require_an_answer: Stored as ``self.require_an_answer``
            (default True).
    """
    self.splitter = splitter
    self.ranker = ranker
    self.text_process = text_process
    self.require_an_answer = require_an_answer
    self.intern = intern
def finalize_chunk(self, q: FilteredData):
    """Run ``intern_mutli_question`` on ``q.data`` when ``self.intern`` is set."""
    if not self.intern:
        return
    intern_mutli_question(q.data)
def __init__(self, name, directory, fs):
    """Initialize a generic Node.FS.Base object.

    Runs the ``SCons.Node.Node`` initializer, stores the (interned) file
    name, the owning ``fs`` and the parent ``directory``, clears the
    cached path attributes and sets the ``_func_*`` selectors and
    related flags to their initial values.

    ``directory`` must be truthy; this is enforced with an assertion.
    """
    if SCons.Debug.track_instances:
        logInstanceCreation(self, 'Node.FS.Base')
    SCons.Node.Node.__init__(self)

    # Filenames and paths are probably reused and are intern'ed to
    # save some memory.

    #: Filename with extension as it was specified when the object was
    #: created; to obtain filesystem path, use Python str() function
    self.name = SCons.Util.silent_intern(name)
    #: Reference to parent Node.FS object
    self.fs = fs

    assert directory, "A directory must be provided"

    # Cached path values start out unset.
    self._abspath = self._labspath = self._path = self._tpath = None
    self._path_elements = None

    self.dir = directory
    self.cwd = None  # will hold the SConscript directory for target nodes
    self.duplicate = directory.duplicate

    self.changed_since_last_build = 2
    self._func_sconsign = 0
    self._func_exists = self._func_rexists = 2
    self._func_get_contents = 0
    self._func_target_from_source = 1
    self.store_info = 1
def _save_str(self):
    """Return the interned result of ``self._get_str()``, memoized.

    The value is computed once and kept in ``self._memo`` under the key
    ``'_save_str'``; later calls return the stored value.
    """
    memo = self._memo
    if '_save_str' in memo:
        return memo['_save_str']
    text = sys.intern(self._get_str())
    memo['_save_str'] = text
    return text
def load_interned(self):
    """Read a length-prefixed string, intern it and remember it.

    The length comes from ``self.r_long()`` and the data from
    ``self._read(length)``. The interned result is appended to
    ``self._stringtable`` and returned.
    """
    length = self.r_long()
    text = intern(self._read(length))
    self._stringtable.append(text)
    return text
def load_interned(self):
    """Read a length-prefixed string, intern it and remember it.

    The length comes from ``_r_long(self)`` and the data from
    ``_read(self, length)``. The interned result is appended to
    ``self._stringtable`` and returned.
    """
    length = _r_long(self)
    text = intern(_read(self, length))
    self._stringtable.append(text)
    return text