我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.unlink()。
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create a directory and apply ownership and permissions to it.

    A missing path is created with ``perms``. When ``force`` is true and the
    path exists but is not a directory, the entry is removed and replaced by
    a directory. ``chown`` and ``chmod`` are applied to the absolute path in
    every case.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)
    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def _clean_check(cmd, target): """ Run the command to download target. If the command fails, clean up before re-raising the error. """ try: subprocess.check_call(cmd) except subprocess.CalledProcessError: if os.access(target, os.F_OK): os.unlink(target) raise
def dot2graph(self, dot, format='svg'):
    """Render ``dot`` source with the external ``dot`` command.

    The source is written to a temporary file, ``dot`` is run through the
    shell, and the content of the temporary output file is returned. Both
    temporary files are removed afterwards.
    """
    # The original comments here were lost to an encoding error; they
    # mentioned Windows and NamedTemporaryFile.
    # NOTE(review): presumably delete=False is used so the external tool can
    # open the files by name -- confirm.
    with NamedTemporaryFile(delete=False) as dotfile:
        dotfile.write(dot)
    outfile = NamedTemporaryFile(delete=False)
    command = 'dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % (
        dotfile.name, outfile.name, format)
    os.system(command)
    result = outfile.read()
    outfile.close()
    for leftover in (dotfile.name, outfile.name):
        os.unlink(leftover)
    return result
def test_rs3topng():
    """Check that an rs3 file is converted to the expected PNG image."""
    png_str = rstviewer.rs3topng(RS3_FILEPATH)

    # Convert a second time into a named file and compare with the string.
    out_file = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    out_file.close()
    out_path = out_file.name
    rstviewer.rs3topng(RS3_FILEPATH, out_path)
    with open(out_path, 'r') as png_file:
        assert png_str == png_file.read()
    os.unlink(out_path)

    # generated images might not be 100% identical, probably
    # because of the font used
    matches = []
    for expected_path in (EXPECTED_PNG1, EXPECTED_PNG2):
        with open(expected_path, 'r') as expected_png_file:
            matches.append(png_str == expected_png_file.read())
    assert matches[0] or matches[1]
def test_cli_rs3topng():
    """Check the conversion to PNG when invoked through the command line."""
    out_file = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    out_file.close()
    out_path = out_file.name

    # calling `rstviewer -f png input.rs3 output.png` will end the program
    # with sys.exit(0), so we'll have to catch this here.
    # NOTE(review): the flattened source does not show where this ``with``
    # body ends; the two statements after cli() are assumed to be inside it.
    with pytest.raises(SystemExit) as serr:
        cli(['-f', 'png', RS3_FILEPATH, out_path])
        out, err = pytest.capsys.readouterr()
        assert err == 0

    with open(out_path, 'r') as png_file:
        png_str = png_file.read()
    os.unlink(out_path)

    # generated images might not be 100% identical, probably
    # because of the font used
    matches = []
    for expected_path in (EXPECTED_PNG1, EXPECTED_PNG2):
        with open(expected_path, 'r') as expected_png_file:
            matches.append(png_str == expected_png_file.read())
    assert matches[0] or matches[1]
