我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 email.message.get_sequences()。
def pack(self):
    """Renumber messages so their keys run 1..N with no gaps. Invalidates keys.

    Each message whose key is not the next expected number is moved on disk
    to that number (hard-link plus unlink where ``os.link`` exists, otherwise
    a rename). ``self._next_key`` is set to one past the last message. If
    anything was renumbered, the old keys are replaced by the new ones in
    every sequence and the result is handed to ``set_sequences()``.
    """
    sequences = self.get_sequences()
    renumbered = []  # (old_key, new_key) pairs, in iteration order
    expected = 1
    for key in self.iterkeys():
        if key != expected:
            renumbered.append((key, expected))
            source = os.path.join(self._path, str(key))
            target = os.path.join(self._path, str(expected))
            if hasattr(os, 'link'):
                os.link(source, target)
                os.unlink(source)
            else:
                os.rename(source, target)
        expected += 1
    self._next_key = expected
    if not renumbered:
        # Nothing moved, so the sequences file is left untouched.
        return
    for key_list in sequences.values():
        for old, new in renumbered:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
def pack(self):
    """Close numbering gaps by renaming message files. Invalidates keys.

    Keys from ``self.keys()`` are visited in order; a message whose key is
    not the next consecutive number is moved to that number under
    ``self._path``. ``self._next_key`` ends up one past the last message.
    When at least one message moved, every sequence has the affected keys
    swapped for their new values and ``set_sequences()`` is called.
    """
    sequences = self.get_sequences()
    last_used = 0
    moves = []  # (old_key, new_key) pairs
    for key in self.keys():
        slot = last_used + 1
        if key != slot:
            moves.append((key, slot))
            old_path = os.path.join(self._path, str(key))
            new_path = os.path.join(self._path, str(slot))
            if hasattr(os, 'link'):
                os.link(old_path, new_path)
                os.unlink(old_path)
            else:
                os.rename(old_path, new_path)
        last_used = slot
    self._next_key = last_used + 1
    if not moves:
        return
    for _name, key_list in sequences.items():
        for old, new in moves:
            try:
                position = key_list.index(old)
            except ValueError:
                # This sequence does not mention the moved message.
                continue
            key_list[position] = new
    self.set_sequences(sequences)
def get_message(self, key):
    """Load the message stored under *key* and return it as an MHMessage.

    The file named ``str(key)`` under ``self._path`` is opened in binary
    mode (read/write when ``self._locked`` is true, read-only otherwise).
    While ``self._locked`` is true the file is locked around parsing and
    unlocked afterwards. Every sequence from ``get_sequences()`` that lists
    *key* is added to the returned message.

    Raises KeyError if no file exists for *key*; any other OSError from
    opening the file propagates unchanged.
    """
    try:
        path = os.path.join(self._path, str(key))
        handle = open(path, 'rb+' if self._locked else 'rb')
    except OSError as exc:
        if exc.errno != errno.ENOENT:
            raise
        raise KeyError('No message with key: %s' % key)
    with handle:
        if self._locked:
            _lock_file(handle)
        try:
            msg = MHMessage(handle)
        finally:
            if self._locked:
                _unlock_file(handle)
    sequences = self.get_sequences()
    for name in sequences:
        if key in sequences[name]:
            msg.add_sequence(name)
    return msg
