我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 errno.EBUSY。
def _unsafe_writes(self, src, dest, exception):
    """Copy ``src`` over ``dest`` non-atomically as a last resort.

    Sadly there are some situations where atomicity cannot be ensured; the
    file is only updated unsafely when the user insists and the failed
    replace reported ``errno.EBUSY``.

    :param src: path of the file holding the new content.
    :param dest: path of the file that is overwritten in place.
    :param exception: error raised by the earlier replace attempt; its
        ``errno`` attribute decides whether the unsafe copy is attempted.

    Every failure is reported through ``self.fail_json``.
    """
    if exception.errno == errno.EBUSY:
        # TODO: issue warning that this is an unsafe operation, but doing it
        # cause user insists
        # Bind both handles up front: if either open() fails, the cleanup
        # below must not trip over an unbound name and mask the real error.
        out_dest = in_src = None
        try:
            try:
                out_dest = open(dest, 'wb')
                in_src = open(src, 'rb')
                shutil.copyfileobj(in_src, out_dest)
            finally:
                # assuring closed files in 2.4 compatible way
                if out_dest:
                    out_dest.close()
                if in_src:
                    in_src.close()
        except (shutil.Error, OSError, IOError):
            e = get_exception()
            self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e))
    else:
        self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
def createGroup(self, groupname):
    """Create the directory that backs a new group.

    The name is passed through ``self.validateName`` and ``self.translate``
    to obtain the directory to create; the whole operation runs while
    ``self.lock`` is held.

    Raises Pyro.errors.NamingError when the directory cannot be created:
    'group already exists' for EEXIST/EBUSY, '(parent)group not found' for
    ENOENT, and the stringified OSError otherwise.
    """
    groupname = self.validateName(groupname)
    dirnam = self.translate(groupname)
    self.lock.acquire()
    try:
        try:
            os.mkdir(dirnam)
            self._dosynccall("createGroup", groupname)
            Log.msg('NameServer', 'created group', groupname)
        except OSError as x:  # "as" form works on Python 2.6+ and 3
            if x.errno in (errno.EEXIST, errno.EBUSY):
                raise Pyro.errors.NamingError('group already exists', groupname)
            elif x.errno == errno.ENOENT:
                raise Pyro.errors.NamingError('(parent)group not found')
            else:
                raise Pyro.errors.NamingError(str(x))
    finally:
        self.lock.release()
def test_scrub_inactive_pastes_error(self):
    """Scrubbing propagates an OSError raised by the patched shutil.rmtree."""
    pastes = []
    for _ in range(15):
        pastes.append(util.testing.PasteFactory.generate(expiry_time=None))
    for paste in pastes:
        util.testing.AttachmentFactory.generate(paste_id=paste.paste_id, file_name='file')
    for paste in pastes[:10]:
        database.paste.deactivate_paste(paste.paste_id)

    # NOTE(review): OSError built from a single argument leaves ``.errno``
    # as None -- confirm that is what the code under test should see.
    with mock.patch.object(shutil, 'rmtree') as mock_rmtree:
        mock_rmtree.side_effect = OSError(errno.EBUSY)
        self.assertRaises(
            OSError,
            database.paste.scrub_inactive_pastes,
        )

    # NOTE(review): nothing in this block invokes scrub_inactive_pastes, so
    # the ENOENT side effect is presumably never triggered -- verify intent.
    with mock.patch.object(shutil, 'rmtree') as mock_rmtree:
        mock_rmtree.side_effect = OSError(errno.ENOENT)
        for paste in pastes:
            found = database.paste.get_paste_by_id(paste.paste_id)
            self.assertIsNotNone(found)
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
    """A higher level wrapper representing how an application is
    supposed to use sendfile().

    Retries for as long as os.sendfile() fails with EAGAIN or EBUSY and
    returns its result; any other OSError (ECONNRESET included, i.e. the
    peer disconnected) propagates. ``headers``/``trailers`` are forwarded
    only when ``self.SUPPORT_HEADERS_TRAILERS`` is true.
    """
    retry_codes = (errno.EAGAIN, errno.EBUSY)
    while True:
        try:
            if not self.SUPPORT_HEADERS_TRAILERS:
                return os.sendfile(sock, file, offset, nbytes)
            return os.sendfile(sock, file, offset, nbytes, headers,
                               trailers)
        except OSError as err:
            if err.errno not in retry_codes:
                raise
            # we have to retry send data