def is_running(name):
    """Report whether a task registered under ``name`` is still running.

    The PID stored in the task's PID file is checked with
    ``_process_exists``. A PID file whose process no longer exists is
    removed.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    pid_path = _pid_file(name)
    if os.path.exists(pid_path):
        with open(pid_path, 'rb') as handle:
            raw_pid = handle.read()
        if _process_exists(int(raw_pid.strip())):
            return True
        # Stale PID file: the recorded process is gone.
        if os.path.exists(pid_path):
            os.unlink(pid_path)
    return False
def _delete_directory_contents(self, dirpath, filter_func): """Delete all files in a directory. :param dirpath: path to directory to clear :type dirpath: ``unicode`` or ``str`` :param filter_func function to determine whether a file shall be deleted or not. :type filter_func ``callable`` """ if os.path.exists(dirpath): for filename in os.listdir(dirpath): if not filter_func(filename): continue path = os.path.join(dirpath, filename) if os.path.isdir(path): shutil.rmtree(path) else: os.unlink(path) self.logger.debug('Deleted : %r', path)
def _start_kuryr_manage_daemon(self):
    """Serve the pool manager over the configured Unix domain socket.

    A stale socket file is removed first; failure to remove it is only an
    error when the file still exists. The server then runs until it is
    interrupted or fails, and its socket is closed on the way out.
    """
    LOG.info("Pool manager started")
    server_address = oslo_cfg.CONF.pool_manager.sock_file
    try:
        os.unlink(server_address)
    except OSError:
        if os.path.exists(server_address):
            raise
    # Bound up-front so the cleanup below is safe even when the server
    # constructor itself raises (previously an UnboundLocalError).
    httpd = None
    try:
        httpd = UnixDomainHttpServer(server_address, RequestHandler)
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    except Exception:
        LOG.exception('Failed to start Pool Manager.')
    finally:
        if httpd is not None:
            httpd.socket.close()
def run(self):
    """Delete all but the newest ``self.keep`` matches of each pattern.

    For every pattern in ``self.match`` the files in ``self.dist_dir`` whose
    name starts with the distribution name are ordered newest first by
    modification time; the entries beyond ``self.keep`` are deleted unless
    ``self.dry_run`` is set.
    """
    self.run_command("egg_info")
    from glob import glob
    for suffix in self.match:
        full_pattern = self.distribution.get_name() + '*' + suffix
        found = glob(os.path.join(self.dist_dir, full_pattern))
        # Newest first: sort the (mtime, path) pairs in descending order.
        dated = sorted(((os.path.getmtime(p), p) for p in found), reverse=True)
        log.info("%d file(s) matching %s", len(dated), full_pattern)
        for _mtime, stale in dated[self.keep:]:
            log.info("Deleting %s", stale)
            if self.dry_run:
                continue
            if os.path.isdir(stale):
                shutil.rmtree(stale)
            else:
                os.unlink(stale)
def save(self):
    """Write the changed .pth file back to disk.

    Does nothing unless ``self.dirty`` is set. With paths to store the file
    is rewritten (a symlink at that location is removed first); without any
    paths an existing file is deleted. The dirty flag is cleared afterwards.
    """
    if not self.dirty:
        return

    rel_paths = [self.make_relative(entry) for entry in self.paths]
    if rel_paths:
        log.debug("Saving %s", self.filename)
        content = '\n'.join(self._wrap_lines(rel_paths)) + '\n'
        # Replace a symlink instead of writing through it.
        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as pth_file:
            pth_file.write(content)
    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)

    self.dirty = False
def zap_pyfiles(self):
    """Delete ``.py`` files under ``self.bdist_dir`` and relocate bytecode.

    Every file found in a ``__pycache__`` directory is renamed to
    ``<name>.pyc`` in the parent directory, replacing a file already there.
    """
    log.info("Removing .py files from temporary directory")
    for base, dirs, files in walk_egg(self.bdist_dir):
        in_pycache = base.endswith('__pycache__')
        for name in files:
            path = os.path.join(base, name)

            if name.endswith('.py'):
                log.debug("Deleting %s", path)
                os.unlink(path)

            if not in_pycache:
                continue

            # NOTE(review): a file in __pycache__ that does not look like
            # ``name.magic.pyc`` makes ``m`` None and the next line fail.
            m = re.match(r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc', name)
            path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc')
            log.info("Renaming file from [%s] to [%s]" % (path, path_new))
            try:
                os.remove(path_new)
            except OSError:
                pass
            os.rename(path, path_new)
def generate_pdf(card):
    """
    Make a PDF from a card

    The QR code for ``card.url`` is written to a temporary PNG file so it
    can be embedded; that file is removed again even when saving or
    embedding fails.

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    import tempfile

    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    # mkstemp creates the file atomically; the previously used mktemp only
    # returned a name, leaving a window for another process to claim it.
    fd, qrcode_file = tempfile.mkstemp(suffix='.png', prefix='trello_qr_')
    os.close(fd)
    try:
        qrcode.save(qrcode_file)
        pdf.image(qrcode_file, 118, 35, 20, 20)
    finally:
        os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
