我们从Python开源项目中，提取了以下32个代码示例，用于说明如何使用os.environ.items()。
def push_dnzb(self, nzbheaders=None):
    """Push every entry of a header dictionary through ``self.push()``.

    Each key is upper-cased and prefixed with ``SHR_ENVIRO_DNZB_ID``; each
    value is stripped of surrounding whitespace before being pushed.

    If `nzbheaders` is None, ``self.nzbheaders`` is used instead.

    Returns False (pushing nothing) when the headers are not a dictionary,
    True otherwise.
    """
    headers = self.nzbheaders if nzbheaders is None else nzbheaders

    if not isinstance(headers, dict):
        return False

    for name, value in headers.items():
        # Build the prefixed, upper-cased key before pushing the content
        dnzb_key = '%s%s' % (SHR_ENVIRO_DNZB_ID, name.upper())
        self.push(dnzb_key, value.strip())

    return True
def compare_generic_iter(make_it, match):
    """Check that an old/new style iterator yields exactly the items of `match`.

    `make_it` must be a callable returning a fresh iterator each time it is
    called, because the iterator is exercised twice: once by index via
    ``__getitem__`` and once via ``iter()``/``next()``.  `match` is iterated
    twice as well.

    Raises AssertionError on the first mismatch, or when the iterator offers
    more items than `match` contains.
    """
    # Pass 1: sequence protocol (__getitem__ with increasing indexes)
    it = make_it()
    consumed = 0
    for consumed, expected in enumerate(match, 1):
        if not it[consumed - 1] == expected:
            raise AssertionError

    try:
        it[consumed]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    # Pass 2: iterator protocol -- only when the interpreter provides it
    try:
        iter, StopIteration
    except NameError:
        return

    it = make_it()
    if iter(it) is not it:
        raise AssertionError

    for expected in match:
        if not next(it) == expected:
            raise AssertionError

    try:
        next(it)
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .__next__()", it)
def testMappingInterface(self): test = [('x','y')] self.assertEqual(len(Headers([])),0) self.assertEqual(len(Headers(test[:])),1) self.assertEqual(Headers(test[:]).keys(), ['x']) self.assertEqual(Headers(test[:]).values(), ['y']) self.assertEqual(Headers(test[:]).items(), test) self.assertFalse(Headers(test).items() is test) # must be copy! h=Headers([]) del h['foo'] # should not raise an error h['Foo'] = 'bar' for m in h.__contains__, h.get, h.get_all, h.__getitem__: self.assertTrue(m('foo')) self.assertTrue(m('Foo')) self.assertTrue(m('FOO')) self.assertFalse(m('bar')) self.assertEqual(h['foo'],'bar') h['foo'] = 'baz' self.assertEqual(h['FOO'],'baz') self.assertEqual(h.get_all('foo'),['baz']) self.assertEqual(h.get("foo","whee"), "baz") self.assertEqual(h.get("zoo","whee"), "whee") self.assertEqual(h.setdefault("foo","whee"), "baz") self.assertEqual(h.setdefault("zoo","whee"), "whee") self.assertEqual(h["foo"],"baz") self.assertEqual(h["zoo"],"whee")
def checkOSEnviron(self, handler):
    """Verify that ``handler.environ`` reflects the process environment.

    Every ``os.environ`` entry whose key is not one of the keys filled in by
    ``setup_testing_defaults()`` must appear in ``handler.environ`` with the
    same value, and every key that ``setup_testing_defaults()`` fills in
    must be present in ``handler.environ``.
    """
    from os import environ

    # Collect the keys setup_testing_defaults() populates on an empty dict
    empty = {}
    setup_testing_defaults(empty)

    env = handler.environ
    for k, v in environ.items():
        if k not in empty:
            self.assertEqual(env[k], v)

    # Only the keys matter here, so iterate the dict directly
    for k in empty:
        self.assertIn(k, env)