def test_private_destroy_ebusy_timeout(self):
    # Tests that _destroy will retry 3 times to destroy the guest when an
    # EBUSY is raised, but eventually times out and raises the libvirtError
    busy_message = ("Failed to terminate process 26425 with SIGKILL: "
                    "Device or resource busy")
    busy_error = fakelibvirt.make_libvirtError(
        fakelibvirt.libvirtError,
        busy_message,
        error_code=fakelibvirt.VIR_ERR_SYSTEM_ERROR,
        int1=errno.EBUSY)

    guest = mock.Mock(libvirt_guest.Guest, id=1)
    # Every poweroff attempt fails with the EBUSY error built above.
    guest.poweroff = mock.Mock(side_effect=busy_error)

    instance = objects.Instance(**self.test_instance)
    driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
    patched_lookup = mock.patch.object(driver._host, 'get_guest',
                                       return_value=guest)
    with patched_lookup:
        self.assertRaises(fakelibvirt.libvirtError,
                          driver._destroy, instance)

    self.assertEqual(3, guest.poweroff.call_count)
def test_private_destroy_ebusy_multiple_attempt_ok(self):
    # Tests that the _destroy attempt loop is broken when EBUSY is no
    # longer raised.
    busy_message = ("Failed to terminate process 26425 with SIGKILL: "
                    "Device or resource busy")
    busy_error = fakelibvirt.make_libvirtError(
        fakelibvirt.libvirtError,
        busy_message,
        error_code=fakelibvirt.VIR_ERR_SYSTEM_ERROR,
        int1=errno.EBUSY)

    guest = mock.Mock(libvirt_guest.Guest, id=1)
    # First poweroff raises the EBUSY error, the second one returns None.
    guest.poweroff = mock.Mock(side_effect=[busy_error, None])

    shutdown_info = hardware.InstanceInfo(power_state.SHUTDOWN, id=1)
    instance = objects.Instance(**self.test_instance)
    driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
    with mock.patch.object(driver._host, 'get_guest',
                           return_value=guest):
        with mock.patch.object(driver, 'get_info',
                               return_value=shutdown_info):
            driver._destroy(instance)

    self.assertEqual(2, guest.poweroff.call_count)
def cmd_destroy_shelf(self, cmdict):
    """ For a shelf, zombify books (mark for zeroing) and remove xattrs
        In (dict)---
            path
            node
        Out (dict) ---
            shelf data

        ``self.errno`` is set to EBUSY before anything else runs.
        Raises AssertionError when the shelf still has active opens.
    """
    self.errno = errno.EBUSY
    shelf = self.cmd_get_shelf(cmdict)
    # Raise explicitly instead of using an assert statement: asserts are
    # stripped under "python -O", which would let an open shelf be
    # destroyed. The exception type and message are unchanged.
    if self.db.open_count(shelf):
        raise AssertionError('%s has active opens' % shelf.name)
    bos = self._list_shelf_books(shelf)
    xattrs = self.db.list_xattrs(shelf)
    for thisbos in bos:
        self.db.delete_bos(thisbos)
        self._set_book_alloc(thisbos, TMBook.ALLOC_ZOMBIE)
    for xattr in xattrs:
        self.db.remove_xattr(shelf, xattr)
    return self.db.delete_shelf(shelf, commit=True)
def load(volpath):
    """
    Load and return dictionary from the sidecar.

    Reading is retried (with a sleep in between) while open() fails with
    EBUSY and the retry budget is not used up. Returns None when the file
    cannot be read or its content is not valid JSON.
    """
    meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(),
                                                DVOL_KEY.encode())
    attempts = 0
    vol_name = vmdk_utils.get_volname_from_vmdk_path(volpath)

    while True:
        try:
            with open(meta_file, "r") as fh:
                kv_str = fh.read()
        except IOError as open_error:
            # This is a workaround to the timing/locking with metadata files issue #626
            can_retry = (open_error.errno == errno.EBUSY and
                         attempts <= vmdk_utils.VMDK_RETRY_COUNT)
            if not can_retry:
                logging.exception("Failed to access %s", meta_file)
                return None
            logging.warning("Meta file %s busy for load(), retrying...", meta_file)
            vmdk_utils.log_volume_lsof(vol_name)
            attempts += 1
            time.sleep(vmdk_utils.VMDK_RETRY_SLEEP)
        else:
            break

    try:
        parsed = json.loads(kv_str)
    except ValueError:
        logging.exception("load:Failed to decode meta-data for %s", volpath)
        return None
    return parsed
