我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用resource.error()。
def test_selector_raises_timeout_error_on_interrupt_over_time(self):
    """An EINTR that eats the whole timeout must surface as ETIMEDOUT."""
    # Force re-detection of the platform selector.
    selectors2._DEFAULT_SELECTOR = None
    fake_socket = mock.Mock()
    fake_socket.fileno.return_value = 1

    def _interrupt_past_deadline(*args, **kwargs):
        # Sleep beyond the 0.1s timeout, then fake an EINTR from select().
        time.sleep(0.2)
        eintr = OSError()
        eintr.errno = errno.EINTR
        raise eintr

    patch_select_module(self, select=_interrupt_past_deadline)
    selector = self.make_selector()
    selector.register(fake_socket, selectors2.EVENT_READ)
    with self.assertRaises(OSError) as ctx:
        selector.select(timeout=0.1)
    self.assertEqual(ctx.exception.errno, errno.ETIMEDOUT)
def test_handles_closed_on_exception(self):
    """If CreateProcess exits with an error, ensure the duplicate
    output handles are released."""
    stdin_fd, stdin_name = mkstemp()
    stdout_fd, stdout_name = mkstemp()
    stderr_fd, stderr_name = mkstemp()
    try:
        subprocess.Popen(["*"], stdin=stdin_fd, stdout=stdout_fd,
                         stderr=stderr_fd)
    except OSError:
        # Release and delete each temporary file before the checks below.
        for fd, name in ((stdin_fd, stdin_name),
                         (stdout_fd, stdout_name),
                         (stderr_fd, stderr_name)):
            os.close(fd)
            os.remove(name)
    for name in (stdin_name, stdout_name, stderr_name):
        self.assertFalse(os.path.exists(name))
def __enter__(self):
    """Save the current core-dump ulimit, then disable core files.

    Sets RLIMIT_CORE to (0, 0) so the guarded code cannot litter core
    dumps; the previous limit is stashed for __exit__ to restore.
    """
    if resource is not None:
        try:
            self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
            resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
        except (ValueError, resource.error):
            # Best effort only; some platforms refuse the change.
            pass
    if sys.platform == 'darwin':
        # Check if the 'Crash Reporter' on OSX was configured in
        # 'Developer' mode and warn that it will get triggered when it
        # is.  This assumes the context manager is used in tests that
        # might trigger it.
        reporter = subprocess.Popen(
            ['/usr/bin/defaults', 'read',
             'com.apple.CrashReporter', 'DialogType'],
            stdout=subprocess.PIPE)
        dialog_type = reporter.communicate()[0]
        if dialog_type.strip() == b'developer':
            print("this tests triggers the Crash Reporter, "
                  "that is intentional", end='')
            sys.stdout.flush()
def test_pipe_cloexec(self):
    """Pipes held by one child must not leak into a sibling child."""
    reader_script = support.findfile("input_reader.py",
                                     subdir="subprocessdata")
    status_script = support.findfile("fd_status.py",
                                     subdir="subprocessdata")
    # First child: keeps three pipe fds open on its end.
    holder = subprocess.Popen([sys.executable, reader_script],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              close_fds=False)
    self.addCleanup(holder.communicate, b'')
    # Second child: reports which fds it sees open.
    reporter = subprocess.Popen([sys.executable, status_script],
                                stdout=subprocess.PIPE, close_fds=False)
    report, _ = reporter.communicate()
    seen_fds = {int(fd) for fd in report.split(b',')}
    pipe_fds = {holder.stdin.fileno(),
                holder.stdout.fileno(),
                holder.stderr.fileno()}
    self.assertFalse(seen_fds & pipe_fds,
                     "Expected no fds from %r to be open in child, "
                     "found %r" % (pipe_fds, seen_fds & pipe_fds))