def makelink(self, tarinfo, targetpath):
    """Create the symbolic or hard link described by ``tarinfo``.

    An existing entry at ``targetpath`` is replaced. When the platform
    lacks link support, or a hard link's target does not exist on disk,
    the referenced member is extracted to ``targetpath`` instead.
    """
    links_supported = hasattr(os, "symlink") and hasattr(os, "link")
    if not links_supported:
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
        return

    # For systems that support symbolic and hard links.
    if tarinfo.issym():
        if os.path.lexists(targetpath):
            os.unlink(targetpath)
        os.symlink(tarinfo.linkname, targetpath)
    elif os.path.exists(tarinfo._link_target):
        # See extract().
        if os.path.lexists(targetpath):
            os.unlink(targetpath)
        os.link(tarinfo._link_target, targetpath)
    else:
        self._extract_member(self._find_link_target(tarinfo), targetpath)
def _find_grail_rc(self): import glob import pwd import socket import tempfile tempdir = os.path.join(tempfile.gettempdir(), ".grail-unix") user = pwd.getpwuid(os.getuid())[0] filename = os.path.join(tempdir, user + "-*") maybes = glob.glob(filename) if not maybes: return None s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) for fn in maybes: # need to PING each one until we find one that's live try: s.connect(fn) except socket.error: # no good; attempt to clean it out, but don't fail: try: os.unlink(fn) except IOError: pass else: return s
def __init__(self, proxies=None, **x509):
    """Set up proxies, optional client certificate paths and caches.

    ``proxies`` defaults to the result of ``getproxies()``. The keyword
    arguments ``key_file`` and ``cert_file`` are stored when given.
    """
    proxies = getproxies() if proxies is None else proxies
    assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
    self.proxies = proxies
    self.key_file, self.cert_file = x509.get('key_file'), x509.get('cert_file')
    self.addheaders = [('User-Agent', self.version)]
    self.__tempfiles = []
    self.__unlink = os.unlink  # See cleanup()
    # Undocumented feature: if you assign {} to tempcache, it is used to
    # cache files retrieved with self.retrieve(). This is not enabled by
    # default since it does not work for changing documents (and I haven't
    # got the logic to check expiration headers yet).
    self.tempcache = None
    # Undocumented feature: you can use a different ftp cache by assigning
    # to the .ftpcache member; in case you want logically independent URL
    # openers.
    # XXX This is not threadsafe. Bah.
    self.ftpcache = ftpcache
def nextfile(self):
    """Close the current file and reset the per-file state.

    Restores a saved ``sys.stdout``, closes the output and input objects
    (the input only when it is not stdin), removes the backup file unless
    ``self._backup`` is set, and clears the line buffer.
    """
    saved_stdout, self._savestdout = self._savestdout, 0
    if saved_stdout:
        sys.stdout = saved_stdout

    out_stream, self._output = self._output, 0
    if out_stream:
        out_stream.close()

    in_stream, self._file = self._file, 0
    if in_stream and not self._isstdin:
        in_stream.close()

    backup_path, self._backupfilename = self._backupfilename, 0
    if backup_path and not self._backup:
        # Best effort: a backup that cannot be removed is left in place.
        try:
            os.unlink(backup_path)
        except OSError:
            pass

    self._isstdin = False
    self._buffer = []
    self._bufindex = 0
def close(self, remove=os.unlink, error=os.error):
    """Close the pipe and delete the temporary file.

    Returns the result of closing ``self.pipe``, or 255 when there is no
    pipe. A failure to remove ``self.tmpfile`` is ignored.
    """
    rc = self.pipe.close() if self.pipe else 255
    if self.tmpfile:
        try:
            remove(self.tmpfile)
        except error:
            pass
    return rc