def save(volpath, kv_dict, key=None, value=None):
    """
    Save the dictionary to side car.

    When ``key`` is truthy the sidecar is read back first, and nothing is
    written if it already holds ``key`` with a value other than ``value``.
    The whole read/write sequence is retried (with a sleep in between)
    while it fails with EBUSY and the retry budget is not used up.

    Returns True once the data is written, False otherwise.
    """
    meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(),
                                                DVOL_KEY.encode())
    kv_str = json.dumps(kv_dict)

    retry_count = 0
    vol_name = vmdk_utils.get_volname_from_vmdk_path(volpath)
    while True:
        try:
            if key:
                with open(meta_file, "r") as rfh:
                    kv_val = rfh.read()
                try:
                    kv_match = json.loads(kv_val)
                except ValueError:
                    # Prefix names this function (it used to say "load:").
                    logging.exception("save:Failed to decode meta-data for %s", volpath)
                    return False
                if key in kv_match and kv_match[key] != value:
                    return False
            with open(meta_file, "w") as fh:
                fh.write(align_str(kv_str, KV_ALIGN))
            break
        except IOError as open_error:
            # This is a workaround to the timing/locking with metadata files issue #626
            if open_error.errno == errno.EBUSY and retry_count <= vmdk_utils.VMDK_RETRY_COUNT:
                logging.warning("Meta file %s busy for save(), retrying...", meta_file)
                vmdk_utils.log_volume_lsof(vol_name)
                retry_count += 1
                time.sleep(vmdk_utils.VMDK_RETRY_SLEEP)
            else:
                logging.exception("Failed to save meta-data for %s", volpath)
                return False

    return True
def test_bad_acquire(self):
    """acquire() on a BrokenLock built with EBUSY raises ThreadError."""
    broken = BrokenLock(os.path.join(self.lock_dir, 'lock'), errno.EBUSY)
    self.assertRaises(threading.ThreadError, broken.acquire)
def _initdb_1(self):
    """Create the ``self.dbroot`` directory unless it is already there.

    EEXIST and EBUSY from os.mkdir() are ignored; any other OSError
    propagates.
    """
    # root is not a NamedTree but a directory
    try:
        os.mkdir(self.dbroot)
    except OSError as x:  # "as" form works on Python 2.6+ and 3
        if x.errno not in (errno.EEXIST, errno.EBUSY):
            raise
def test_init_fail_open(self, usb_context):
    """Each libusb error raised by device.open() surfaces as an IOError
    carrying the matching errno."""
    endpoints = [
        self.Endpoint(0x0004, 0x0002),
        self.Endpoint(0x0084, 0x0002),
    ]
    device = self.Device(0x1000, 0x2000, 1, 2, [self.Settings(endpoints)])
    device.open = MagicMock()
    # One failure per USB() call below, consumed in this order.
    device.open.side_effect = [
        nfc.clf.transport.libusb.USBErrorAccess,
        nfc.clf.transport.libusb.USBErrorBusy,
        nfc.clf.transport.libusb.USBErrorNoDevice,
    ]
    usb_context.return_value.getDeviceList.return_value = [device]

    for expected_errno in (errno.EACCES, errno.EBUSY, errno.ENODEV):
        with pytest.raises(IOError) as excinfo:
            nfc.clf.transport.USB(1, 2)
        assert excinfo.value.errno == expected_errno