def get_sequences(self):
    """Return a dictionary mapping each sequence name to its list of keys.

    Reads the ASCII file ``.mh_sequences`` under ``self._path``. Each line
    has the form ``name: spec spec ...`` where a spec is either a single
    number or an inclusive ``start-stop`` range. Only keys also present in
    ``self.keys()`` are kept, in ascending order; a sequence left with no
    keys is omitted from the result.

    Raises FormatError if a line cannot be parsed.
    """
    results = {}
    sequences_path = os.path.join(self._path, '.mh_sequences')
    with open(sequences_path, 'r', encoding='ASCII') as f:
        all_keys = set(self.keys())
        for line in f:
            try:
                name, contents = line.split(':')
                wanted = set()
                for spec in contents.split():
                    if spec.isdigit():
                        low = high = int(spec)
                    else:
                        low, high = map(int, spec.split('-'))
                    wanted.update(range(low, high + 1))
                present = [key for key in sorted(wanted) if key in all_keys]
                if present:
                    results[name] = present
                else:
                    # An empty sequence also drops any earlier entry of
                    # the same name.
                    results.pop(name, None)
            except ValueError:
                raise FormatError('Invalid sequence specification: %s' %
                                  line.rstrip())
    return results
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    The file named ``str(key)`` under ``self._path`` is opened in text mode
    (``'r+'`` when ``self._locked`` is true, ``'r'`` otherwise) and parsed
    into an MHMessage. While ``self._locked`` is true the file is locked
    around parsing and unlocked afterwards. Every sequence from
    ``get_sequences()`` that lists *key* is added to the returned message.

    Raises KeyError if no file exists for *key*; any other IOError from
    opening the file propagates unchanged.
    """
    try:
        if self._locked:
            f = open(os.path.join(self._path, str(key)), 'r+')
        else:
            f = open(os.path.join(self._path, str(key)), 'r')
    # ``except IOError, e`` is a syntax error on Python 3; the ``as`` form
    # is accepted by Python 2.6+ and Python 3 alike.
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        else:
            raise
    # The context manager closes the file on every exit path.
    with f:
        if self._locked:
            _lock_file(f)
        try:
            msg = MHMessage(f)
        finally:
            if self._locked:
                _unlock_file(f)
    # items() exists on both Python 2 and 3 dictionaries; iteritems() is
    # Python 2 only.
    for name, key_list in self.get_sequences().items():
        if key in key_list:
            msg.add_sequence(name)
    return msg
def get_sequences(self):
    """Return a name-to-key-list dictionary describing every sequence.

    Parses ``.mh_sequences`` under ``self._path``. Each line is expected to
    look like ``name: spec spec ...`` with each spec being a number or an
    inclusive ``start-stop`` range. Keys not present in ``self.keys()`` are
    dropped, the rest are returned sorted, and sequences that end up empty
    are left out.

    Raises FormatError if a line cannot be parsed.
    """
    sequences = {}
    with open(os.path.join(self._path, '.mh_sequences'), 'r') as f:
        existing = set(self.keys())
        for line in f:
            try:
                name, contents = line.split(':')
                members = set()
                for spec in contents.split():
                    if not spec.isdigit():
                        start, stop = (int(x) for x in spec.split('-'))
                        members.update(range(start, stop + 1))
                    else:
                        members.add(int(spec))
                surviving = [key for key in sorted(members)
                             if key in existing]
                if surviving:
                    sequences[name] = surviving
                else:
                    # Matches assign-then-delete: an empty sequence removes
                    # any earlier entry with the same name.
                    sequences.pop(name, None)
            except ValueError:
                raise FormatError('Invalid sequence specification: %s' %
                                  line.rstrip())
    return sequences
def _dump_sequences(self, message, key):
    """Inspect a new MHMessage and update sequences appropriately.

    *message* supplies the sequence names it belongs to through its own
    ``get_sequences()``. For every sequence already known to the mailbox,
    *key* is appended when the message names that sequence and removed
    when it does not. Sequences named by the message but not yet known are
    created containing only *key*. The result is passed to
    ``set_sequences()``.
    """
    pending_sequences = message.get_sequences()
    all_sequences = self.get_sequences()
    # items() works on Python 2 and 3 dictionaries alike; iteritems() only
    # exists on Python 2 and raises AttributeError on Python 3.
    for name, key_list in all_sequences.items():
        if name in pending_sequences:
            key_list.append(key)
        elif key in key_list:
            del key_list[key_list.index(key)]
    for sequence in pending_sequences:
        if sequence not in all_sequences:
            all_sequences[sequence] = [key]
    self.set_sequences(all_sequences)
def get_sequences(self):
    """Return the sequences that include this message.

    The value is a shallow copy of ``self._sequences``, so changing the
    returned object does not alter the message's own record.
    """
    snapshot = self._sequences[:]
    return snapshot
def get_message(self, key):
    """Return the message stored under *key* as an MHMessage.

    Opens the file named ``str(key)`` under ``self._path`` in binary mode,
    read/write when ``self._locked`` is true and read-only otherwise. While
    ``self._locked`` is true the file is locked around parsing and unlocked
    afterwards; the file is closed once parsing finishes. Each sequence
    from ``get_sequences()`` whose key list contains *key* is added to the
    message.

    Raises KeyError if no file exists for *key*; any other IOError from
    opening the file propagates unchanged.
    """
    try:
        message_path = os.path.join(self._path, str(key))
        f = open(message_path, 'rb+' if self._locked else 'rb')
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        raise
    with f:
        if self._locked:
            _lock_file(f)
        try:
            msg = MHMessage(f)
        finally:
            if self._locked:
                _unlock_file(f)
    for name, key_list in self.get_sequences().items():
        if key not in key_list:
            continue
        msg.add_sequence(name)
    return msg
def _dump_sequences(self, message, key):
    """Bring the mailbox's sequences in line with those named by *message*.

    For each sequence the mailbox already has, *key* is appended if the
    message names that sequence, and removed if the message does not name
    it but the sequence currently lists *key*. Sequences the message names
    that the mailbox does not have yet are created holding only *key*.
    The updated mapping is passed to ``set_sequences()``.
    """
    wanted = message.get_sequences()
    table = self.get_sequences()
    for name, members in table.items():
        if name in wanted:
            members.append(key)
        elif key in members:
            members.remove(key)
    for name in wanted:
        # Existing entries were handled above; only missing names are added.
        table.setdefault(name, [key])
    self.set_sequences(table)