Python shelve 模块,open() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用shelve.open()

项目:pycreate2    作者:MomsFriendlyRobotCompany    | 项目源码 | 文件源码
def write():
    """Capture 100 frames from the default camera and persist them to a shelve DB.

    Frames are JPEG-encoded before storage to keep the shelve small
    (per the original measurements: ~1.9MB vs ~29MB raw).
    Relies on module globals `filename` (shelve path), `cv2`, `os`, `time`.
    """
    # FIX: only remove a pre-existing DB; the unconditional os.remove()
    # raised OSError/FileNotFoundError on a fresh run.
    if os.path.exists(filename):
        os.remove(filename)
    cap = cv2.VideoCapture(0)
    db = shelve.open(filename)
    try:
        imgs = []
        data = range(100)

        for i in range(100):
            ret, frame = cap.read()

            if ret:
                # JPEG-encode each frame; raw frames are ~15x larger.
                jpg = cv2.imencode('.jpg', frame)[1].tostring()
                imgs.append(jpg)
                print('frame[{}] {}'.format(i, frame.shape))

            time.sleep(0.03)

        db['imgs'] = imgs
        db['data'] = data
    finally:
        # FIX: always release the camera and close the DB, even on error.
        cap.release()
        db.close()
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def open(self):
        """Open the shelve-backed cache, taking an exclusive file lock.

        If another process holds the lock, the cache is opened read-only
        instead.  A corrupted writable cache is dropped and recreated.
        No-op when already opened.
        """
        if self.opened:
            return

        # `open` here resolves to the builtin, not this method.
        self.lock = open(SETTINGS.CACHE_PATH + '.lock', 'ab')
        try:
            # Non-blocking exclusive lock: fail fast if another process owns it.
            fcntl.flock(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
            mode = 'c'
        except IOError:
            # FIX: logger.warn() is a deprecated alias of warning().
            logger.warning("Cache locked, using read-only")
            mode = 'r'
            self.lock.close()
            self.lock = None

        try:
            self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
        except Exception as e:
            if mode != 'c':
                raise
            # We hold the exclusive lock, so it is safe to drop the cache.
            logger.warning("Dropping corrupted cache on %s", e)
            self.lock.truncate(0)
            self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
        self.opened = True
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def __init__(self, path_entry):
        """Validate *path_entry* as a readable shelf and remember it.

        Raises ImportError immediately when re-entered (shelve imports
        dbm, which would recurse into this finder), and re-raises
        whatever shelve.open() raises for an invalid shelf.
        """
        # Guard against the recursive import triggered by shelve -> dbm.
        if ShelveFinder._maybe_recursing:
            raise ImportError
        try:
            # Probe: opening read-only succeeds only for a valid shelf.
            try:
                ShelveFinder._maybe_recursing = True
                with shelve.open(path_entry, 'r'):
                    pass
            finally:
                # Always clear the flag, even if the probe fails.
                ShelveFinder._maybe_recursing = False
        except Exception as e:
            print('shelf could not import from {}: {}'.format(path_entry, e))
            raise
        else:
            print('shelf added to import path:', path_entry)
            self.path_entry = path_entry
项目:proxyguy    作者:yudori    | 项目源码 | 文件源码
def new_profile(profile_name, address, port, username, password, activate):
    """
    create a network proxy configuration profile
    """
    # Assemble the profile record before touching the store.
    profile = {
        'address': address,
        'port': port,
        'username': username,
        'password': password
    }
    store = shelve.open(db_file)
    try:
        store[str(profile_name)] = profile
        click.echo("Profile '{}' successfully created".format(profile_name))
    finally:
        # Always release the shelve, even if the write fails.
        store.close()

    if activate:
        activate_profile(profile_name)
项目:proxyguy    作者:yudori    | 项目源码 | 文件源码
def activate_profile(profile_name):
    """Activate the named proxy profile.

    An empty name clears the system proxy and drops the 'active' marker;
    the reserved name "active" is ignored.  Unknown names are reported
    via click.echo.
    """
    store = shelve.open(db_file)
    try:
        if profile_name == "":
            util.write(None)
            try:
                del store['active']
            finally:
                click.echo("No proxy mode activated")
        else:
            # BUG FIX: the original used `is not "active"`, which compares
            # object identity — unreliable for strings.  Use != instead.
            if profile_name != "active":
                util.write(store[str(profile_name)])
                store['active'] = str(profile_name)
                click.echo("Profile '{}' successfully activated".
                           format(profile_name))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
项目:proxyguy    作者:yudori    | 项目源码 | 文件源码
def delete_profile(profile_name):
    """
    delete specified profile

    Also clears the 'active' marker when it pointed at the deleted
    profile.  The reserved name "active" itself cannot be deleted.
    """
    store = shelve.open(db_file)
    try:
        # BUG FIX: the original used `is not "active"`, which compares
        # object identity — unreliable for strings.  Use != instead.
        if profile_name != "active":
            del store[str(profile_name)]
            try:
                if str(store["active"]) == profile_name:
                    del store["active"]
            except KeyError:
                # No active profile recorded — nothing to clean up.
                pass
        click.echo("Profile '{}' successfully deleted".format(str(profile_name)))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    Returns the raw payload; gzip-encoded HTTP responses are
    transparently decompressed.  Anything that is not an http(s) URL is
    treated as a local file path.
    """
    # GENERALIZED: also accept https URLs (the original matched only
    # 'http://' and read https URLs as local files).
    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            data = StringIO(data)
            data = gzip.GzipFile(fileobj=data).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()

    return data
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes"""
    # Python 2 shelve requires byte-string keys.
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')

    index_path = os.path.join(gallery_dir, 'searchindex')
    index = shelve.open(index_path)
    if url in index:
        # Cache hit: reuse the previously fetched payload.
        data = index[url]
    else:
        # Cache miss: fetch and memoize for the next call.
        data = _get_data(url)
        index[url] = data
    index.close()

    return data
项目:roguelike-tutorial    作者:Wolfenswan    | 项目源码 | 文件源码
def save_game():
    """ open a new empty shelve (possibly overwriting an old one) to write the game data """
    with shelve.open('savegames/savegame', 'n') as savefile:
        gv.cursor.deactivate()
        savefile['map'] = gv.game_map
        savefile['objects'] = gv.game_objects
        savefile['log'] = gv.game_log
        savefile['gamestate'] = gv.gamestate
        savefile['dlevel'] = gv.dungeon_level

        # Store the index of special objects, so they can be later restored from the gv.game_objects array
        savefile['p_index'] = gv.game_objects.index(gv.player)
        savefile['c_index'] = gv.game_objects.index(gv.cursor)
        savefile['sd_index'] = gv.game_objects.index(gv.stairs_down)
        savefile['su_index'] = gv.game_objects.index(gv.stairs_up)
        # FIX: removed the redundant savefile.close() — the `with` block
        # already closes the shelf on exit.
项目:roguelike-tutorial    作者:Wolfenswan    | 项目源码 | 文件源码
def load_game():
    """ load an existing savegame """
    with shelve.open('savegames/savegame', 'r') as savefile:
        # Bulk game state.
        gv.game_map, gv.game_objects = savefile['map'], savefile['objects']
        gv.game_log, gv.gamestate = savefile['log'], savefile['gamestate']
        gv.dungeon_level = savefile['dlevel']

        # Special objects were saved as indexes into game_objects.
        objs = gv.game_objects
        gv.player = objs[savefile['p_index']]
        gv.cursor = objs[savefile['c_index']]
        gv.stairs_down = objs[savefile['sd_index']]
        gv.stairs_up = objs[savefile['su_index']]

        msgbox('Welcome back stranger to level {0} of {1}!'.format(
            gv.dungeon_level, settings.DUNGEONNAME),
            width=35, text_color=colors.red)
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def extract_all(data):
    """Run (cached) AlchemyAPI feature extraction over data['text'].

    Results are cached in a shelve DB keyed by the SHA-256 of the
    ASCII-normalised text, then filtered down to a whitelist of feature
    groups (absent groups become []).
    """
    resp = dict(data)
    text = data['text']
    # Cache key: hash of the text with non-ASCII characters dropped.
    text_hash = hashlib.sha256(text.encode('ascii', 'ignore')).hexdigest()
    print('text_hash', text_hash)
    cache_db = shelve.open(path.join(CACHE_DIR, 'feature'))
    try:
        # FIX: dict.has_key() was removed in Python 3; use `in`.
        if text_hash not in cache_db:
            print('new call')
            call = alchemyapi.combined('text', text)
            cache_db[text_hash] = call
        else:
            print('cached call')
            call = cache_db[text_hash]
    finally:
        # FIX: close the cache even when the API call raises.
        cache_db.close()
    # Keep only the whitelisted feature groups.
    whitelist = ['concepts', 'entities', 'keywords', 'taxonomy']
    for key in whitelist:
        resp[key] = call[key] if key in call else []
    return resp
项目:em    作者:nhynes    | 项目源码 | 文件源码
def proj_create(args, config, _extra_args):
    """Creates a new em-managed project."""
    template = config['project']['template_repo']
    try:
        pygit2.clone_repository(template, args.dest)
        # Start the new project with a clean history: drop the template's
        # .git directory and re-initialise.
        shutil.rmtree(osp.join(args.dest, '.git'), ignore_errors=True)
        pygit2.init_repository(args.dest)
    except ValueError:
        pass  # already in a repo

    # Ensure the standard project layout exists.
    for sub in ('experiments', 'data'):
        target = osp.join(args.dest, sub)
        if not osp.isdir(target):
            os.mkdir(target)

    # Seed the experiment database with the reserved metadata entry.
    with shelve.open(osp.join(args.dest, '.em')) as emdb:
        emdb['__em__'] = {}
项目:em    作者:nhynes    | 项目源码 | 文件源码
def resume(args, config, prog_args):
    """Resume a stopped experiment."""
    name = args.name
    repo = pygit2.Repository('.')

    with shelve.open('.em') as emdb:
        if name not in emdb:
            return _die(E_NO_EXP.format(name))
        info = emdb[name]
        # Refuse to resume an experiment that is still running.
        running = 'pid' in info or info.get('status') == 'running'
        if running:
            return _die(E_IS_RUNNING.format(name))
        # The experiment must also have a matching git branch.
        try:
            repo.lookup_branch(name)
        except pygit2.GitError:
            return _die(E_NO_EXP.format(name))

    prog_args.append('--resume')
    if args.epoch:
        prog_args.append(args.epoch)

    return _run_job(name, config, args.gpu, prog_args, args.background)
项目:em    作者:nhynes    | 项目源码 | 文件源码
def list_experiments(args, _config, _extra_args):
    """List experiments."""
    import subprocess

    # Optional "key=value" filter over the experiment metadata.
    filt = None
    if args.filter:
        filter_key, filter_value = args.filter.split('=')

        def filt(stats):
            # Missing keys never match (filter_value is always a str).
            return stats.get(filter_key) == filter_value

    with shelve.open('.em') as emdb:
        if filt is not None:
            names = {name
                     for name, info in sorted(emdb.items()) if filt(info)}
        else:
            names = emdb.keys()
        # Hide the reserved metadata entry.
        names -= {EM_KEY}
        if not names:
            return

    # Render the sorted names in columns via the `column` utility.
    subprocess.run(
        ['column'], input='\n'.join(sorted(names)) + '\n', encoding='utf8')
项目:em    作者:nhynes    | 项目源码 | 文件源码
def show(args, _config, _extra_args):
    """Show details about an experiment."""
    import pickle
    import pprint

    name = args.name

    with shelve.open('.em') as emdb:
        # The reserved metadata entry is never shown.
        if name == EM_KEY or name not in emdb:
            return _die(E_NO_EXP.format(name))
        # Print the experiment metadata, dates rendered human-readably.
        for key, val in sorted(emdb[name].items()):
            if isinstance(val, datetime.date):
                val = val.ctime()
            print(f'{key}: {val}')

    if not args.opts:
        return

    # Dump the pickled run options, wrapped to the terminal width.
    opts_path = _expath(name, 'run', 'opts.pkl')
    with open(opts_path, 'rb') as f_opts:
        print('\noptions:')
        opts = pickle.load(f_opts)
        cols = shutil.get_terminal_size((80, 20)).columns
        pprint.pprint(vars(opts), indent=2, compact=True, width=cols)
项目:PyperGrabber    作者:pykong    | 项目源码 | 文件源码
def get_pdf(pdf_link):
    """Download pdf_link into tmp_dir unless already recorded in the DB.

    Successes and failures are recorded via log_download().
    NOTE(review): Python 2 code (urllib2) — left as such.
    """
    # Filename is the last path component of the link.
    pdf_name = pdf_link.rsplit('/', 1)[-1]
    if not check_db(pdf_name) and not check_db(pdf_link):
        try:
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', USER_AGENT)]

            r = opener.open(pdf_link)
            try:
                path = tmp_dir + pdf_name

                with open(path, "wb") as code:
                    code.write(r.read())
            finally:
                # FIX: always release the HTTP response.
                r.close()

            # log successful download:
            log_download('DOWNLOADED: {}'.format(pdf_link))

        except Exception as e:
            log_download('FAILURE: {} | {}'.format(pdf_link, e))
    else:
        log_download('File already downloaded: {}'.format(pdf_name))
项目:bolero    作者:rock-learning    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    HTTP payloads are decoded as UTF-8; gzip responses are decompressed
    first.  Non-URL arguments are read as local UTF-8 files.
    """
    scheme = urllib_parse.urlparse(url).scheme
    if scheme not in ('http', 'https'):
        # Local file: read as UTF-8 text.
        with codecs.open(url, mode='r', encoding='utf-8') as fid:
            return fid.read()

    resp = urllib_request.urlopen(url)
    encoding = resp.headers.get('content-encoding', 'plain')
    payload = resp.read()
    if encoding == 'plain':
        return payload.decode('utf-8')
    if encoding == 'gzip':
        # Decompress first, then decode.
        buf = BytesIO(payload)
        return gzip.GzipFile(fileobj=buf).read().decode('utf-8')
    raise RuntimeError('unknown encoding')
项目:bolero    作者:rock-learning    | 项目源码 | 文件源码
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes"""
    # Python 2 shelve cannot handle unicode keys; force byte strings.
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')

    shelf = shelve.open(os.path.join(gallery_dir, 'searchindex'))
    if url in shelf:
        # Cached from a previous run.
        data = shelf[url]
    else:
        # Fetch and remember the result for next time.
        data = _get_data(url)
        shelf[url] = data
    shelf.close()

    return data
项目:Personal_AI_Assistant    作者:PratylenClub    | 项目源码 | 文件源码
def fill_tf_idf_shelve(self):
        """Build/refresh the TF-IDF shelve, then recompute scores and centroids.

        Layout of the shelf: TF (per-action term counts), DF (per-word
        document frequencies), D (document count), TF_IDF, CENTROID.

        NOTE(review): Python 2 only (dict.iteritems); `unique` is an
        external helper — presumably numpy.unique or similar, TODO confirm.
        """
        # writeback=True so mutations of the nested dicts persist on close().
        tf_idf_shelve = shelve.open(self.tf_idf_shelve_file_name, writeback=True)
        # Initialise the expected top-level keys on first run.
        if TF not in tf_idf_shelve:
            tf_idf_shelve[TF] = {}
        if DF not in tf_idf_shelve:
            tf_idf_shelve[DF] = {}
        if D not in tf_idf_shelve:
            tf_idf_shelve[D] = 0
        if TF_IDF not in tf_idf_shelve:
            tf_idf_shelve[TF_IDF] = {}
        if CENTROID not in tf_idf_shelve:
            tf_idf_shelve[CENTROID] = {}

        # One "document" per action: record term frequencies, and bump the
        # document frequency once per distinct word in the trigger text.
        for action,trigger_txt in self.trigger_dict.iteritems():
            if action not in tf_idf_shelve[TF].keys():
                trigger = self.tokenize_text(trigger_txt)
                tf_idf_shelve[TF][action] = Counter(trigger)
                for word in unique(trigger):
                    if word not in tf_idf_shelve[DF].keys():
                        tf_idf_shelve[DF][word] = 0
                    tf_idf_shelve[DF][word] += 1
        # D = total number of documents (actions) seen so far.
        tf_idf_shelve[D] = len(tf_idf_shelve[TF])
        tf_idf_shelve.close()
        self.compute_tf_idf()
        self.compute_centroids()
项目:Personal_AI_Assistant    作者:PratylenClub    | 项目源码 | 文件源码
def add_list_of_words_in_w2v_model(self, unknown_words):
        """Append vectors for *unknown_words* to the on-disk w2v model file.

        Words present in the huge reference model are copied verbatim;
        each remaining word gets a random vector in [-1, 1).

        NOTE(review): Python 2 only (print statement); `random` here is
        presumably numpy.random.random — TODO confirm the import.
        """
        huge_w2v_model_file = open(self.w2v_huge_model_path, "r")
        current_w2v_model_file = open(self.w2v_model_path, "a")
        line = huge_w2v_model_file.readline()
        unknown_words_left = len(unknown_words)
        # Single pass over the huge model; stop early once all words found.
        while line and unknown_words_left:
            word = line.split()[0]
            if word in unknown_words:
                current_w2v_model_file.write(line)
                unknown_words = unknown_words - set([word])
                unknown_words_left -= 1
            line = huge_w2v_model_file.readline()
        # Fallback: random vectors for words absent from the huge model.
        for word in list(unknown_words):
            random_position = random(self.w2v_model.vector_size)*2-1
            current_w2v_model_file.write(" ".join(([word]+[str(x) for x in random_position])))
            print "warning random positions introduced for new words ... in the future this should be solved"
        current_w2v_model_file.close()
        huge_w2v_model_file.close()
项目:Personal_AI_Assistant    作者:PratylenClub    | 项目源码 | 文件源码
def add_list_of_words_in_w2v_model(self, unknown_words):
        """Append vectors for *unknown_words* to the on-disk w2v model file.

        (Duplicate of the method above — appears twice in this dump.)
        Words present in the huge reference model are copied verbatim;
        each remaining word gets a random vector in [-1, 1).

        NOTE(review): Python 2 only (print statement); `random` here is
        presumably numpy.random.random — TODO confirm the import.
        """
        huge_w2v_model_file = open(self.w2v_huge_model_path, "r")
        current_w2v_model_file = open(self.w2v_model_path, "a")
        line = huge_w2v_model_file.readline()
        unknown_words_left = len(unknown_words)
        # Single pass over the huge model; stop early once all words found.
        while line and unknown_words_left:
            word = line.split()[0]
            if word in unknown_words:
                current_w2v_model_file.write(line)
                unknown_words = unknown_words - set([word])
                unknown_words_left -= 1
            line = huge_w2v_model_file.readline()
        # Fallback: random vectors for words absent from the huge model.
        for word in list(unknown_words):
            random_position = random(self.w2v_model.vector_size)*2-1
            current_w2v_model_file.write(" ".join(([word]+[str(x) for x in random_position])))
            print "warning random positions introduced for new words ... in the future this should be solved"
        current_w2v_model_file.close()
        huge_w2v_model_file.close()
项目:video-importer    作者:tnc-ca-geo    | 项目源码 | 文件源码
def list_files(self, path):
        """Return a CSV report of every file recorded in the shelve DB.

        Note: `path` is currently unused; kept for interface
        compatibility with the caller.
        """
        storage = self.args.storage
        shelve_name = os.path.join(os.path.dirname(__file__), storage)
        db = shelve.open(shelve_name)
        status = []
        for key, value in db.items():
            value['hash'] = key
            status.append(value)
        db.close()
        # FIX: sort once after collecting (the original re-sorted inside
        # the loop, O(n^2 log n)) and use a key function instead of the
        # Python-2-only cmp argument.
        status.sort(key=lambda params: params['filename'])
        stream = StringIO.StringIO()
        writer = csv.writer(stream)
        # FIX: the header was missing the CAMERA column even though each
        # data row writes six fields.
        writer.writerow(('FILENAME', 'GIVEN_NAME', 'CAMERA', 'CREATED_ON',
                         'DISCOVERED_ON', 'UPLOADED_ON'))
        for params in status:
            writer.writerow((params['filename'], params['given_name'],
                             params['camera'], params['timestamp'],
                             params['discovered_on'], params['uploaded_on']))
        return stream.getvalue()
项目:multi-diffusion    作者:chemical-diffusion    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    Returns the raw payload; gzip-encoded HTTP responses are
    transparently decompressed.  Anything that is not an http(s) URL is
    treated as a local file path.
    """
    # GENERALIZED: also accept https URLs (the original matched only
    # 'http://' and read https URLs as local files).
    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            data = StringIO(data)
            data = gzip.GzipFile(fileobj=data).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()

    return data
项目:multi-diffusion    作者:chemical-diffusion    | 项目源码 | 文件源码
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes"""
    # On Python 2, shelve keys must be byte strings.
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')

    store = shelve.open(os.path.join(gallery_dir, 'searchindex'))
    try:
        # EAFP: most calls after the first hit the cache.
        data = store[url]
    except KeyError:
        data = _get_data(url)
        store[url] = data
    store.close()

    return data
项目:pycreate2    作者:MomsFriendlyRobotCompany    | 项目源码 | 文件源码
def read():
    """Replay the frames stored by write(): decode the JPEGs and display them.

    Relies on module globals `filename`, `cv2`, `np`.
    """
    db = shelve.open(filename)
    try:
        imgs = db['imgs']
        data = db['data']
    finally:
        # Close the shelf as soon as the payload is in memory.
        db.close()

    for i in range(len(imgs)):
        d = data[i]
        print(i, d)
        img = imgs[i]
        # FIX: np.fromstring is deprecated for binary input;
        # np.frombuffer reads the same bytes without a copy.
        img = np.frombuffer(img, np.uint8)
        frame = cv2.imdecode(img, 1)
        print('frame[{}] {}'.format(i, frame.shape))
        cv2.imshow('camera', frame)
        cv2.waitKey(300)

    print('bye ...')
    cv2.destroyAllWindows()
项目:mycroft-skill-jupiter-broadcasting    作者:the7erm    | 项目源码 | 文件源码
def get_latest_episode(self, url, media=False):
        """Fetch the feed at *url* via a shelve-backed feedcache and return
        the first usable link.

        When *media* is True the first media enclosure URL is returned,
        otherwise the entry's plain link.  Returns "" when nothing found.

        NOTE(review): Python 2 only (print statement).
        """
        storage_path = join(self.file_system.path, 'feedcache')
        LOGGER.debug("storage_path:%s" % storage_path)
        storage = shelve.open(storage_path)
        # Cache feed fetches for one hour.
        ttl = 60 * 60
        link = ""
        try:
            fc = cache.Cache(storage, timeToLiveSeconds=ttl)
            parsed_data = fc.fetch(url)
            print "parsed_data.feed.title:", parsed_data.feed.title
            for entry in parsed_data.entries:
                pprint(entry)
                if media:
                    # Prefer the first media enclosure's URL.
                    media_content = entry.media_content
                    if media_content:
                        link = entry.media_content[0]['url']
                else:
                    link = entry.link
                # Stop at the first entry that yielded a link.
                if link:
                    break
        finally:
            # Always close the cache shelf, even if parsing fails.
            storage.close()
        return link
项目:yt    作者:yt-project    | 项目源码 | 文件源码
def dump(self, result_storage):
        """Persist answer-test results into a shelve named self.answer_name.

        Skipped when no answer name is configured or when the storage is
        tainted (a required dataset was missing — see can_run_ds() /
        can_run_sim()); this avoids creating a shelve of empty answers.
        """
        if self.answer_name is None or result_storage.get('tainted', False):
            return
        # Store data using shelve (highest pickle protocol).
        shelf = shelve.open(self.answer_name, protocol=-1)
        for ds_name in result_storage:
            key = "%s" % ds_name
            if key in shelf:
                mylog.info("Overwriting %s", key)
            shelf[key] = result_storage[ds_name]
        shelf.close()
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def val_dump(rels, db):
    """
    Make a ``Valuation`` from a list of relation metadata bundles and dump to
    persistent database.

    :param rels: bundle of metadata needed for constructing a concept
    :type rels: list of dict
    :param db: name of file to which data is written.
               The suffix '.db' will be automatically appended.
    :type db: str
    """
    valuation = make_valuation(process_bundle(rels).values(), read=True)
    # 'n' flag: always start from a fresh, empty database file.
    db_out = shelve.open(db, 'n')
    db_out.update(valuation)
    db_out.close()
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def label_indivs(valuation, lexicon=False):
    """
    Assign individual constants to the individuals in the domain of a ``Valuation``.

    Given a valuation with an entry of the form ``{'rel': {'a': True}}``,
    add a new entry ``{'a': 'a'}``.

    :type valuation: Valuation
    :rtype: Valuation
    """
    # Each individual labels itself: the entity string doubles as its
    # constant symbol.
    domain = valuation.domain
    pairs = [(entity, entity) for entity in domain]
    if lexicon:
        # Optionally emit a lexicon file for the chat grammar.
        with open("chat_pnames.cfg", 'w') as outfile:
            outfile.writelines(make_lex(domain))
    # Merge the self-labelling pairs into the valuation.
    valuation.update(pairs)
    return valuation
项目:_    作者:zengchunyun    | 项目源码 | 文件源码
def auth(self):
        """Authenticate a connected client against the shelve-backed user DB.

        Reads a username and password over the socket (self.request) and
        returns True on successful login, False otherwise.

        NOTE(review): this looks like Python 2-only code — ``bytes(str)``
        without an encoding raises TypeError on Python 3.  The ``?``
        runs in the prompts appear to be mojibake from a lost non-ASCII
        encoding; left byte-identical since they are runtime strings.
        """
        connect_database = shelve.open("../database/database")
        database = connect_database.get("data")
        verify = UserVerify(database)
        conn = self.request
        str()  # no-op; presumably leftover debugging — TODO confirm
        print(bytes(u"??????"))
        conn.send(bytes("??????:"))
        username = conn.recv(1024)
        conn.send(bytes("?????:"))
        password = conn.recv(1024)
        login = None
        if username and password:
            try:
                login = verify.login(user=username, password=password)
            except SystemExit as e:
                # The login layer signals fatal failures via SystemExit.
                print(e)
                conn.close()
        if not login:
            return False
        else:
            return True
项目:logscan    作者:magedu    | 项目源码 | 文件源码
def __init__(self, db_path):
        # Path to the backing shelve database file.
        self.db_path = db_path
        # 'c' flag: open read/write, creating the DB when missing.
        self.db = shelve.open(self.db_path, 'c')
        # Serialises access to the shelf across threads.
        self.lock = threading.Lock()
        # Cooperative shutdown flag — presumably polled by a worker loop;
        # TODO confirm against the rest of the class.
        self.stopped = False
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, filename, flag='c', protocol=None, writeback=False):
        # anydbm (Python 2) picks the best available dbm implementation;
        # imported lazily to avoid a hard dependency at module import time.
        import anydbm
        Shelf.__init__(self, anydbm.open(filename, flag), protocol, writeback)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def open(filename, flag='c', protocol=None, writeback=False):
    """Open a persistent dictionary for reading and writing.

    ``filename`` is the base name of the underlying database; the dbm
    backend may append an extension and create more than one file.
    ``flag`` has the same meaning as for ``anydbm.open()``; ``protocol``
    selects the pickle protocol (0, 1, or 2); ``writeback`` enables
    write-through caching of accessed entries.

    See the module's __doc__ string for an overview of the interface.
    """
    return DbfilenameShelf(filename, flag, protocol, writeback)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def parse(self):
        """Parse all recipes, then overlay user config files on top."""
        if not os.path.isdir("recipes"):
            raise ParseError("No recipes directory found.")
        self.__cache.open()
        try:
            self.__parse()

            # config files overrule everything else, so apply them last
            for cfg in self.__configFiles:
                cfg = str(cfg) + ".yaml"
                if not os.path.isfile(cfg):
                    raise ParseError("Config file {} does not exist!".format(cfg))
                self.__parseUserConfig(cfg)
        finally:
            # The cache is closed no matter how parsing ends.
            self.__cache.close()
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __generatePackages(self, nameFormatter, env, cacheKey, sandboxEnabled):
        """Calculate the package tree, using a pickled on-disk cache.

        The cache is validated against *cacheKey* (stored as a file
        prefix); on any mismatch or read error the tree is recalculated
        and re-persisted atomically via a temp file + os.replace().
        """
        # use separate caches with and without sandbox
        if sandboxEnabled:
            cacheName = ".bob-packages-sb.pickle"
        else:
            cacheName = ".bob-packages.pickle"

        # try to load the persisted packages
        states = { n:s() for (n,s) in self.__states.items() }
        rootPkg = Package()
        rootPkg.construct("<root>", [], nameFormatter, None, [], [], states,
            {}, {}, None, None, [], {}, -1)
        try:
            with open(cacheName, "rb") as f:
                # A key mismatch means the inputs changed: cache is stale.
                persistedCacheKey = f.read(len(cacheKey))
                if cacheKey == persistedCacheKey:
                    tmp = PackageUnpickler(f, self.getRecipe, self.__plugins,
                                           nameFormatter).load()
                    return tmp.toStep(nameFormatter, rootPkg).getPackage()
        except (EOFError, OSError, pickle.UnpicklingError):
            # Missing/corrupt cache: fall through to recalculation.
            pass

        # not cached -> calculate packages
        result = self.__rootRecipe.prepare(nameFormatter, env, sandboxEnabled,
                                           states)[0]

        # save package tree for next invocation
        tmp = CoreStepRef(rootPkg, result.getPackageStep())
        try:
            # Write to a sibling temp file, then replace atomically.
            newCacheName = cacheName + ".new"
            with open(newCacheName, "wb") as f:
                f.write(cacheKey)
                PackagePickler(f, nameFormatter).dump(tmp)
            os.replace(newCacheName, cacheName)
        except OSError as e:
            # Failing to persist the cache is not fatal; just report it.
            print("Error saving internal state:", str(e), file=sys.stderr)

        return result
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def open(self):
        """Open the on-disk shelve cache and reset the per-file digest map."""
        self.__shelve = shelve.open(".bob-cache.shelve")
        self.__files = {}
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def loadYaml(self, name, yamlSchema, default):
        """Load and schema-validate a YAML file, memoised in the shelve cache.

        Cache entries are keyed by file name and validated against the
        file's stat signature and BOB_INPUT_HASH; on a hit the parsed
        data is returned without re-reading the file.  An empty YAML
        document yields *default*.

        Raises ParseError on YAML parse or schema validation failure.
        """
        # binStat(): presumably a stat-based file fingerprint — TODO confirm.
        bs = binStat(name)
        if name in self.__shelve:
            cached = self.__shelve[name]
            # Valid only when the file is unchanged AND the cache format
            # version matches the current build.
            if ((cached['lstat'] == bs) and
                (cached.get('vsn') == BOB_INPUT_HASH)):
                self.__files[name] = cached['digest']
                return cached['data']

        with open(name, "r") as f:
            try:
                rawData = f.read()
                data = yaml.safe_load(rawData)
                # Digest of the raw text, used for change tracking.
                digest = hashlib.sha1(rawData.encode('utf8')).digest()
            except Exception as e:
                raise ParseError("Error while parsing {}: {}".format(name, str(e)))

        # An empty YAML document parses to None; substitute the default.
        if data is None: data = default
        try:
            data = yamlSchema.validate(data)
        except schema.SchemaError as e:
            raise ParseError("Error while validating {}: {}".format(name, str(e)))

        # Record the digest and refresh the cache entry.
        self.__files[name] = digest
        self.__shelve[name] = {
            'lstat' : bs,
            'data' : data,
            'vsn' : BOB_INPUT_HASH,
            'digest' : digest
        }
        return data
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def loadBinary(self, name):
        """Read *name* as bytes, record its SHA-1 digest, and return it."""
        with open(name, "rb") as handle:
            payload = handle.read()
        # Track the digest so dependents can detect file changes.
        self.__files[name] = hashlib.sha1(payload).digest()
        return payload
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def build_prj1_code_graph(self):
        """Build the prj1 scitools code graph and shelve it as 'code_graph'."""
        if not self.prj1_scitools_client.project_exists():
            # Guard clause: nothing to build without an understand project.
            print('understand project does not exist, '
                  'first run "$ prj1 understand --build"')
            return
        with shelve.open(str(self.shelve_prj1_code_db_path)) as db:
            client = self.prj1_scitools_client
            client.open_project()
            scitools_project = client.build_project(self.prj1_code_repo_path)
            client.close_project()
            db['code_graph'] = scitools_project
            # Summarise what was loaded.
            print('loaded scitools project of size',
                  len(scitools_project.code_graph))
            print('entity kinds:', scitools_project.entity_kinds)
            print('ref kinds:', scitools_project.ref_kinds)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def build_git_graph(self):
        """Build the commit graph from the git repo and persist it to shelve."""
        with shelve.open(str(self.shelve_db_path)) as db:
            client = GitClient(self.git_repo_path)
            graph = client.build_commit_graph()
            # Attach the origin/master tree to the commit graph.
            client.add_commit_tree(graph, ref_name='origin/master')
            db['git_graph'] = graph
        self.git_graph = graph
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def load_git_graph(self):
        """Load the cached git graph from shelve (None when absent)."""
        with shelve.open(str(self.shelve_db_path)) as db:
            # Shelf supports Mapping.get: returns None on a missing key.
            self.git_graph = db.get('git_graph')
        return self.git_graph
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def build_code_graph(self):
        """Build the scitools code graph for the repo and shelve it."""
        if not self.scitools_client.project_exists():
            print('understand project does not exist, '
                  'first run "$ povray understand --build"')
        else:
            with shelve.open(str(self.shelve_db_path)) as db:
                self.scitools_client.open_project()
                self.scitools_project = self.scitools_client.build_project(
                    self.git_repo_path)
                self.scitools_client.close_project()
                db['code_graph'] = self.scitools_project
                print('loaded scitools project of size',
                      len(self.scitools_project.code_graph))
                print('entity kinds:', self.scitools_project.entity_kinds)
                # FIX: was printing entity_kinds twice; report ref_kinds
                # here (cf. build_prj1_code_graph above).
                print('ref kinds:', self.scitools_project.ref_kinds)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def load_code_graph(self) -> ScitoolsProject:
        """Load the cached scitools project from shelve (None when absent)."""
        with shelve.open(str(self.shelve_db_path)) as db:
            # Shelf supports Mapping.get: returns None on a missing key.
            self.scitools_project = db.get('code_graph')
        return self.scitools_project
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def test_unshelve_similarity_graph(data_root):
    """Round-trip check: the shelved similarity graph loads and groups actors."""
    shelf_path = str(data_root / 'test-similarity_graph.shelve')
    with shelve.open(shelf_path) as db:
        graph = db['similarity_graph']
    assert graph is not None
    print()
    actor_groups = graph.group_similar_actors()
    pprint(actor_groups)