def main(args):
    """Print version information and probe for contactless devices.

    Every USB device listed in ``nfc.clf.device.usb_device_map`` is opened
    and closed again; serial devices are probed only when
    ``args.search_tty`` is set. EACCES and EBUSY failures are reported,
    other IOError codes are silently skipped.
    """
    banner = "This is the %s version of nfcpy run in Python %s\non %s"
    print(banner % (nfc.__version__, platform.python_version(),
                    platform.platform()))
    print("I'm now searching your system for contactless devices")

    found = 0
    for vid, pid, bus, dev in nfc.clf.transport.USB.find("usb"):
        if (vid, pid) not in nfc.clf.device.usb_device_map:
            continue
        path = "usb:{0:03d}:{1:03d}".format(bus, dev)
        try:
            clf = nfc.ContactlessFrontend(path)
            print("** found %s" % clf.device)
            clf.close()
            found += 1
        except IOError as error:
            if error.errno == errno.EACCES:
                usb_device_access_denied(bus, dev, vid, pid, path)
            elif error.errno == errno.EBUSY:
                usb_device_found_is_busy(bus, dev, vid, pid, path)

    if not args.search_tty:
        print("I'm not trying serial devices because you haven't told me")
        print("-- add the option '--search-tty' to have me looking")
        print("-- but beware that this may break other serial devs")
    else:
        for dev in nfc.clf.transport.TTY.find("tty")[0]:
            path = "tty:{0}".format(dev[8:])
            try:
                clf = nfc.ContactlessFrontend(path)
                print("** found %s" % clf.device)
                clf.close()
                found += 1
            except IOError as error:
                if error.errno == errno.EACCES:
                    print("access denied for device with path %s" % path)
                elif error.errno == errno.EBUSY:
                    print("the device with path %s is busy" % path)

    if not found:
        print("Sorry, but I couldn't find any contactless device")
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.

    If umount returns EBUSY this will lazy unmount.

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    # NOTE(review): the flag-taking libc entry point is presumably umount2();
    # confirm that umount() honours the second argument on the target libc.
    # NOTE(review): on Python 3 ctypes passes str as wchar_t*; presumably
    # callers hand in bytes there -- verify against callers.
    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)

    # First try to umount with MNT_FORCE
    if libc.umount(mount_point, 1) >= 0:
        return 0

    err = ctypes.get_errno()
    if err != errno.EBUSY:
        return err

    # Detach from try. IE lazy umount
    if libc.umount(mount_point, 2) < 0:
        return ctypes.get_errno()
    return 0
def test_umount_ebusy(self):
    """With errno pinned to EBUSY, ceph.umount issues a second umount call
    with flag 2 after the first one with flag 1, and returns 0."""
    mount_point = '/some/osd/mount'
    libc_mock = Mock()
    libc_mock.umount.side_effect = umount_busy
    self.ctypes.util.find_library.return_value = 'libc.so.6'
    self.ctypes.CDLL.return_value = libc_mock
    self.ctypes.get_errno.return_value = errno.EBUSY

    ret = ceph.umount(mount_point)

    expected_calls = [
        call.umount(mount_point, 1),
        call.umount(mount_point, 2),
    ]
    libc_mock.assert_has_calls(expected_calls)
    assert ret == 0
def test_main():
    """Run this module's tests, skipping them when the audio device cannot
    be opened (EACCES, ENOENT, ENODEV or EBUSY)."""
    unavailable = (errno.EACCES, errno.ENOENT,
                   errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] not in unavailable:
            raise
        raise unittest.SkipTest(msg)
    else:
        # The device was opened only as an availability probe.
        dsp.close()
    support.run_unittest(__name__)
def display_sound_init_error_message(self, e):
    """Show an error dialog that matches the sound-init failure ``e``.

    SoundInitException and ImportError get dedicated dialogs; otherwise the
    text depends on ``e.errno`` (EACCES, EBUSY, or anything else).
    """
    def decoded():
        # Text of the exception, decoded with the preferred locale encoding.
        return str(e).decode(locale.getpreferredencoding(), 'replace')

    if isinstance(e, soundcard.SoundInitException):
        solfege.win.display_error_message("""%s""" % decoded())
        return
    if isinstance(e, ImportError):
        solfege.win.display_error_message2(str(e), _("You should configure sound from the preferences window, and try to use an external midi player. Or try to recompile the program and check for error messages to see why the module is not built."))
        return

    if getattr(e, 'errno', None) == errno.EACCES:
        solfege.win.display_error_message(
            "The sound init failed: %s\n"
            "The errno EACCES indicates that you don't have write "
            "permission to the device."
            % decoded())
    elif getattr(e, 'errno', None) == errno.EBUSY:
        solfege.win.display_error_message(
            "The sound init failed: %s\n"
            "It seems like some other program is using the device. You "
            "should try to quit that other program and restart Solfege."
            % decoded())
    else:
        solfege.win.display_error_message(
            "The sound init failed: %s\n"
            "You should configure sound from the 'Sound' page of "
            "the preferences window.\n\n"
            "It is also possible that the OS sound setup is incorrect."
            % decoded())