def compare_generic_iter(make_it, match):
    """Utility to compare a generic 2.1/2.2+ iterator with an iterable

    The iterator is tested by index through ``__getitem__`` and, when the
    interpreter provides ``iter``/``StopIteration``, through the iterator
    protocol as well.  'make_it' must be a function returning a fresh
    iterator to be tested (since this may test the iterator twice).

    Raises AssertionError on the first mismatch, or when the iterator
    offers more items than `match` contains.
    """
    it = make_it()
    n = 0
    for item in match:
        if not it[n] == item:
            raise AssertionError
        n += 1
    try:
        it[n]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    try:
        iter, StopIteration
    except NameError:
        pass
    else:
        # Only test iter mode under 2.2+
        it = make_it()
        if iter(it) is not it:
            raise AssertionError
        for item in match:
            # The next() built-in dispatches to .next() on Python 2 and to
            # .__next__() on Python 3, so the check works on both.
            if not next(it) == item:
                raise AssertionError
        try:
            next(it)
        except StopIteration:
            pass
        else:
            raise AssertionError("Too many items from .next()", it)
def testMappingInterface(self): test = [('x','y')] self.assertEqual(len(Headers([])),0) self.assertEqual(len(Headers(test[:])),1) self.assertEqual(Headers(test[:]).keys(), ['x']) self.assertEqual(Headers(test[:]).values(), ['y']) self.assertEqual(Headers(test[:]).items(), test) self.assertIsNot(Headers(test).items(), test) # must be copy! h=Headers([]) del h['foo'] # should not raise an error h['Foo'] = 'bar' for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__: self.assertTrue(m('foo')) self.assertTrue(m('Foo')) self.assertTrue(m('FOO')) self.assertFalse(m('bar')) self.assertEqual(h['foo'],'bar') h['foo'] = 'baz' self.assertEqual(h['FOO'],'baz') self.assertEqual(h.get_all('foo'),['baz']) self.assertEqual(h.get("foo","whee"), "baz") self.assertEqual(h.get("zoo","whee"), "whee") self.assertEqual(h.setdefault("foo","whee"), "baz") self.assertEqual(h.setdefault("zoo","whee"), "whee") self.assertEqual(h["foo"],"baz") self.assertEqual(h["zoo"],"whee")
def testMappingInterface(self): test = [('x','y')] self.assertEqual(len(Headers([])),0) self.assertEqual(len(Headers(test[:])),1) self.assertEqual(Headers(test[:]).keys(), ['x']) self.assertEqual(Headers(test[:]).values(), ['y']) self.assertEqual(Headers(test[:]).items(), test) self.assertIsNot(Headers(test).items(), test) # must be copy! h=Headers([]) del h['foo'] # should not raise an error h['Foo'] = 'bar' for m in h.__contains__, h.get, h.get_all, h.__getitem__: self.assertTrue(m('foo')) self.assertTrue(m('Foo')) self.assertTrue(m('FOO')) self.assertFalse(m('bar')) self.assertEqual(h['foo'],'bar') h['foo'] = 'baz' self.assertEqual(h['FOO'],'baz') self.assertEqual(h.get_all('foo'),['baz']) self.assertEqual(h.get("foo","whee"), "baz") self.assertEqual(h.get("zoo","whee"), "whee") self.assertEqual(h.setdefault("foo","whee"), "baz") self.assertEqual(h.setdefault("zoo","whee"), "whee") self.assertEqual(h["foo"],"baz") self.assertEqual(h["zoo"],"whee")
def testMappingInterface(self): test = [('x','y')] self.assertEqual(len(Headers([])),0) self.assertEqual(len(Headers(test[:])),1) self.assertEqual(Headers(test[:]).keys(), ['x']) self.assertEqual(Headers(test[:]).values(), ['y']) self.assertEqual(Headers(test[:]).items(), test) self.assertFalse(Headers(test).items() is test) # must be copy! h=Headers([]) del h['foo'] # should not raise an error h['Foo'] = 'bar' for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__: self.assertTrue(m('foo')) self.assertTrue(m('Foo')) self.assertTrue(m('FOO')) self.assertFalse(m('bar')) self.assertEqual(h['foo'],'bar') h['foo'] = 'baz' self.assertEqual(h['FOO'],'baz') self.assertEqual(h.get_all('foo'),['baz']) self.assertEqual(h.get("foo","whee"), "baz") self.assertEqual(h.get("zoo","whee"), "whee") self.assertEqual(h.setdefault("foo","whee"), "baz") self.assertEqual(h.setdefault("zoo","whee"), "whee") self.assertEqual(h["foo"],"baz") self.assertEqual(h["zoo"],"whee")
def get_task(self, taskid=None):
    """Returns a dictionary of task details identified by the id specified.

    If no id is specified, ``self.taskid`` is used.

    The details are read from the environment: every variable whose name
    is ``SCHEDULER_ENVIRO_ID + TASK_ENVIRO_ID + <taskid> + '_' + <SUFFIX>``
    (SUFFIX made of A-Z, 0-9 and underscores) contributes one entry keyed
    by SUFFIX, with its value stripped of surrounding whitespace.

    An empty dictionary is returned when the id can not be converted to an
    integer or is not greater than zero.
    """
    if taskid is None:
        # assume default
        taskid = self.taskid

    if not isinstance(taskid, int):
        try:
            taskid = int(taskid)

        except (ValueError, TypeError):
            # can't be typecasted to an integer
            return {}

    if taskid <= 0:
        # No task defined
        return {}

    # Format the pattern once so the compiled expression and the debug
    # message are guaranteed to describe the same thing.
    pattern = '%s%s%d_([A-Z0-9_]+)$' % (
        SCHEDULER_ENVIRO_ID,
        TASK_ENVIRO_ID,
        taskid,
    )

    # Precompile Regular Expression for Speed
    task_re = re.compile('^' + pattern)
    self.logger.debug('Looking for %s' % pattern)

    # Fetch Task related content; each key is matched exactly once and the
    # match object is reused to extract the captured suffix.
    matches = ((task_re.match(k), v) for (k, v) in environ.items())
    return dict((m.group(1), v.strip()) for (m, v) in matches if m)