# Alias
def putsequences(self, sequences):
    """Write the set of sequences back to the folder.

    The sequences file is only opened once there is something to write;
    with no sequences at all the file is removed instead (a failure to
    remove it is ignored).
    """
    fullname = self.getsequencesfilename()
    out = None
    for key, seq in sequences.iteritems():
        members = IntSet('', ' ')
        members.fromlist(seq)
        if not out:
            out = open(fullname, 'w')
        out.write('%s: %s\n' % (key, members.tostring()))
    if out:
        out.close()
    else:
        try:
            os.unlink(fullname)
        except os.error:
            pass
def removemessages(self, list):
    """Remove one or more messages -- may raise os.error.

    Each message file is renamed to its comma-prefixed name, replacing an
    older comma-prefixed file. Successfully removed messages are dropped
    from all sequences. Errors are collected and raised only after every
    message was attempted: a single error is re-raised as is, several are
    wrapped in one ``os.error('multiple errors:', errors)``.
    """
    errors = []
    deleted = []
    for n in list:
        path = self.getmessagefilename(n)
        commapath = self.getmessagefilename(',' + str(n))
        try:
            os.unlink(commapath)
        except os.error:
            pass
        try:
            os.rename(path, commapath)
        except os.error as msg:
            errors.append(msg)
        else:
            deleted.append(n)
    if deleted:
        self.removefromallsequences(deleted)
    if errors:
        if len(errors) == 1:
            # Same effect as the old ``raise os.error, errors[0]``: the
            # collected exception instance itself is raised.
            raise errors[0]
        else:
            raise os.error('multiple errors:', errors)
def copymessage(self, n, tofolder, ton):
    """Copy message ``n`` over message ``ton`` of ``tofolder``.

    The destination may or may not already exist; an existing one is first
    renamed to its comma-prefixed backup name. If the copy fails, a partial
    destination file is removed before the error propagates.
    """
    source = self.getmessagefilename(n)
    # Open it to check that it exists
    open(source).close()
    destination = tofolder.getmessagefilename(ton)
    backup = tofolder.getmessagefilename(',%d' % ton)
    try:
        os.rename(destination, backup)
    except os.error:
        pass
    copied = False
    try:
        tofolder.setlast(None)
        shutil.copy2(source, destination)
        copied = True
    finally:
        if not copied:
            try:
                os.unlink(destination)
            except os.error:
                pass
def pack(self):
    """Re-name messages to eliminate numbering gaps. Invalidates keys.

    Message files are renumbered 1, 2, 3, ... in iteration order, the next
    free key is updated, and every renumbered key is replaced in the stored
    sequences.
    """
    sequences = self.get_sequences()
    prev = 0
    renumbered = []
    for key in self.iterkeys():
        wanted = prev + 1
        if key != wanted:
            renumbered.append((key, wanted))
            old_path = os.path.join(self._path, str(key))
            new_path = os.path.join(self._path, str(wanted))
            if hasattr(os, 'link'):
                os.link(old_path, new_path)
                os.unlink(old_path)
            else:
                os.rename(old_path, new_path)
        prev = wanted
    self._next_key = prev + 1
    if not renumbered:
        return
    for key_list in sequences.values():
        for old, new in renumbered:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
def _findLib_gcc(name): expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name) fdout, ccout = tempfile.mkstemp() os.close(fdout) cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \ '$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name try: f = os.popen(cmd) try: trace = f.read() finally: rv = f.close() finally: try: os.unlink(ccout) except OSError, e: if e.errno != errno.ENOENT: raise if rv == 10: raise OSError, 'gcc or cc command not found' res = re.search(expr, trace) if not res: return None return res.group(0)
def test_save_svgz_filename():
    """Saving to a ``.svgz`` file name must produce gzip-compressed XML."""
    import gzip
    qr = segno.make_qr('test')
    tmp = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
    tmp.close()
    path = tmp.name
    qr.save(path)
    expected = b'\x1f\x8b\x08'  # gzip magic number
    raw = open(path, mode='rb')
    val = raw.read(len(expected))
    raw.close()
    unzipped = gzip.open(path)
    try:
        content = unzipped.read(6)
    finally:
        unzipped.close()
    os.unlink(path)
    assert expected == val
    assert b'<?xml ' == content