def test_main():
    """Run LinuxAudioDevTests, skipping when the audio device cannot be
    opened (EACCES, ENOENT, ENODEV or EBUSY)."""
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as msg:  # "as" form works on Python 2.6+ and 3
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    # The device was opened only as an availability probe.
    dsp.close()
    run_unittest(LinuxAudioDevTests)
def test_main():
    """Run this module's tests, skipping them when the audio device cannot
    be opened (EACCES, ENOENT, ENODEV or EBUSY)."""
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:  # valid on Python 2.6+ and 3
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    # The device was opened only as an availability probe.
    dsp.close()
    test_support.run_unittest(__name__)
def test_flags(self):
    """os.sendfile() accepts flags=SF_NODISKIO; EBUSY/EAGAIN are tolerated."""
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as err:
        if err.errno in tolerated:
            return
        raise
def test_main():
    """Run this module's tests, skipping them when the audio device cannot
    be opened (EACCES, ENOENT, ENODEV or EBUSY)."""
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as msg:
        skip_codes = (errno.EACCES, errno.ENOENT,
                      errno.ENODEV, errno.EBUSY)
        if msg.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(msg)
    # The device was opened only as an availability probe.
    dsp.close()
    support.run_unittest(__name__)
def initiate_sendfile(self):
    """A wrapper around sendfile.

    Sends one chunk and advances ``self._offset`` / ``self.tot_bytes_sent``.
    Retryable errors (and EBUSY) return silently, disconnect errors close
    the channel, and any other failure raises ``_GiveUpOnSendfile`` when
    nothing has been sent yet or re-raises the OSError otherwise.
    """
    try:
        sent = sendfile(self._fileno, self._filefd, self._offset,
                        self.ac_out_buffer_size)
    except OSError as err:
        if err.errno in _ERRNOS_RETRY or err.errno == errno.EBUSY:
            return
        if err.errno in _ERRNOS_DISCONNECTED:
            self.handle_close()
            return
        if self.tot_bytes_sent != 0:
            raise
        logger.warning(
            "sendfile() failed; falling back on using plain send")
        raise _GiveUpOnSendfile

    if sent == 0:
        # this signals the channel that the transfer is completed
        self.discard_buffers()
        self.handle_close()
        return
    self._offset += sent
    self.tot_bytes_sent += sent

# --- utility methods
def test_write_config_errors_if_unexpected_exception(self):
    """write_config() lets an IOError raised by atomic_write propagate."""
    dnsconfig = DNSConfig()
    failing_write = Mock(
        side_effect=IOError(errno.EBUSY, factory.make_string()))
    self.patch(config, 'atomic_write', failing_write)
    self.assertRaises(IOError, dnsconfig.write_config)
def connect_miniterm(port):
    """Open ``port`` and return a Miniterm attached to it.

    The serial port is opened with the module's BAUDRATE and PARITY and
    with both hardware and software flow control disabled. If that fails
    with serial.SerialException, a hint that depends on the errno is
    written to stderr, the user is asked to press ENTER, and the process
    exits with status 1.
    """
    try:
        ser = serial.Serial(port, BAUDRATE, parity=PARITY, rtscts=False,
                            xonxoff=False)
        return Miniterm(ser, echo=False)
    except serial.SerialException as e:
        if e.errno == errno.ENOENT:
            sys.stderr.write(
                "Device %r not found. Check your "
                "MicroPython device path settings.\n" % port)
        elif e.errno == errno.EBUSY:
            # Device is busy. Explain what to do.
            sys.stderr.write(
                "Found the device, but it is busy. "
                "Wait up to 20 seconds, or "
                "press the reset button on the "
                "back of the device next to the yellow light; "
                "then try again.\n"
            )
        elif e.errno == errno.EACCES:
            # stderr.write() only accepts text, so the exception is
            # formatted explicitly; each message gets its own line.
            sys.stderr.write("Found the device, but could not connect.\n")
            sys.stderr.write("%s\n" % e)
            sys.stderr.write(
                'On linux, try adding yourself to the "dialout" group\n')
            sys.stderr.write('sudo usermod -a -G dialout <your-username>\n')
        else:
            # Try to be as helpful as possible.
            sys.stderr.write("Found the device, but could not connect via" +
                             " port %r: %s\n" % (port, e))
            sys.stderr.write("I'm not sure what to suggest. :-(\n")
        input("Press ENTER to continue")
        sys.exit(1)