def get_feed(self, feedid=None):
    """Returns a dictionary of feed details identified by the id specified.

    If no id is specified, ``self.feedid`` is used.

    The details are read from the environment: every variable whose name
    is ``FEED_ENVIRO_ID + FEEDID_ENVIRO_ID + <feedid> + '_' + <SUFFIX>``
    (SUFFIX made of A-Z, 0-9 and underscores) contributes one entry keyed
    by SUFFIX, with its value stripped of surrounding whitespace.

    An empty dictionary is returned when the id can not be converted to an
    integer or is not greater than zero.
    """
    if feedid is None:
        # assume default
        feedid = self.feedid

    if not isinstance(feedid, int):
        try:
            feedid = int(feedid)

        except (ValueError, TypeError):
            # can't be typecasted to an integer
            return {}

    if feedid <= 0:
        # No feed id defined
        return {}

    # Format the pattern once so the compiled expression and the debug
    # message are guaranteed to describe the same thing.
    pattern = '%s%s%d_([A-Z0-9_]+)$' % (
        FEED_ENVIRO_ID,
        FEEDID_ENVIRO_ID,
        feedid,
    )

    # Precompile Regular Expression for Speed
    feed_re = re.compile('^' + pattern)
    self.logger.debug('Looking for %s' % pattern)

    # Fetch Feed related content; each key is matched exactly once and the
    # match object is reused to extract the captured suffix.
    matches = ((feed_re.match(k), v) for (k, v) in environ.items())
    return dict((m.group(1), v.strip()) for (m, v) in matches if m)