def test_write_unicode_filename():
    """Title and description must survive saving to an SVG file by name."""
    qr = segno.make_qr('test')
    tmp = tempfile.NamedTemporaryFile('wt', suffix='.svg', delete=False)
    tmp.close()
    path = tmp.name
    title = 'mürrische Mädchen'
    desc = '?'
    qr.save(path, title=title, desc=desc)
    svg = open(path, mode='rb')
    root = _parse_xml(svg)
    # Rewind so the XML declaration can be checked after parsing.
    svg.seek(0)
    val = svg.read(6)
    svg.close()
    os.unlink(path)
    assert b'<?xml ' == val
    assert title == _get_title(root).text
    assert desc == _get_desc(root).text
def grab(bbox=None):
    """Capture the screen as an image, optionally cropped to ``bbox``.

    On ``darwin`` the ``screencapture`` tool writes a temporary PNG that is
    loaded and then deleted; on other platforms the pixel data comes from
    ``grabber()``.
    """
    if sys.platform != "darwin":
        size, data = grabber()
        # RGB, 32-bit line padding, origo in lower left corner
        stride = (size[0]*3 + 3) & -4
        im = Image.frombytes("RGB", size, data, "raw", "BGR", stride, -1)
    else:
        handle, shot_path = tempfile.mkstemp('.png')
        os.close(handle)
        subprocess.call(['screencapture', '-x', shot_path])
        im = Image.open(shot_path)
        # Force the pixel data to be read before the file is removed.
        im.load()
        os.unlink(shot_path)
    if bbox:
        im = im.crop(bbox)
    return im
def test_infile_outfile(self):
    """Run the tool with explicit input and output file arguments."""
    with tempfile.NamedTemporaryFile() as source:
        source.write(self.data.encode())
        source.flush()
        # outfile will get overwritten by tool, so the delete
        # may not work on some platforms. Do it manually.
        sink = tempfile.NamedTemporaryFile()
        try:
            tool_output = self.runTool(args=[source.name, sink.name])
            self.assertEqual(tool_output, ''.encode())
            with open(sink.name, 'rb') as written:
                self.assertEqual(written.read(), self.expect.encode())
        finally:
            sink.close()
            if os.path.exists(sink.name):
                os.unlink(sink.name)
def finalize(self):
    """Close the build-id cache and remove the workspace lock file.

    Failure to remove the lock only prints a warning to stderr.
    """
    assert (self.__asynchronous == 0) and not self.__dirty
    if self.__buildIdCache is not None:
        self.__buildIdCache.close()
        self.__buildIdCache = None
    if not self.__lock:
        return
    try:
        os.unlink(self.__lock)
    except OSError as e:
        from .tty import colorize
        from sys import stderr
        if isinstance(e, FileNotFoundError):
            warning = "Warning: lock file was deleted while Bob was still running!"
        else:
            warning = "Warning: cannot unlock workspace: "+str(e)
        print(colorize(warning, "33"), file=stderr)
def download_file(file, url, optional = False):
    """Download ``url`` into ``file``, reporting progress on stdout.

    On ``IOError`` the reason is printed (or a note that the file was
    optional), a partially written ``file`` is removed, and the error is
    re-raised for the caller to handle.
    """
    try:
        # Written with sys.stdout.write so the progress line stays on one
        # line without relying on the Python 2 print statement.
        sys.stdout.write("Downloading " + url + "... ")
        sys.stdout.flush()
        my_urlretrieve(url, file)
        sys.stdout.write("OK\n")
        sys.stdout.flush()
    except IOError as e:
        if optional:
            sys.stdout.write("missing but optional, so that's OK\n")
        else:
            sys.stdout.write(str(e) + "\n")
        sys.stdout.flush()
        if os.path.exists(file):
            os.unlink(file)
        raise