def test_handles_closed_on_exception(self):
    """If CreateProcess exits with an error, ensure the duplicate
    output handles are released."""
    in_fd, in_name = tempfile.mkstemp()
    out_fd, out_name = tempfile.mkstemp()
    err_fd, err_name = tempfile.mkstemp()
    try:
        subprocess.Popen(["*"], stdin=in_fd, stdout=out_fd,
                         stderr=err_fd)
    except OSError:
        # Close and unlink every temp file before verifying removal.
        for fd, name in ((in_fd, in_name),
                         (out_fd, out_name),
                         (err_fd, err_name)):
            os.close(fd)
            os.remove(name)
    for name in (in_name, out_name, err_name):
        self.assertFalse(os.path.exists(name))
def __enter__(self):
    """Try to save previous ulimit, then set it to (0, 0).

    Disables core dumps for the guarded code; __exit__ restores the
    saved limit.  On OS X, additionally warns when the Crash Reporter
    is in 'Developer' mode, since the guarded code may trigger it.
    """
    if resource is not None:
        try:
            self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
            resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
        except (ValueError, resource.error):
            # Best effort: some environments forbid changing the limit.
            pass
    if sys.platform == 'darwin':
        # Check if the 'Crash Reporter' on OSX was configured in
        # 'Developer' mode and warn that it will get triggered when it
        # is.  This assumes that this context manager is used in tests
        # that might trigger the next manager.
        value = subprocess.Popen(['/usr/bin/defaults', 'read',
                                  'com.apple.CrashReporter', 'DialogType'],
                                 stdout=subprocess.PIPE).communicate()[0]
        if value.strip() == b'developer':
            # Fix: was a Python-2 `print` statement, a SyntaxError on
            # Python 3; use the print() function like the rest of the file.
            print("this tests triggers the Crash Reporter, "
                  "that is intentional")
            sys.stdout.flush()
def test_start_new_session(self):
    """start_new_session=True must put the child in its own process group."""
    # For code coverage of calling setsid().  An EPERM from the test
    # execution environment is tolerated: it still proves setsid() ran.
    try:
        child_output = subprocess.check_output(
            [sys.executable, "-c",
             "import os; print(os.getpgid(os.getpid()))"],
            start_new_session=True)
    except OSError as e:
        if e.errno != errno.EPERM:
            raise
    else:
        self.assertNotEqual(os.getpgid(os.getpid()), int(child_output))
def test_huge_file_descriptor(self):
    """Epoll-backed select must cope with fds above select()'s FD_SETSIZE.

    Opens sockets until a descriptor above 4000 is held, then checks
    that the handler's select() works, the select()-based fallback
    raises ValueError, and the epoll path succeeds.
    """
    from kazoo.handlers.threading import _HAS_EPOLL
    if not _HAS_EPOLL:
        self.skipTest('only run on systems with epoll()')
    import resource
    import socket
    from kazoo.handlers.utils import create_tcp_socket
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
    except (ValueError, resource.error):
        self.skipTest('couldnt raise fd limit high enough')
    fd = 0
    socks = []
    try:
        # Burn descriptors until one lands past 4000 (well beyond the
        # usual FD_SETSIZE of 1024).
        while fd < 4000:
            sock = create_tcp_socket(socket)
            fd = sock.fileno()
            socks.append(sock)
        h = self._makeOne()
        h.start()
        try:
            h.select(socks, [], [])
            with self.assertRaises(ValueError):
                h._select(socks, [], [])
            h._epoll_select(socks, [], [])
        finally:
            # Fix: stop the handler even when an assertion above fails.
            h.stop()
    finally:
        # Fix: the original leaked every socket it opened, exhausting
        # file descriptors for subsequent tests; always close them.
        for sock in socks:
            sock.close()