def pull_dnzb(self):
    """pulls meta information stored in the DNZB environment variables
    and returns a dictionary

    Every environment variable whose name matches ``DNZB_OPTS_RE``
    contributes one entry: the key is the first captured group of the
    match, upper-cased, and the value is the variable's content stripped
    of surrounding whitespace.
    """
    # Match each key exactly once and reuse the match object instead of
    # running the expression a second time to extract the group.
    matches = ((DNZB_OPTS_RE.match(k), v) for (k, v) in environ.items())
    return dict(
        (m.group(1).upper(), v.strip()) for (m, v) in matches if m
    )
def scheduler_init(self, *args, **kwargs):
    """Initialise scheduler specific state from keyword args and environment.

    Reads the optional ``taskid`` keyword argument, collects every
    environment variable matching ``SCHEDULER_OPTS_RE`` into a script
    configuration (keyed by the first captured group, values stripped),
    merges that configuration into ``self.system`` (existing
    ``self.system`` entries win) and resolves ``self.taskid``.

    ``self.taskid`` ends up as an integer; it falls back to 0 when no
    usable id is found.  A positive id is also written back to the
    environment under ``SCHEDULER_ENVIRO_ID + 'TASKID'``.
    """
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # Fetch Script Specific Arguments
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    taskid = kwargs.get('taskid')

    # Fetch/Load Scan Script Configuration; each key is matched once and
    # the match object reused to extract the option name.
    matches = ((SCHEDULER_OPTS_RE.match(k), v) for (k, v) in environ.items())
    script_config = dict(
        (m.group(1), v.strip()) for (m, v) in matches if m
    )

    if self.vvdebug:
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # Print Global Script Variables to help debugging process
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        for k, v in script_config.items():
            self.logger.vvdebug('%s%s=%s' % (SCHEDULER_ENVIRO_ID, k, v))

    # Merge Script Configuration With System Config.  Entries already in
    # self.system take precedence.  copy + update is used because adding
    # two .items() results only works on Python 2, where they are lists.
    merged = dict(script_config)
    merged.update(self.system)
    self.system = merged

    # self.taskid
    # This is the Task Identifier passed in from NZBGet
    if taskid is None:
        self.taskid = environ.get(
            '%sTASKID' % SCHEDULER_ENVIRO_ID,
        )
    else:
        self.taskid = taskid

    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # Error Handling
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    try:
        self.taskid = int(self.taskid)
        self.logger.info('Task ID assigned: %d' % self.taskid)

    except (ValueError, TypeError):
        # Default is 0
        self.taskid = 0
        self.logger.warning('No Task ID was assigned')

    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # Enforce system/global variables for script processing
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    self.system['TASKID'] = self.taskid
    if isinstance(self.taskid, int) and self.taskid > 0:
        environ['%sTASKID' % SCHEDULER_ENVIRO_ID] = str(self.taskid)

# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# Debug Flag Check
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def items(self, check_system=True, check_shared=True, use_db=True):
    """Return the merged (key, value) pairs as a list for use in for-loops.

    Sources are layered so that later ones override earlier ones:
    database entries (when `use_db` is set and a database is usable),
    ``self.shared`` (when `check_shared` is set), ``self.config`` and
    finally ``self.system`` (when `check_system` is set).

    The database connection is created on first use; if that fails with
    EnvironmentError or NameError, ``self.database`` is set to False so
    no further attempts are made.
    """
    db_items = list()
    if use_db and self.database is None and self.database_key:
        try:
            # Connect to database on first use only
            self.database = Database(
                container=self.database_key,
                database=join(
                    self.tempdir,
                    NZBGET_DATABASE_FILENAME,
                ),
                logger=self.logger,
            )

            # Fetch from database first
            db_items = self.database.items()

        except (EnvironmentError, NameError):
            # Database Access Problem (EnvironmentError) or Sqlite wasn't
            # installed (NameError)
            # set the dbstore to false so it isn't used anymore
            self.database = False

    elif use_db and self.database:
        # Fetch from database first
        db_items = self.database.items()

    # Layer the sources with dict.update().  Concatenating .items()
    # results only works on Python 2, where they are lists.
    merged = dict(db_items)

    if check_shared:
        # Shared values trump any database set ones
        merged.update(self.shared)

    # configuration trumps shared values
    # NOTE(review): assumed unconditional, since no flag controls it --
    # confirm it was not meant to sit under check_shared.
    merged.update(self.config)

    if check_system:
        # system trumps all values
        merged.update(self.system)

    # Always hand back a real list, on Python 2 and 3 alike
    return list(merged.items())

# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# nzb_set() and nzb_get() wrappers
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def nzb_items(self, use_db=True):
    """Return the merged NZB (key, value) pairs as a list for use in for-loops.

    Database entries (when `use_db` is set and a database is usable) are
    overridden by the entries of ``self.nzbheaders``.

    The database connection is created on first use; if that fails with
    EnvironmentError or NameError, ``self.database`` is set to False so
    no further attempts are made.
    """
    db_items = list()
    if use_db and self.database is None and self.database_key:
        try:
            # Connect to database on first use only
            self.database = Database(
                container=self.database_key,
                database=join(
                    self.tempdir,
                    NZBGET_DATABASE_FILENAME,
                ),
                logger=self.logger,
            )

            # Fetch from database first
            # NOTE(review): unlike the branch below, this call does not
            # pass category=Category.NZB -- confirm whether intended.
            db_items = self.database.items()

        except (EnvironmentError, NameError):
            # Database Access Problem (EnvironmentError) or Sqlite wasn't
            # installed (NameError)
            # set the dbstore to false so it isn't used anymore
            self.database = False

    elif use_db and self.database:
        # Fetch from database first
        db_items = self.database.items(category=Category.NZB)

    # nzbheaders trump database values.  Concatenating .items() results
    # only works on Python 2, where they are lists, so use dict.update().
    merged = dict(db_items)
    merged.update(self.nzbheaders)

    # Always hand back a real list, on Python 2 and 3 alike
    return list(merged.items())

# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# Sanity
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def parse_list(self, *args):
    """
    Take a string list and break it into a delimited list of arguments.
    This function also supports the processing of a list of delimited
    strings and will always return a unique set of arguments. Duplicates
    are always combined in the final results.

    You can append as many items to the argument listing for parsing.

    Hence: parse_list('.mkv, .iso, .avi') becomes:
        ['.mkv', '.iso', '.avi']

    Hence: parse_list('.mkv, .iso, .avi', ['.avi', '.mp4']) becomes:
        ['.mkv', '.iso', '.avi', '.mp4']

    Strings are split with the ``STRING_DELIMITERS`` expression; lists
    and tuples are walked (recursively when nested) and anything else is
    converted with str() first.  Empty entries are dropped.  The result
    is always a list; because duplicates are removed through a set, the
    order of the entries is not guaranteed.
    """
    # basestring only exists on Python 2; fall back to str elsewhere
    try:
        string_types = basestring
    except NameError:
        string_types = str

    result = []
    for arg in args:
        if isinstance(arg, string_types):
            result += re.split(STRING_DELIMITERS, arg)

        elif isinstance(arg, (list, tuple)):
            for _arg in arg:
                # Test the element (_arg), not the enclosing sequence
                if isinstance(_arg, string_types):
                    result += re.split(STRING_DELIMITERS, _arg)

                # A list inside a list? - use recursion
                elif isinstance(_arg, (list, tuple)):
                    result += self.parse_list(_arg)

                else:
                    # Convert whatever it is to a string and work with it
                    result += self.parse_list(str(_arg))
        else:
            # Convert whatever it is to a string and work with it
            result += self.parse_list(str(arg))

    # Make the list unique by going through a set() and drop any empty
    # entries.  A list is built explicitly because filter() returns a
    # lazy iterator on Python 3.
    return [entry for entry in set(result) if entry]