项目:studsup    作者:ebmscruff    | 项目源码 | 文件源码
def load_game(mySaveNum):
    """Restore game state from ./save<N> and return to the main menu.

    BUG FIX: the restored collections were previously bound to *locals*
    and silently discarded on return; save_game() reads the same names
    as module globals, so declare them global here.
    """
    global cities, clubs, humans, leagues, managers, chairmans, currentCity
    myShelfFile = "./save"+str(mySaveNum)
    print("Save file: {0}".format(myShelfFile))
    myOpenShelf = shelve.open(myShelfFile)
    cities = myOpenShelf['cities']
    clubs = myOpenShelf['clubs']
    humans = myOpenShelf['humans']
    leagues = myOpenShelf['leagues']
    managers = myOpenShelf['managers']
    chairmans = myOpenShelf['chairmans']
    currentCity = myOpenShelf['currentCity']
    chosenClub = myOpenShelf['chosenClub']
    chosenLeague = myOpenShelf['chosenLeague']
    myOpenShelf.close()
    menuMain(chosenLeague, chosenClub)
项目:studsup    作者:ebmscruff    | 项目源码 | 文件源码
def save_game(myChosenLeague, myChosenClub, mySaveNum=0, myCurrentCity=0):
    """Write the current game state to ./save<N> via shelve."""
    shelfFile = "./save"+str(mySaveNum)
    print("Save file: {0}".format(shelfFile))
    shelf = shelve.open(shelfFile)
    # Module-level world state.
    shelf['cities'] = cities
    shelf['clubs'] = clubs
    shelf['humans'] = humans
    shelf['leagues'] = leagues
    shelf['managers'] = managers
    shelf['chairmans'] = chairmans
    # Per-session selections passed in as arguments.
    shelf['currentCity'] = myCurrentCity
    shelf['chosenClub'] = myChosenClub
    shelf['chosenLeague'] = myChosenLeague
    shelf.close()
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def get(self, *a, **kw):
        """Ensure the cache is opened, then delegate to the base class get()."""
        self.open()
        return super(FileCache, self).get(*a, **kw)
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def save(self):
        """Flush pending cache writes to disk."""
        self.open()
        # shelve's sync() writes back any cached entries to the dbm file.
        self.storage.sync()
        logger.debug("Saved %s.", SETTINGS.CACHE_PATH)