# Create a file of the given size, containing NULs, without holes.
def install(self):
    """Install the system unless its disk image already exists.

    In ``noemu`` mode the distribution is downloaded and the install always
    runs. Otherwise an existing image short-circuits the call, and an image
    left behind by a failed install is removed before the error propagates.
    """
    # This is needed for Xen and noemu, where we get the kernel
    # from the dist rather than the installed image
    self.dist.set_workdir(self.workdir)

    if self.vmm == 'noemu':
        self.dist.download()
        self._install()
        return

    # Already installed?
    if os.path.exists(self.wd0_path()):
        return
    try:
        self._install()
    except:
        # Deliberately catches everything: the partial image is removed
        # and the original exception is re-raised unchanged.
        if os.path.exists(self.wd0_path()):
            os.unlink(self.wd0_path())
        raise

# Boot the virtual machine (installing it first if it's not
# installed already). The vmm_args argument applies when
# booting, but not when installing. Does not wait for
# a login prompt.
def test_plotscene():
    """Export a window with two plots to SVG and remove the file again."""
    tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name
    print("using %s as a temporary file" % tempfilename)
    pg.setConfigOption('foreground', (0,0,0))
    window = pg.GraphicsWindow()
    window.show()
    left_plot = window.addPlot()
    right_plot = window.addPlot()
    left_data = [1,3,2,3,1,6,9,8,4,2,3,5,3]
    right_data = [1,5,2,3,4,6,1,2,4,2,3,5,3]
    left_plot.plot(left_data, pen={'color':'k'})
    left_plot.setXRange(0,5)
    right_plot.plot(right_data,
                    pen={'color':'k', 'cosmetic':False, 'width': 0.3})
    app.processEvents()
    app.processEvents()

    exporter = pg.exporters.SVGExporter(window.scene())
    exporter.export(fileName=tempfilename)
    # clean up after the test is done
    os.unlink(tempfilename)
def is_running(name):
    """Report whether a task registered under ``name`` is still running.

    The PID stored in the task's PID file is checked with
    ``_process_exists``. A PID file whose process no longer exists is
    removed.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    pid_path = _pid_file(name)
    if os.path.exists(pid_path):
        with open(pid_path, 'rb') as handle:
            raw_pid = handle.read()
        if _process_exists(int(raw_pid.strip())):
            return True
        # Stale PID file: the recorded process is gone.
        if os.path.exists(pid_path):
            os.unlink(pid_path)
    return False
def _delete_directory_contents(self, dirpath, filter_func): """Delete all files in a directory :param dirpath: path to directory to clear :type dirpath: ``unicode`` or ``str`` :param filter_func function to determine whether a file shall be deleted or not. :type filter_func ``callable`` """ if os.path.exists(dirpath): for filename in os.listdir(dirpath): if not filter_func(filename): continue path = os.path.join(dirpath, filename) if os.path.isdir(path): shutil.rmtree(path) else: os.unlink(path) self.logger.debug('Deleted : %r', path)
def service_resume(service_name, init_dir="/etc/init",
                   initd_dir="/etc/init.d"):
    """Resume a system service.

    Re-enable starting the service at boot, then start it unless it is
    already running. Returns the result of the running check, or of the
    start attempt when the service was not running.
    """
    upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
    sysv_file = os.path.join(initd_dir, service_name)

    if init_is_systemd():
        service('enable', service_name)
    elif os.path.exists(upstart_file):
        # Upstart: remove the override file if one is present.
        override_path = os.path.join(
            init_dir, '{}.override'.format(service_name))
        if os.path.exists(override_path):
            os.unlink(override_path)
    elif os.path.exists(sysv_file):
        subprocess.check_call(["update-rc.d", service_name, "enable"])
    else:
        message = ("Unable to detect {0} as SystemD, Upstart {1} or"
                   " SysV {2}")
        raise ValueError(
            message.format(service_name, upstart_file, sysv_file))

    return service_running(service_name) or service_start(service_name)
def download(self, source, dest):
    """
    Download an archive file.

    Credentials embedded in an http(s) URL are stripped from it and
    installed as HTTP basic authentication before the request is made. A
    partially written ``dest`` is removed when the transfer fails.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propogate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and password
            # to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception:
        if os.path.isfile(dest):
            os.unlink(dest)
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        # Release the connection whether or not the transfer succeeded.
        response.close()