def open(self, usb_bus, dev_adr):
    """Open the USB device at ``dev_adr`` on ``usb_bus`` and claim interface 0.

    On success ``self.usb_dev`` holds the opened device and
    ``self.usb_inp`` / ``self.usb_out`` the first bulk IN / OUT endpoints
    of the first configuration setting.

    Raises IOError with ENODEV when the device, its settings or the bulk
    endpoints are missing, and with EACCES / EBUSY / ENODEV when libusb
    reports the corresponding error while opening or claiming.
    """
    self.usb_dev = None
    self.usb_out = None
    self.usb_inp = None

    for dev in self.context.getDeviceList(skip_on_error=True):
        if ((dev.getBusNumber() == usb_bus and
             dev.getDeviceAddress() == dev_adr)):
            break
    else:
        log.error("no device {0} on bus {1}".format(dev_adr, usb_bus))
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    try:
        # next() builtin instead of the Python 2-only iterator.next()
        first_setting = next(dev.iterSettings())
    except StopIteration:
        log.error("no usb configuration settings, please replug device")
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    def transfer_type(x):
        return x & libusb.TRANSFER_TYPE_MASK

    def endpoint_dir(x):
        return x & libusb.ENDPOINT_DIR_MASK

    # Remember only the first bulk endpoint found for each direction.
    for endpoint in first_setting.iterEndpoints():
        ep_addr = endpoint.getAddress()
        ep_attr = endpoint.getAttributes()
        if transfer_type(ep_attr) == libusb.TRANSFER_TYPE_BULK:
            if endpoint_dir(ep_addr) == libusb.ENDPOINT_IN:
                if not self.usb_inp:
                    self.usb_inp = endpoint
            if endpoint_dir(ep_addr) == libusb.ENDPOINT_OUT:
                if not self.usb_out:
                    self.usb_out = endpoint

    if not (self.usb_inp and self.usb_out):
        log.error("no bulk endpoints for read and write")
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    try:
        # workaround the PN533's buggy USB implementation
        self._manufacturer_name = dev.getManufacturer()
        self._product_name = dev.getProduct()
    except libusb.USBErrorIO:
        self._manufacturer_name = None
        self._product_name = None

    try:
        self.usb_dev = dev.open()
        self.usb_dev.claimInterface(0)
    except libusb.USBErrorAccess:
        raise IOError(errno.EACCES, os.strerror(errno.EACCES))
    except libusb.USBErrorBusy:
        raise IOError(errno.EBUSY, os.strerror(errno.EBUSY))
    except libusb.USBErrorNoDevice:
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def play_sound_file(self, data, rate, ssize, nchannels):
    """Play ``data`` through the OSS device and check how long it took.

    Skips the test when the device cannot be opened (EACCES, ENOENT,
    ENODEV or EBUSY).
    """
    try:
        dsp = ossaudiodev.open('w')
    except IOError as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise

    # at least check that these methods can be invoked
    for method_name in ('bufsize', 'obufcount', 'obuffree',
                        'getptr', 'fileno'):
        getattr(dsp, method_name)()

    # Make sure the read-only attributes work.
    self.assertFalse(dsp.closed)
    self.assertEqual(dsp.name, "/dev/dsp")
    self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

    # And make sure they're really read-only.
    for attr in ('closed', 'name', 'mode'):
        try:
            setattr(dsp, attr, 42)
        except (TypeError, AttributeError):
            continue
        self.fail("dsp.%s not read-only" % attr)

    # Compute expected running time of sound sample (in seconds).
    expected_time = float(len(data)) / (ssize/8) / nchannels / rate

    # set parameters based on .au file headers
    dsp.setparameters(AFMT_S16_NE, nchannels, rate)
    self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
    started = time.time()
    dsp.write(data)
    dsp.close()
    finished = time.time()
    elapsed_time = finished - started

    percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
    self.assertTrue(percent_diff <= 10.0,
                    "elapsed time (%s) > 10%% off of expected time (%s)" %
                    (elapsed_time, expected_time))