def test_above_fd_setsize(self):
    """Register more fds than select()'s FD_SETSIZE and expect success."""
    # A scalable implementation should have no problem with more than
    # FD_SETSIZE file descriptors. Since we don't know the value, we just
    # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard == resource.RLIM_INFINITY:
        self.skipTest("RLIMIT_NOFILE is infinite")
    try:
        # If we're on a *BSD system, the limit tag is different.
        _, bsd_hard = resource.getrlimit(resource.RLIMIT_OFILE)
        if bsd_hard == resource.RLIM_INFINITY:
            self.skipTest("RLIMIT_OFILE is infinite")
        if bsd_hard < hard:
            hard = bsd_hard
    # NOTE: AttributeError resource.RLIMIT_OFILE is not defined on Mac OS.
    except (OSError, resource.error, AttributeError):
        pass
    try:
        # Raise the soft limit to the (possibly reduced) hard ceiling and
        # schedule restoration of the original soft limit after the test.
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
                        (soft, hard))
        limit_nofile = min(hard, 2 ** 16)
    except (OSError, ValueError):
        # Could not raise the limit; fall back to the current soft limit.
        limit_nofile = soft
    # Guard against already allocated FDs
    limit_nofile -= 256
    limit_nofile = max(0, limit_nofile)
    s = self.make_selector()
    # Each socketpair consumes two descriptors; register limit_nofile // 2
    # read ends and the same number of write ends.
    for i in range(limit_nofile // 2):
        rd, wr = self.make_socketpair()
        s.register(rd, selectors2.EVENT_READ)
        s.register(wr, selectors2.EVENT_WRITE)
    # select() is expected to report exactly limit_nofile // 2 ready
    # objects (presumably the writable ends — confirm against selectors2).
    self.assertEqual(limit_nofile // 2, len(s.select()))
def __exit__(self, *args):
    """Restore the core-dump ulimit captured by __enter__, if any."""
    # Nothing to do when __enter__ never managed to save a limit.
    if self.old_limit is not None and resource is not None:
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            # Restoring may be refused in restricted environments;
            # this is best-effort only.
            pass
def test_wait_when_sigchild_ignored(self):
    """communicate() must collect the exit code while SIGCHLD is ignored."""
    # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
    script = support.findfile("sigchild_ignore.py",
                              subdir="subprocessdata")
    proc = subprocess.Popen([sys.executable, script],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    _, err = proc.communicate()
    failure_msg = ("sigchild_ignore.py exited"
                   " non-zero with this error:\n%s" % err.decode('utf8'))
    self.assertEqual(0, proc.returncode, failure_msg)
def test_preexec_errpipe_does_not_double_close_pipes(self):
    """Regression test for Issue16140: a preexec_fn failure must not
    cause the child's pipes to be closed twice."""
    def _fail_in_preexec():
        raise RuntimeError("force the _execute_child() errpipe_data path.")

    with self.assertRaises(RuntimeError):
        self._TestExecuteChildPopen(
            self,
            [sys.executable, "-c", "pass"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            preexec_fn=_fail_in_preexec)
def test_wait_when_sigchild_ignored(self):
    """Popen must reap the child even while the parent ignores SIGCHLD."""
    # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
    script_path = test_support.findfile("sigchild_ignore.py",
                                        subdir="subprocessdata")
    child = subprocess.Popen([sys.executable, script_path],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    out, err = child.communicate()
    self.assertEqual(0, child.returncode,
                     "sigchild_ignore.py exited"
                     " non-zero with this error:\n%s" % err)
def raise_limits():
    """Raise the soft RLIMIT_NOFILE (open file descriptors) to the hard limit.

    Returns:
        True when the limit was successfully raised, False when the OS
        rejected the new value.

    Raises:
        OSError: re-raised after logging when setrlimit fails for
        permission/ulimit reasons (Python 2 only; see note below).
    """
    import resource
    hard = None  # pre-bind so the ValueError handler can never see a NameError
    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        info("Current limits, soft and hard : {} {}".format(soft, hard))
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
    except ValueError:
        error("Exceeds limit {}, infinity is {}".format(
            hard, resource.RLIM_INFINITY))
        # Fix: the original fell through to `return True` here even though
        # the limit was NOT raised; report failure like the branch below.
        return False
    except resource.error:
        return False
    except OSError as e:
        # NOTE(review): on Python 3, resource.error IS OSError, so the
        # branch above already catches OSError and this one is dead code
        # there — confirm which Python versions must be supported.
        critical('You may need to check ulimit parameter: {}'.format(e))
        raise e
    return True
def create_pipeline_config(working_directory):
    """Write a pipeline.cfg for *working_directory*.

    Copies the stock LOFAR pipeline configuration (under $LOFARROOT),
    rewriting its directory, clusterdesc, log and statistics entries to
    point at *working_directory*, then appends a [remote] section sized
    to the local CPU count reported by nproc.

    Exits the process with status 1 when the stock configuration or the
    nproc output cannot be read.
    """

    def _default_value(config_file, key):
        # Read the value of `key = value` from the stock config via
        # grep/cut, stripping the trailing newline and every space.
        cmd = 'grep ' + key + ' ' + config_file + ' | cut -f2- -d"="'
        return os.popen(cmd).readlines()[0].rstrip('\n').replace(' ', '')

    try:
        # NOTE(review): a missing LOFARROOT raises KeyError, which is not
        # handled here — confirm the environment always defines it.
        default_config = os.environ['LOFARROOT'] + '/share/pipeline/pipeline.cfg'
        default_runtime = _default_value(default_config, 'runtime_directory')
        default_working = _default_value(default_config, 'working_directory')
        default_clusterdesc = _default_value(default_config, 'clusterdesc')
        default_logfile = _default_value(default_config, 'log_file')
        default_xml = _default_value(default_config, 'xml_stat_file')
        pipeline_cfg = working_directory + '/pipeline.cfg'
        with open(pipeline_cfg, 'w') as outfile:
            with open(default_config, 'r') as infile:
                for line in infile:
                    outfile.write(line.replace(default_runtime, working_directory)
                                  .replace(default_working, '%(runtime_directory)s')
                                  .replace(default_clusterdesc, '%(working_directory)s/pipeline.clusterdesc')
                                  .replace(default_logfile, '%(runtime_directory)s/%(job_name)s/logs/%(start_time)s/pipeline.log')
                                  .replace(default_xml, '%(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistics.xml'))
        try:
            max_per_node = os.popen('nproc').readlines()[0].rstrip('\n')
            os.system('echo >> ' + pipeline_cfg)
            os.system('echo [remote] >> ' + pipeline_cfg)
            os.system('echo method = local >> ' + pipeline_cfg)
            os.system('echo max_per_node = ' + max_per_node + ' >> ' + pipeline_cfg)
        except IndexError:
            logging.error('The number of available CPUs could not be determined. Please check your installation of nproc.')
            sys.exit(1)
    except (IOError, IndexError):
        # Fix: the original `except IOError or IndexError` evaluates the
        # expression `IOError or IndexError` to just IOError, so a
        # truncated config (IndexError from readlines()[0]) escaped
        # unhandled; a tuple catches both as intended.
        logging.error('LOFAR pipeline configuration not found. Please check your installation.')
        sys.exit(1)
    # The trailing infile.close()/outfile.close() calls were removed:
    # both files are already closed by their `with` blocks.
def test_exception_cwd(self):
    """Test error in the child raised in the parent for a bad cwd."""
    expected = self._get_chdir_exception()
    try:
        subprocess.Popen([sys.executable, "-c", ""],
                         cwd=self._nonexistent_dir)
    except OSError as e:
        # The child's chdir() failure must propagate to the parent
        # with the same errno and strerror.
        self.assertEqual(expected.errno, e.errno)
        self.assertEqual(expected.strerror, e.strerror)
    else:
        self.fail("Expected OSError: %s" % expected)
def test_exception_bad_executable(self):
    """Test error in the child raised in the parent for a bad executable."""
    expected = self._get_chdir_exception()
    try:
        subprocess.Popen([sys.executable, "-c", ""],
                         executable=self._nonexistent_dir)
    except OSError as e:
        # The child's exec() failure must propagate to the parent
        # with the same errno and strerror.
        self.assertEqual(expected.errno, e.errno)
        self.assertEqual(expected.strerror, e.strerror)
    else:
        self.fail("Expected OSError: %s" % expected)
def test_wait_when_sigchild_ignored(self):
    """The exit status must still be collected when SIGCHLD is ignored."""
    # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
    path = support.findfile("sigchild_ignore.py", subdir="subprocessdata")
    child = subprocess.Popen([sys.executable, path],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    stdout, stderr = child.communicate()
    self.assertEqual(
        0, child.returncode,
        "sigchild_ignore.py exited"
        " non-zero with this error:\n%s" % stderr.decode('utf-8'))
def test_timeout_is_recalculated_after_interrupt(self):
    """A select() interrupted by EINTR must be retried with a smaller timeout."""
    # Force re-detection of the platform selector.
    selectors2._DEFAULT_SELECTOR = None
    mock_socket = mock.Mock()
    mock_socket.fileno.return_value = 1

    class InterruptingSelect(object):
        """Helper object that imitates a select that interrupts after
        sleeping some time then returns a result."""
        def __init__(self):
            self.call_count = 0   # number of select() invocations so far
            self.calls = []       # (args, kwargs) recorded per invocation

        def select(self, *args, **kwargs):
            self.calls.append((args, kwargs))
            self.call_count += 1
            if self.call_count == 1:
                # First call: burn 0.1s of the budget, then fake EINTR.
                time.sleep(0.1)
                error = OSError()
                error.errno = errno.EINTR
                raise error
            else:
                # Second call: report fd 1 as readable.
                return [1], [], []

    mock_select = InterruptingSelect()
    patch_select_module(self, select=mock_select.select)
    selector = self.make_selector()
    selector.register(mock_socket, selectors2.EVENT_READ)
    result = selector.select(timeout=1.0)
    # Make sure the mocked call actually completed correctly.
    self.assertEqual(len(result), 1)
    self.assertEqual(result[0][0].fileobj, mock_socket)
    self.assertEqual(result[0][1], selectors2.EVENT_READ)
    # There should be two calls to the mock_select.select() function
    self.assertEqual(mock_select.call_count, 2)
    # Timeout should be less in the second call.
    # The structure of mock_select.calls is [(args, kwargs), (args, kwargs)] where
    # args is ([r], [w], [x], timeout).
    self.assertLess(mock_select.calls[1][0][3], mock_select.calls[0][0][3])
def test_close_fds(self):
    """close_fds=True must close inherited fds; close_fds=False must not."""
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
    # A pipe the parent keeps open for the duration of the test.
    fds = os.pipe()
    self.addCleanup(os.close, fds[0])
    self.addCleanup(os.close, fds[1])
    open_fds = set(fds)
    # add a bunch more fds
    for _ in range(9):
        fd = os.open("/dev/null", os.O_RDONLY)
        self.addCleanup(os.close, fd)
        open_fds.add(fd)
    # With close_fds=False, every parent fd must survive into the child.
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=False)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))
    self.assertEqual(remaining_fds & open_fds, open_fds,
                     "Some fds were closed")
    # With close_fds=True, none of them may leak into the child.
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))
    self.assertFalse(remaining_fds & open_fds, "Some fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")
    # Keep some of the fd's we opened open in the subprocess.
    # This tests _posixsubprocess.c's proper handling of fds_to_keep.
    # NOTE(review): pass_fds=() passes nothing, contradicting the comment
    # above; and since pop() removes those fds from open_fds, the
    # expression `remaining_fds & fds_to_keep & open_fds` below is always
    # empty, so that assertion cannot fail — confirm whether
    # pass_fds=fds_to_keep was intended.
    fds_to_keep = set(open_fds.pop() for _ in range(8))
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True,
                         pass_fds=())
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))
    self.assertFalse(remaining_fds & fds_to_keep & open_fds,
                     "Some fds not in pass_fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")
    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
    # descriptor of a pipe closed in the parent process is valid in the
    # child process according to fstat(), but the mode of the file
    # descriptor is invalid, and read or write raise an error.