# Mandatory file validation via Sha1 or MD5 hashing.
def test_cli_rs3tohtml():
    """Check the conversion to HTML when invoked through the command line."""
    out_file = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    out_file.close()
    out_path = out_file.name
    cli([RS3_FILEPATH, out_path])
    with open(out_path, 'r') as html_file:
        produced = html_file.read()
    assert EXPECTED_HTML in produced
    os.unlink(out_path)
def rs3topng(rs3_filepath, png_filepath=None):
    """Convert a RS3 file into a PNG image of the RST tree.

    The tree is first rendered to a temporary HTML file, which is opened in
    PhantomJS to take a screenshot. The temporary file is removed and the
    browser is shut down even when rendering fails.

    If no output filename is given, the PNG image is returned as a string
    (which is useful for embedding); otherwise it is written to
    ``png_filepath`` and nothing is returned.
    """
    try:
        from selenium import webdriver
        from selenium.common.exceptions import WebDriverException
    except ImportError:
        raise ImportError(
            'Please install selenium: pip install selenium')

    html_str = rs3tohtml(rs3_filepath)

    temp = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    try:
        temp.write(html_str.encode('utf8'))
        temp.close()

        try:
            driver = webdriver.PhantomJS()
        except WebDriverException as err:
            raise WebDriverException(
                'Please install phantomjs: http://phantomjs.org/\n' + err.msg)

        try:
            driver.get(temp.name)
            png_str = driver.get_screenshot_as_png()
        finally:
            # Shut the browser process down instead of leaving it running.
            driver.quit()
    finally:
        temp.close()
        os.unlink(temp.name)

    if png_filepath:
        # The screenshot is binary data, so the file is opened in 'wb'.
        with open(png_filepath, 'wb') as png_file:
            png_file.write(png_str)
    else:
        return png_str
def release(self):
    """Release the lock by deleting `self.lockfile`.

    A lock file that is already gone (errno 2) is not treated as an error;
    any other failure is re-raised.
    """
    self._locked = False
    try:
        os.unlink(self.lockfile)
    except (OSError, IOError) as err:  # pragma: no cover
        if err.errno == 2:
            return
        raise err
def cache_data(self, name, data):
    """Store ``data`` in the cache under ``name``.

    Passing ``None`` as ``data`` deletes the corresponding cache file, if
    there is one, instead of writing anything.

    :param name: name of datastore
    :param data: data to store. This may be any object supported by
        the cache serializer

    """
    serializer = manager.serializer(self.cache_serializer)
    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))

    if data is not None:
        with atomic_writer(cache_path, 'wb') as file_obj:
            serializer.dump(data, file_obj)
        self.logger.debug('Cached data saved at : %s', cache_path)
        return

    if os.path.exists(cache_path):
        os.unlink(cache_path)
        self.logger.debug('Deleted cache file : %s', cache_path)
def test_bearer_token(self, m_cfg, m_get):
    """The token read from the configured file is sent as a Bearer header."""
    token_content = (
        "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3Nl"
        "cnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc"
        "3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bn"
        "Qvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLWh4M3QxIiwia3ViZXJuZXRlcy5"
        "pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQi"
        "LCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51a"
        "WQiOiIxYTkyM2ZmNi00MDkyLTExZTctOTMwYi1mYTE2M2VkY2ViMDUiLCJzdWIiOi"
        "JzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.lzcPef"
        "DQ-uzF5cD-5pLwTKpRvtvvxKB4LX8TLymrPLMTth8WGr1vT6jteJPmLiDZM2C5dZI"
        "iFJpOw4LL1XLullik-ls-CmnTWq97NvlW1cZolC0mNyRz6JcL7gkH8WfUSjLA7x80"
        "ORalanUxtl9-ghMGKCtKIACAgvr5gGT4iznGYQQRx_hKURs4O6Js5vhwNM6UuOKeW"
        "GDDAlhgHMG0u59z3bhiBLl6jbQktZsu8c3diXniQb3sYqYQcGKUm1IQFujyA_ByDb"
        "5GUtCv1BOPL_-IjYtvdJD8ZzQ_UnPFoYQklpDyJLB7_7qCGcfVEQbnSCh907NdKo4"
        "w_8Wkn2y-Tg")
    token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
    try:
        # Point the mocked configuration at the token file before writing.
        m_cfg.kubernetes.token_file = token_file.name
        token_file.write(token_content)
        token_file.close()
        m_cfg.kubernetes.ssl_verify_server_crt = False

        request_path = '/test'
        client = k8s_client.K8sClient(self.base_url)
        client.get(request_path)

        expected_headers = {
            'Authorization': 'Bearer {}'.format(token_content)}
        m_get.assert_called_once_with(
            self.base_url + request_path, cert=(None, None),
            headers=expected_headers, verify=False)
    finally:
        os.unlink(m_cfg.kubernetes.token_file)