def play_sound_file(self, data, rate, ssize, nchannels):
    """Play ``data`` through the OSS device and check how long it took.

    Skips the test when the device cannot be opened (EACCES, ENOENT,
    ENODEV or EBUSY).
    """
    try:
        dsp = ossaudiodev.open('w')
    except IOError as msg:  # "as" form works on Python 2.6+ and 3
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise

    # at least check that these methods can be invoked
    dsp.bufsize()
    dsp.obufcount()
    dsp.obuffree()
    dsp.getptr()
    dsp.fileno()

    # Make sure the read-only attributes work.
    self.assertFalse(dsp.closed)
    self.assertEqual(dsp.name, "/dev/dsp")
    self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

    # And make sure they're really read-only.
    for attr in ('closed', 'name', 'mode'):
        try:
            setattr(dsp, attr, 42)
        except TypeError:
            pass
        else:
            self.fail("dsp.%s not read-only" % attr)

    # Compute expected running time of sound sample (in seconds).
    expected_time = float(len(data)) / (ssize//8) / nchannels / rate

    # set parameters based on .au file headers
    dsp.setparameters(AFMT_S16_NE, nchannels, rate)
    self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
    t1 = time.time()
    dsp.write(data)
    dsp.close()
    t2 = time.time()
    elapsed_time = t2 - t1

    percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
    self.assertTrue(percent_diff <= 10.0,
                    "elapsed time > 10% off of expected time")
def rmtree(path, selinux=False, exclude=()):
    """Version of shutil.rmtree that ignores no-such-file-or-directory errors,
    tries harder if it finds immutable files and supports excluding paths.

    :param path: directory to remove; must not be a symbolic link.
    :param selinux: when true, EPERM/EACCES failures trigger one
        ``chattr -R -i`` on ``path`` followed by another attempt.
    :param exclude: collection of full paths that must be left in place.
    :raises OSError: for a symlink argument, or for any failure that the
        retry logic below does not absorb.
    """
    # Function-scope import keeps this block self-contained; the fallback
    # covers Python 2, where quote() lives in the pipes module.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    if os.path.islink(path):
        raise OSError("Cannot call rmtree on a symbolic link")
    try_again = True
    retries = 0
    failed_to_handle = False
    failed_filename = None
    if path in exclude:
        return
    while try_again:
        try_again = False
        try:
            names = os.listdir(path)
            for name in names:
                fullname = os.path.join(path, name)
                if fullname not in exclude:
                    try:
                        mode = os.lstat(fullname).st_mode
                    except OSError:
                        mode = 0
                    if stat.S_ISDIR(mode):
                        try:
                            rmtree(fullname, selinux=selinux, exclude=exclude)
                        except OSError as e:
                            if e.errno in (errno.EPERM, errno.EACCES, errno.EBUSY):
                                # we already tried handling this on lower level and failed,
                                # there's no point in trying again now
                                failed_to_handle = True
                            raise
                    else:
                        os.remove(fullname)
            os.rmdir(path)
        except OSError as e:
            if failed_to_handle:
                raise
            if e.errno == errno.ENOENT:  # no such file or directory
                pass
            elif exclude and e.errno == errno.ENOTEMPTY:  # there's something excluded left
                pass
            elif selinux and (e.errno == errno.EPERM or e.errno == errno.EACCES):
                try_again = True
                # Give up when the same file fails again after chattr.
                if failed_filename == e.filename:
                    raise
                failed_filename = e.filename
                # Quote the path so spaces or shell metacharacters in it
                # cannot alter the command that the shell executes.
                os.system("chattr -R -i %s" % quote(path))
            elif e.errno == errno.EBUSY:
                retries += 1
                if retries > 1:
                    raise
                try_again = True
                getLog().debug("retrying failed tree remove after sleeping a bit")
                time.sleep(2)
            else:
                raise
def set_memory_hardlimit(cgrp, limit):
    """Set the cgroup hard-limits to the desired value.

    The logic here is complicated since the ordering of operations depends on
    whether we are lowering or raising the value.

    Raises TreadmillCgroupError when a write fails with EBUSY; any other
    IOError propagates once no alternative ordering is left to try.
    """

    def _lower_memory_hardlimit(cgrp, limit):
        """Lower the cgroup memory hardlimit."""
        cgroups.set_value('memory', cgrp,
                          'memory.limit_in_bytes', limit)
        cgroups.set_value('memory', cgrp,
                          'memory.memsw.limit_in_bytes', limit)

    def _raise_memory_hardlimit(cgrp, limit):
        """Raise the cgroup memory hardlimit."""
        cgroups.set_value('memory', cgrp,
                          'memory.memsw.limit_in_bytes', limit)
        cgroups.set_value('memory', cgrp,
                          'memory.limit_in_bytes', limit)

    orderings = (_lower_memory_hardlimit, _raise_memory_hardlimit)
    last_index = len(orderings) - 1
    for index, apply_limit in enumerate(orderings):
        try:
            apply_limit(cgrp, limit)
        except IOError as err:
            if err.errno == errno.EBUSY:
                # Resource busy, this means the cgroup is already using more
                # then the limit we are trying to set.
                raise TreadmillCgroupError('Unable to set hard limit to %d. '
                                           'Cgroup %r memory over limit.' %
                                           (limit, cgrp))
            if err.errno == errno.EINVAL and index < last_index:
                # This means we did the hard limit operation in the wrong
                # order. If we have a different ordering to try, go ahead.
                continue
            # For any other case, raise the exception
            raise
        else:
            break
def play_sound_file(self, data, rate, ssize, nchannels):
    """Play ``data`` through the OSS device and check how long it took.

    Skips the test when the device cannot be opened (EACCES, ENOENT,
    ENODEV or EBUSY).
    """
    skip_codes = (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except OSError as msg:
        if msg.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(msg)

    # at least check that these methods can be invoked
    for probe in (dsp.bufsize, dsp.obufcount, dsp.obuffree):
        probe()
    dsp.getptr()
    dsp.fileno()

    # Make sure the read-only attributes work.
    self.assertFalse(dsp.closed)
    self.assertEqual(dsp.name, "/dev/dsp")
    self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

    # And make sure they're really read-only.
    for attr in ('closed', 'name', 'mode'):
        try:
            setattr(dsp, attr, 42)
        except (TypeError, AttributeError):
            pass
        else:
            self.fail("dsp.%s not read-only" % attr)

    # Compute expected running time of sound sample (in seconds).
    expected_time = float(len(data)) / (ssize/8) / nchannels / rate

    # set parameters based on .au file headers
    dsp.setparameters(AFMT_S16_NE, nchannels, rate)
    self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
    begin = time.time()
    dsp.write(data)
    dsp.close()
    elapsed_time = time.time() - begin

    percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
    timing_message = ("elapsed time (%s) > 10%% off of expected time (%s)" %
                      (elapsed_time, expected_time))
    self.assertTrue(percent_diff <= 10.0, timing_message)
def remove(self, pkgplan):
    """Remove this action's installed directory.

    A directory that is already gone is ignored; a non-empty directory,
    or a path that is no longer a directory, is handed to
    ``pkgplan.salvage``. EBUSY raises apx.ActionExecutionError unless the
    action carries the "implicit" attribute. EACCES is ignored and every
    other OSError propagates.
    """
    path = self.get_installed_path(pkgplan.image.get_root())
    try:
        os.rmdir(path)
    except OSError as e:
        code = e.errno
        if code == errno.ENOENT:
            return
        if code in (errno.EEXIST, errno.ENOTEMPTY, errno.ENOTDIR):
            # Either the directory is not empty, or the user or another
            # package has changed it into a link or file. Salvage
            # what's there and drive on.
            pkgplan.salvage(path)
            return
        if code == errno.EBUSY:
            is_mountpoint = os.path.ismount(path)
            if self.attrs.get("implicit"):
                return
            if is_mountpoint:
                # User has replaced directory with mountpoint,
                # or a package has been poorly implemented.
                err_txt = _("Unable to remove {0}; it is "
                            "in use as a mountpoint. To "
                            "continue, please unmount the "
                            "filesystem at the target "
                            "location and try again.").format(
                            path)
            else:
                # os.path.ismount() is broken for lofs
                # filesystems, so give a more generic
                # error.
                err_txt = _("Unable to remove {0}; it "
                            "is in use by the system, another "
                            "process, or as a "
                            "mountpoint.").format(path)
            raise apx.ActionExecutionError(self,
                                           details=err_txt, error=e,
                                           fmri=pkgplan.origin_fmri)
        if code != errno.EACCES:
            # this happens on Windows
            raise