def get_exit_code(self):
    """Wait for the process, delete its script file, return the exit code."""
    process = self.process
    process.wait()
    os.unlink(self.tmp_script_filename)
    return process.returncode
def release(self):
    """Remove the lock file, provided this object currently holds the lock.

    Raises ``NotLocked`` when nothing is locked and ``NotMyLock`` when the
    lock is held by someone else.
    """
    if not self.is_locked():
        raise NotLocked("%s is not locked" % self.path)
    if not self.i_am_locking():
        raise NotMyLock("%s is locked, but not by me" % self.path)
    os.unlink(self.lock_file)
def release(self):
    """Remove this object's unique file, then the lock directory.

    Raises ``NotLocked`` when nothing is locked and ``NotMyLock`` when the
    unique file of this object does not exist.
    """
    if not self.is_locked():
        raise NotLocked("%s is not locked" % self.path)
    if not os.path.exists(self.unique_name):
        raise NotMyLock("%s is locked, but not by me" % self.path)
    os.unlink(self.unique_name)
    os.rmdir(self.lock_file)
def break_lock(self):
    """Forcibly remove the lock directory and every file inside it.

    Does nothing when the lock directory does not exist.
    """
    lock_dir = self.lock_file
    if not os.path.exists(lock_dir):
        return
    for entry in os.listdir(lock_dir):
        os.unlink(os.path.join(lock_dir, entry))
    os.rmdir(lock_dir)
def acquire(self, timeout=None):
    """Acquire the lock by hard-linking the unique file to the lock file.

    ``timeout`` falls back to ``self.timeout`` when it is ``None``. With no
    timeout the call keeps retrying; with a positive timeout it raises
    ``LockTimeout`` once the deadline has passed; with any other timeout a
    busy lock raises ``AlreadyLocked``. ``LockFailed`` is raised when the
    unique file cannot be created.
    """
    try:
        open(self.unique_name, "wb").close()
    except IOError:
        raise LockFailed("failed to create %s" % self.unique_name)

    if timeout is None:
        timeout = self.timeout
    deadline = time.time()
    if timeout is not None and timeout > 0:
        deadline += timeout

    while True:
        # Try and create a hard link to it.
        try:
            os.link(self.unique_name, self.lock_file)
        except OSError:
            # Link creation failed. Maybe we've double-locked?
            if os.stat(self.unique_name).st_nlink == 2:
                # The original link plus the one I created == 2. We're
                # good to go.
                return
            # Otherwise the lock creation failed.
            if timeout is not None and time.time() > deadline:
                os.unlink(self.unique_name)
                if timeout > 0:
                    raise LockTimeout("Timeout waiting to acquire"
                                      " lock for %s" %
                                      self.path)
                raise AlreadyLocked("%s is already locked" %
                                    self.path)
            # Pause for a tenth of the timeout, or 0.1s when that is zero
            # or no timeout is set.
            pause = timeout / 10 if timeout is not None else 0
            time.sleep(pause or 0.1)
        else:
            # Link creation succeeded. We're good to go.
            return