我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 tempfile.gettempdir()。
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """Create a temporary file, like :func:`python3:tempfile.mkstemp`, but
    always return a `fsnative` path.

    Each of ``suffix``, ``prefix`` and ``dir`` is passed through
    ``path2fsn`` when given; otherwise ``fsnative()``, ``gettempprefix()``
    and ``gettempdir()`` supply the respective defaults.

    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
        text (bool): if the file should be opened in text mode
    Returns:
        Tuple[`int`, `fsnative`]:
            A tuple containing the file descriptor and the file path
    Raises:
        EnvironmentError
    """
    if suffix is None:
        suffix = fsnative()
    else:
        suffix = path2fsn(suffix)

    if prefix is None:
        prefix = gettempprefix()
    else:
        prefix = path2fsn(prefix)

    if dir is None:
        dir = gettempdir()
    else:
        dir = path2fsn(dir)

    return tempfile.mkstemp(suffix, prefix, dir, text)
def mkdtemp(suffix=None, prefix=None, dir=None):
    """Create a temporary directory, like :func:`python3:tempfile.mkdtemp`,
    but always return a `fsnative` path.

    Each argument is passed through ``path2fsn`` when given; otherwise
    ``fsnative()``, ``gettempprefix()`` and ``gettempdir()`` supply the
    respective defaults.

    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
    Returns:
        `fsnative`: A path to a directory
    Raises:
        EnvironmentError
    """
    if suffix is None:
        suffix = fsnative()
    else:
        suffix = path2fsn(suffix)

    if prefix is None:
        prefix = gettempprefix()
    else:
        prefix = path2fsn(prefix)

    if dir is None:
        dir = gettempdir()
    else:
        dir = path2fsn(dir)

    return tempfile.mkdtemp(suffix, prefix, dir)
def runTest(self):
    """Check that ``ruby`` runs and that the py2rb builtin files exist.

    Calls ``self.stop()`` and raises RuntimeError at the first failed
    check; calls ``self.reportProgres()`` after the ruby check and again
    after all three file checks.
    """
    # The help text is only redirected into a scratch file in the temp
    # directory; the exit status is what tells us whether ruby could run.
    scratch = os.path.join(tempfile.gettempdir(), tempfile.gettempprefix())
    exit_status = os.system("ruby --help > %s" % scratch)
    if exit_status:
        self.stop()
        raise RuntimeError("""Can't find the "ruby" command.""")
    self.reportProgres()

    required_files = (
        "py2rb/builtins/module.rb",
        "py2rb/builtins/using.rb",
        "py2rb/builtins/require.rb",
    )
    for required in required_files:
        if os.path.exists(required):
            continue
        self.stop()
        raise RuntimeError('Can\'t find the "%s" command.' % required)
    self.reportProgres()
def serve(content):
    """Save *content* to a uniquely named HTML file in the temp directory,
    open it in the browser and block until interrupted.

    The file is removed when the wait loop is ended by KeyboardInterrupt.
    """
    # Temp prefix plus a random UUID gives a unique file name.
    unique_name = "{}{}.html".format(tempfile.gettempprefix(), uuid.uuid4())
    temp_file_path = os.path.join(tempfile.gettempdir(), unique_name)

    save(temp_file_path, content)
    webbrowser.open("file://{}".format(temp_file_path))

    # Keep the process alive while the page is being viewed.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        os.remove(temp_file_path)
def download_workflow(url):
    """Fetch the workflow at ``url`` into the system temp directory.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    :raises ValueError: if the last URL segment does not end in
        ``.alfredworkflow`` or ``.alfred3workflow``

    """
    filename = url.rsplit("/", 1)[-1]

    if not filename.endswith(('.alfredworkflow', '.alfred3workflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...',
        url, local_path)

    payload = web.get(url).content
    with open(local_path, 'wb') as output:
        output.write(payload)

    return local_path
def load_names_data():
    """Load the names dataset into one DataFrame indexed by ``name``.

    The zip archive is downloaded from ``URL_NAMES`` on first use and cached
    as ``ZIP_NAME`` in the system temp directory. Every archive member whose
    file name starts with ``yob`` is read as a header-less CSV with the
    columns ``name``, ``gender`` and ``count``; the year is parsed from
    characters 3-6 of the member name and stored in a ``year`` column.

    Returns:
        pandas.DataFrame: the per-year frames concatenated in member-name
        order, with ``name`` as the index.

    Raises:
        requests.HTTPError: if the download answers with an error status.
    """
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        # Fail before writing: otherwise an HTTP error page would be cached
        # as the "zip" and every later call would break on it.
        r.raise_for_status()
        with open(fp, 'wb') as f:
            f.write(r.content)

    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # Sort the ZipInfo entries so years are processed in name order.
        for zi in sorted(zf.infolist(), key=lambda info: info.filename):
            fn = zi.filename
            if not fn.startswith('yob'):
                continue
            year = int(fn[3:7])
            # Close each member handle as soon as it has been parsed.
            with zf.open(zi) as member:
                df = pd.read_csv(
                    member,
                    header=None,
                    names=('name', 'gender', 'count'))
            df['year'] = year
            post[year] = df

    df = pd.concat(list(post.values()))
    df.set_index('name', inplace=True, drop=True)
    return df
def render_xlsx(cls):
    '''
    Build a presentation model from the sample names data and write it
    to ``example1.xlsx`` in the system temp directory.
    '''
    names_df = sample_names_data()

    presentation_model = tc.build_presentation_model(
        df=names_df, output_format='xlsx')

    target = os.path.join(tempfile.gettempdir(), 'example1.xlsx')
    print('Writing to ' + target)
    xlsxw.XLSXWriter.to_xlsx([presentation_model], output_fp=target)
# end_XLSXExample1

# start_XLSXExample2
def render_html(cls):
    """Render the first 100 rows of the names data to an HTML file.

    The output is written to ``example1.html`` in the system temp
    directory, and that path is printed before rendering.
    """
    # load data
    df = load_names_data()
    df = df[:100]

    # build presentation model
    pm = tc.build_presentation_model(df=df, output_format='html')

    # render to html; announce the path that is really written (the message
    # used to name 'example_1.html', a file that was never created)
    output_fp = os.path.join(tempfile.gettempdir(), 'example1.html')
    layout = [pm]
    print('Writing to ' + output_fp)
    html = htmlw.HTMLWriter.to_html(layout, border=1)
    with open(output_fp, 'w') as f:
        f.write(html)
# end_HTMLExample1

# start_HTMLExample2
def compile_so(libs):
    """Link *libs* into one shared object in the temp dir; return its path.

    The output is named ``lib<sorted lib names joined by '.'>.so``. If
    that file already exists it is returned without invoking the compiler
    (taken from ``$CLANG``, default ``clang``).
    """
    # I don't know how else to find these .so files other than just asking
    # clang to make a .so file out of all of them
    target = join(tempfile.gettempdir(), 'lib%s.so' % '.'.join(sorted(libs)))
    if os.path.exists(target):
        return target

    compiler = os.getenv('CLANG', 'clang')
    link_flags = ['-l' + lib for lib in libs]
    subprocess.check_call([compiler, '-o', target, '-shared'] + link_flags)
    return target
def _find_grail_rc(self): import glob import pwd import socket import tempfile tempdir = os.path.join(tempfile.gettempdir(), ".grail-unix") user = pwd.getpwuid(os.getuid())[0] filename = os.path.join(tempdir, user + "-*") maybes = glob.glob(filename) if not maybes: return None s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) for fn in maybes: # need to PING each one until we find one that's live try: s.connect(fn) except socket.error: # no good; attempt to clean it out, but don't fail: try: os.unlink(fn) except IOError: pass else: return s
def cmdargs_for_style(self, formatstyle, filename=None):
    # type: (Style, Optional[str]) -> List[str]
    """Return the command line arguments that select *formatstyle*.

    The style text is written to ``whatstyle_uncrustify_<sha>.cfg`` in the
    temp directory (``<sha>`` is ``shahex`` of the config bytes) unless
    ``self.tempfile_exists`` already reports it; a newly written file is
    registered with ``self.add_tempfile``. The result is ``['-c', cfg]``,
    followed by ``['-l', <first language>]`` when ``self.languages`` is
    non-empty.
    """
    assert isinstance(formatstyle, Style)
    configdata = bytestr(self.styletext(formatstyle))
    cfgname = 'whatstyle_uncrustify_%s.cfg' % shahex(configdata)
    cfg = os.path.join(tempfile.gettempdir(), cfgname)

    if not self.tempfile_exists(cfg):
        writebinary(cfg, configdata)
        self.add_tempfile(cfg)

    cmdargs = ['-c', cfg]
    # The filename extension might be ambiguous so we choose from the
    # languages registered in identify_language.
    if self.languages:
        cmdargs += ['-l', self.languages[0]]
    return cmdargs
def cmdargs_for_style(self, formatstyle, filename=None):
    # type: (Style, Optional[str]) -> List[str]
    """Return the command line arguments that select *formatstyle*.

    The style text is stored as
    ``<tempdir>/whatstyle_rustfmt_<sha>/<self.configfilename>``, where
    ``<sha>`` is ``shahex`` of the config bytes. A newly created directory
    and a newly written config file are each registered with
    ``self.add_tempfile``. The result is ``['--config-path', cfg]``.
    """
    assert isinstance(formatstyle, Style)
    configdata = bytestr(self.styletext(formatstyle))
    relative_cfg = 'whatstyle_rustfmt_%s/%s' % (shahex(configdata),
                                                self.configfilename)
    cfg = os.path.join(tempfile.gettempdir(), relative_cfg)

    try:
        dirpath = os.path.dirname(cfg)
        os.makedirs(dirpath)
        self.add_tempfile(dirpath)
    except OSError as exc:
        # An already existing directory is fine; anything else is not.
        if exc.errno != errno.EEXIST:
            raise

    if not self.tempfile_exists(cfg):
        writebinary(cfg, configdata)
        self.add_tempfile(cfg)

    return ['--config-path', cfg]
def fromrepo(cls, path=None):
    """Build a repo object for the program at *path* and sync it.

    When *path* is None the program root is looked up with
    ``Repo.findparent`` starting at the current directory; ``error`` is
    called if none is found. If caching is enabled (``cache_repositories``
    and the ``CACHE`` config value), ``repo.cache`` is set to the configured
    location or to ``mbed-repo-cache`` in the temp directory. A warning is
    issued when the synced repo has no ``scm``.
    """
    repo = cls()

    if path is None:
        path = Repo.findparent(getcwd())
        if path is None:
            error(
                'Could not find mbed program in current path "%s".\n'
                'You can fix this by calling "mbed new ." or "mbed config root ." in the root of your program.' % getcwd())

    repo.path = os.path.abspath(path)
    repo.name = os.path.basename(repo.path)

    cache_cfg = Global().get_cfg('CACHE', '')
    if cache_repositories and cache_cfg and cache_cfg not in ('none', 'off', 'disabled'):
        # 'on' / 'enabled' mean "use the default location"; any other
        # value is taken as the cache location itself.
        if cache_cfg in ('on', 'enabled'):
            loc = None
        else:
            loc = cache_cfg
        repo.cache = loc or os.path.join(tempfile.gettempdir(), 'mbed-repo-cache')

    repo.sync()

    if repo.scm is None:
        warning(
            'Program "%s" in "%s" does not use source control management.\n'
            'To fix this you should use "mbed new ." in the root of your program.' % (repo.name, repo.path))

    return repo
def __init__(self, max_age):
    """Set up the cache file in the system temp directory.

    Args:
      max_age: Cache expiration in seconds.
    """
    self._max_age = max_age
    self._file = os.path.join(tempfile.gettempdir(), FILENAME)

    cache_file = LockedFile(self._file, 'a+', 'r')
    try:
        cache_file.open_and_lock()
        # If we can not obtain the lock, other process or thread must
        # have initialized the file.
        if cache_file.is_locked():
            _read_or_initialize_cache(cache_file)
    except Exception as e:
        # Any failure here is only logged, never raised to the caller.
        logging.warning(e, exc_info=True)
    finally:
        cache_file.unlock_and_close()
def compare_component_output(self, input_path, expected_output_path):
    """Assert that processing *input_path* yields *expected_output_path*.

    The archive is processed twice with the engine returned by
    ``self.get_rendering_engine()``: once into a plain directory, and once
    (after ``storage.clear()``) into a zip that is then extracted. Both
    results are compared against *expected_output_path* with
    ``dircmp.is_same``. Outputs live under the system temp directory and
    are removed after each successful comparison.
    """
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()

    # Pass 1: uncompressed output directory.
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)

    storage.clear()

    # Pass 2: zipped output, extracted into the same directory name.
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    # Close the archive before it is deleted: an open handle is a leak and
    # makes os.remove fail on Windows.
    with zipfile.ZipFile(output_zip) as z:
        z.extractall(output_dir)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    os.remove(output_zip)
def create_local_temp_dir(name, directory=None):
    r"""
    Create a directory for storing temporary files needed for testing neo

    If directory is None or not specified, automatically create the directory
    in {tempdir}/files_for_testing_neo on linux/unix/mac or
    {tempdir}\files_for_testing_neo on windows, where {tempdir} is the system
    temporary directory returned by tempfile.gettempdir().

    Missing parent directories are created as needed and directories that
    already exist are reused.

    Returns the path ``<directory>/<name>``.
    """
    if directory is None:
        directory = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')

    directory = os.path.join(directory, name)
    # Create first, check afterwards: a separate exists() test can race
    # with another process creating the same directory.
    try:
        os.makedirs(directory)
    except OSError:
        if not os.path.isdir(directory):
            raise
    return directory
def activate_debug(module):
    """
    activates the given module for debugging

    Unknown or already active modules are reported with ``print_error``
    and left untouched. On the very first activation the debug log file
    path is stored in ``LOGFILE`` and announced.

    :param module: the module name to activate
    :return: None
    """
    global LOGFILE

    module = str(module)

    if module not in AVAILABLE_DEBUG_MODULES:
        print_error("debug module '{}' is unknown, cannot activate it".format(module))
        return

    if module in ACTIVATED_DEBUG_MODULES:
        print_error("debug module '{}' is already active".format(module))
        return

    # first activation: choose the log file and tell the user about it
    first_activation = not ACTIVATED_DEBUG_MODULES
    if first_activation:
        import os
        LOGFILE = str(os.path.join(tempfile.gettempdir(), "outis.log"))
        print_message("DEBUGGING is active, writing to debug file " + str(LOGFILE))

    ACTIVATED_DEBUG_MODULES.append(module)
def main():
    """Parse the command line and scrape archived snapshots of a website.

    Results go to ``args.target_folder`` or, when that is empty, to the
    system temp directory.
    """
    args = parse_args()

    log_level = logging.WARN if args.quiet else logging.INFO
    logging.basicConfig(level=log_level)

    # Don't allow more than 10 concurrent requests to the wayback machine
    concurrency = min(args.concurrency, 10)

    # Scrape results are stored in a temporary folder if no folder specified
    target_folder = args.target_folder or tempfile.gettempdir()
    logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))

    # Parse the period entered by the user (throws an exception if the
    # dates are not correctly formatted)
    from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
    to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)

    # The scraper downloads the elements matching the given xpath
    # expression in the target folder
    scraper = Scraper(target_folder, args.xpath)

    # Launch the scraping using the scraper previously instantiated
    step = timedelta(days=args.delta)
    scrape_archives(args.website_url, scraper.scrape, from_date, to_date,
                    args.user_agent, step, concurrency)
def update_sideshow_file(fname,
                         server_fname,
                         server=SIDESHOW_SERVER,
                         temp_path=gettempdir()):
    """
    Update the JPL side show file stored locally at *fname*. The
    remote file is accessed via FTP on *server* at *server_fname*. The
    path *temp_path* is used to store intermediate files. Return
    *fname*.

    The download is gunzipped into *fname*; both files are handled as raw
    bytes.
    """
    dest_fname = replace_path(temp_path, server_fname)
    logger.info('opening connection to {}'.format(server))
    # Binary mode: RETR hands raw gzip bytes to the callback. A text-mode
    # file rejects bytes on Python 3 and newline-translates them on Windows.
    with closing(FTP(server)) as ftp, open(dest_fname, 'wb') as fid:
        logger.info('logging in')
        ftp.login()
        logger.info('writing to {}'.format(dest_fname))
        ftp.retrbinary('RETR ' + server_fname, fid.write)
    logger.info('uncompressing file to {}'.format(fname))
    # GzipFile.read() returns bytes, so the target is binary as well.
    with GzipFile(dest_fname) as gzip_fid, open(fname, 'wb') as fid:
        fid.write(gzip_fid.read())
    return fname
def kernel():
    """Create a ipykernel conda environment separate from this test
    environment where jupyter console is installed.

    The environments must be separate otherwise we cannot easily check if
    kernel start is activating the environment or if it was already active
    when the test suite started.

    Yields a ``Kernel`` built from the generated name, the path of its
    ``kernel.json`` and the environment path; the environment directory is
    removed once the generator is resumed.
    """
    # unique name for the kernel and environment
    name = str(uuid4())
    env_path = '{}/kernel-env-{name}'.format(gettempdir(), name=name)

    # NOTE(review): the whitespace on the continuation lines is part of the
    # command string; the shell treats it as ordinary separators.
    setup_command = '/bin/bash -c "conda create -y -p {env_path} ipykernel && \
                     source activate {env_path} && \
                     python -m ipykernel install --user \
                     --name {name}"'
    pexpect.run(setup_command.format(env_path=env_path, name=name))

    # query jupyter for the user data directory in a separate command to
    # make parsing easier
    user_path = pexpect.run('jupyter --data-dir').decode('utf-8').strip()

    # the kernel spec resides in the jupyter user data path
    spec_file = os.path.join(user_path, 'kernels', name, 'kernel.json')
    yield Kernel(name, spec_file, env_path)

    shutil.rmtree(env_path)
def _get_storage_path(cls): try: return cls._storage_path except AttributeError: storage_path = getattr(settings, "SESSION_FILE_PATH", None) if not storage_path: storage_path = tempfile.gettempdir() # Make sure the storage path is valid. if not os.path.isdir(storage_path): raise ImproperlyConfigured( "The session storage path %r doesn't exist. Please set your" " SESSION_FILE_PATH setting to an existing directory in which" " Django can store session data." % storage_path) cls._storage_path = storage_path return storage_path
def _should_continue(): min_check_interval = 3600 * 24 update_timestamp_file = os.path.join(tempfile.gettempdir(), 'uavcan_gui_tool', 'update_check_timestamp') try: with open(update_timestamp_file, 'r') as f: update_check_timestamp = float(f.read().strip()) except Exception: logger.debug('Update timestamp file could not be read', exc_info=True) update_check_timestamp = 0 if (time.time() - update_check_timestamp) < min_check_interval: return False try: os.makedirs(os.path.dirname(update_timestamp_file)) except Exception: pass # Nobody cares. with open(update_timestamp_file, 'w') as f: f.write(str(time.time())) return True
def setUp(self):
    """Write a valid template into the temp dir and build a MakoBackend
    whose only template directory is that temp dir."""
    self.tmp_dir = tempfile.gettempdir()
    self.template_name = 'good_template.html'
    self.bad_string = '<% name="Jazzar" My name is ${name}.'
    self.template_string = '<% name="Jazzar" %> My name is ${name}.'

    template_path = os.path.join(self.tmp_dir, self.template_name)
    with open(template_path, 'w') as template_file:
        template_file.write(self.template_string)

    self.mako_backend = MakoBackend({
        'NAME': 'mako',
        'DIRS': [self.tmp_dir],
        'APP_DIRS': False,
        'OPTIONS': {},
    })
def __init__(self): self.DUMMY_FILE = '{0}/dummy.hosts'.format(tempfile.gettempdir()) pass # def generate_hostname(self, minlen=5, maxlen=20): # """ # Generate a random hostname between minlen and maxlen, appending # '.example.net' (eg. oskpijsrss7.example.net) # # :param minlen: minimum length of the hostname (default: 5) # # :param maxlen: maximum length of the hostname (default: 20) # """ # # len = random.randint(minlen, maxlen) # # host_str = ''.join( # random.choice(string.ascii_lowercase) for _ in range(len -1)) # # host_str = '{0}{1}.example.net'.format( # host_str, random.choice(string.digits)) # # return host_str
def __init__(self, max_age):
    """Set up the cache file in the system temp directory.

    Args:
      max_age: Cache expiration in seconds.
    """
    self._max_age = max_age
    self._file = os.path.join(tempfile.gettempdir(), FILENAME)

    cache_file = LockedFile(self._file, 'a+', 'r')
    try:
        cache_file.open_and_lock()
        # If we can not obtain the lock, other process or thread must
        # have initialized the file.
        if cache_file.is_locked():
            _read_or_initialize_cache(cache_file)
    except Exception as e:
        # Any failure here is only logged, never raised to the caller.
        LOGGER.warning(e, exc_info=True)
    finally:
        cache_file.unlock_and_close()
def cmdDiffFolder( self, folder, head=False ):
    """Return the diff text of *folder* against an older revision.

    The older revision is ``self.svn_rev_head`` when *head* is true and
    ``self.svn_rev_base`` otherwise; it is compared with
    ``self.svn_rev_working``. The system temp directory is passed as the
    first argument of the client's ``diff`` call.
    """
    self.debugLog( 'cmdDiffFolder( %r )' % (folder,) )

    abs_folder = self.pathForSvn( folder )
    old_rev = self.svn_rev_head if head else self.svn_rev_base

    return self.client().diff(
        tempfile.gettempdir(),
        abs_folder, old_rev,
        abs_folder, self.svn_rev_working,
        recurse=True,
        relative_to_dir=str( self.projectPath() ),
        use_git_diff_format=True )
def download_workflow(url):
    """Fetch the workflow at ``url`` into the system temp directory.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    :raises ValueError: if the last URL segment does not end in
        ``.alfredworkflow`` or ``.alfred3workflow``

    """
    filename = url.rsplit('/', 1)[-1]

    if not filename.endswith(('.alfredworkflow', '.alfred3workflow')):
        raise ValueError('attachment not a workflow: {0}'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'downloading updated workflow from `%s` to `%s` ...',
        url, local_path)

    payload = web.get(url).content
    with open(local_path, 'wb') as output:
        output.write(payload)

    return local_path
def __init__(self):
    """Load MNIST, downloading it to the temp directory when not cached.

    The pickle is expected to hold three ``(inputs, labels)`` pairs
    (train, validation, test). All but the last 1000 validation examples
    are appended to the training set; the last 1000 stay as validation.
    """
    self._target_size = 10
    logging.info("loading minst data")
    path = os.path.join(tempfile.gettempdir(), "mnist.pkl.gz")
    if not os.path.exists(path):
        logging.info("downloading minst data")
        urllib.urlretrieve(MNIST_URL, path)
    # NOTE(review): unpickling runs arbitrary code from the file; this is
    # only safe while MNIST_URL and the temp directory are trusted.
    # The context manager closes the gzip handle, which used to be leaked.
    with gzip.open(path, 'rb') as mnist_file:
        self._train_set, self._valid_set, self._test_set = cPickle.load(mnist_file)
    # Moving validation examples to training set, leaving 1000
    train_set_x = np.vstack((self._train_set[0], self._valid_set[0][:-1000]))
    train_set_y = np.hstack((self._train_set[1], self._valid_set[1][:-1000]))
    valid_set_x = self._valid_set[0][-1000:]
    valid_set_y = self._valid_set[1][-1000:]
    self._train_set = (train_set_x, train_set_y)
    self._valid_set = (valid_set_x, valid_set_y)
    logging.info("[mnist small validation] training data size: %d" % len(self._train_set[0]))
    logging.info("[mnist small validation] valid data size: %d" % len(self._valid_set[0]))
    logging.info("[mnist small validation] test data size: %d" % len(self._test_set[0]))
def ssl_authenticator():
    """Build an ``SSLAuthenticator`` from the configured or bundled key pair.

    Key and certificate text come from ``pupy_credentials`` when that
    module provides them, otherwise from the ``DEFAULT_SSL_BIND_*``
    constants. If ``pupy.conf`` exists in the current directory, its
    ``[pupyd]`` ``keyfile`` / ``certfile`` paths are used as they are;
    otherwise the key and certificate are written to fresh temp files.
    """
    try:
        import pupy_credentials
        keystr = pupy_credentials.SSL_BIND_KEY
        certstr = pupy_credentials.SSL_BIND_CERT
    except Exception:
        # Missing module or attribute: fall back to the bundled defaults.
        # (KeyboardInterrupt / SystemExit are no longer swallowed.)
        keystr = DEFAULT_SSL_BIND_KEY
        certstr = DEFAULT_SSL_BIND_CERT

    key_path = None
    cert_path = None
    if os.path.isfile("pupy.conf"):
        config = configparser.ConfigParser()
        config.read("pupy.conf")
        # Accept either separator style in the config file.
        key_path = config.get("pupyd", "keyfile").replace("\\", os.sep).replace("/", os.sep)
        cert_path = config.get("pupyd", "certfile").replace("\\", os.sep).replace("/", os.sep)
    else:
        # mkstemp creates each file exclusively and readable only by the
        # current user, instead of open()-ing a short guessable name in
        # the shared temp directory.
        cert_fd, cert_path = tempfile.mkstemp()
        with os.fdopen(cert_fd, 'wb') as f:
            f.write(certstr.strip())
        key_fd, key_path = tempfile.mkstemp()
        with os.fdopen(key_fd, 'wb') as f:
            f.write(keystr.strip())

    return SSLAuthenticator(key_path, cert_path, ciphers="SHA256+AES256:SHA1+AES256:@STRENGTH")
def download_workflow(url):
    """Fetch the workflow at ``url`` into the system temp directory.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file
    :raises ValueError: if ``url`` does not end in ``.alfredworkflow``

    """
    filename = url.rsplit("/", 1)[-1]

    # The suffix contains no "/", so the URL ends with it exactly when its
    # last segment does; one test covers both.
    if not filename.endswith('.alfredworkflow'):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)
    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...',
        url, local_path)

    payload = web.get(url).content
    with open(local_path, 'wb') as output:
        output.write(payload)

    return local_path
def test_profiler(self):
    """Check that profiling records ``heavy_foo`` and can show/dump it.

    After ``do_computation`` exactly one profiler must be registered; its
    stats, the output of ``show_profiles`` and the files written by
    ``dump_profiles`` must all reflect it.
    """
    self.do_computation()

    profilers = self.sc.profiler_collector.profilers
    self.assertEqual(1, len(profilers))
    profile_id, profiler, _ = profilers[0]
    stats = profiler.stats()
    self.assertTrue(stats is not None)
    width, stat_list = stats.get_print_list([])
    func_names = [func_name for fname, n, func_name in stat_list]
    self.assertTrue("heavy_foo" in func_names)

    # Capture what show_profiles prints; always restore the real stdout,
    # even when show_profiles raises.
    old_stdout = sys.stdout
    sys.stdout = captured = StringIO()
    try:
        self.sc.show_profiles()
    finally:
        sys.stdout = old_stdout
    self.assertTrue("heavy_foo" in captured.getvalue())

    d = tempfile.gettempdir()
    self.sc.dump_profiles(d)
    self.assertTrue("rdd_%d.pstats" % profile_id in os.listdir(d))
def _get_pandoc_version(pandoc_path): new_env = os.environ.copy() if 'HOME' not in os.environ: new_env['HOME'] = tempfile.gettempdir() p = subprocess.Popen( [pandoc_path, '--version'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=new_env) comm = p.communicate() out_lines = comm[0].decode().splitlines(False) if p.returncode != 0 or len(out_lines) == 0: raise RuntimeError("Couldn't call pandoc to get version information. Output from " "pandoc:\n%s" % str(comm)) version_pattern = re.compile(r"^\d+(\.\d+){1,}$") for tok in out_lines[0].split(): if version_pattern.match(tok): version = tok break return version
def configure(dir=None, format_strs=None):
    """Install a new ``Logger.CURRENT`` writing to *dir*.

    *dir* defaults to ``$OPENAI_LOGDIR`` and then to a timestamped
    ``openai-...`` folder in the temp directory; it is created if missing.
    *format_strs* defaults to the comma separated ``$OPENAI_LOG_FORMAT``
    and then to ``LOG_OUTPUT_FORMATS``.
    """
    if dir is None:
        dir = os.getenv('OPENAI_LOGDIR')
        if dir is None:
            stamp = datetime.datetime.now().strftime("openai-%Y-%m-%d-%H-%M-%S-%f")
            dir = osp.join(tempfile.gettempdir(), stamp)
    assert isinstance(dir, str)
    os.makedirs(dir, exist_ok=True)

    if format_strs is None:
        strs = os.getenv('OPENAI_LOG_FORMAT')
        if strs:
            format_strs = strs.split(',')
        else:
            format_strs = LOG_OUTPUT_FORMATS

    output_formats = []
    for fmt in format_strs:
        output_formats.append(make_output_format(fmt, dir))

    Logger.CURRENT = Logger(dir=dir, output_formats=output_formats)
    log('Logging to %s'%dir)
def main():
    """Print the welcome header and block-version statistics.

    The cache stored in the temp directory is loaded first. It is
    refreshed through RPC and saved again when it is empty or its newest
    hash is not the current best block.
    """
    cachePath = os.path.join(tempfile.gettempdir(), CACHEFILE)
    cache = loadCache(cachePath, BASE64)

    chainInfo = rpcRetrieve('getblockchaininfo')
    bestHash = chainInfo['bestblockhash']
    height = int(chainInfo['blocks'])
    bip9forks = chainInfo['bip9_softforks']

    welcome = formatWelcome(cache, WINDOW, bestHash, height,
                            F(chainInfo['difficulty']), bip9forks, THRESHOLD)
    print(welcome)

    if cache.height == 0:
        print('Please wait while retrieving latest block versions and caching them...\n')

    if len(cache.hashes) < 1 or cache.hashes[0] != bestHash:
        def retrieveBlock(h):
            return rpcRetrieve('getblock', h)

        cache = updateCache(cache, WINDOW, HASHES_SIZE, bestHash, height,
                            retrieveBlock)
        saveCache(cache, cachePath, BASE64)

    print(formatAllData(cache, bip9forks, THRESHOLD, WINDOW))
def download_and_save_icons(dist_dir):
    """Download the AWS icon archive and save each icon as a PNG emoji.

    The archive is fetched to ``icons.zip`` in the temp directory. Every
    image it yields is passed through ``emojinize_image`` and saved as
    ``<dist_dir>/<friendly_name>.png``.

    Returns a dict mapping each friendly name to the saved file path.
    """
    archive_target = os.path.join(tempfile.gettempdir(), 'icons.zip')
    archive_path = download_icons(AWS_ICON_DOWNLOAD_URL, archive_target)

    saved_icons = {}
    for image, friendly_name in get_images_from_archive_file(archive_path):
        emoji_icon = emojinize_image(image)
        dist_path = os.path.join(dist_dir, '{}.png'.format(friendly_name))
        emoji_icon.save(dist_path, 'PNG', optimize=True)
        logging.info("Save emoji: %s", dist_path)
        saved_icons[friendly_name] = dist_path

    return saved_icons
def prepare_test(test_type):
    """Prepare the geodatabase data for running tests on.

    Looks up ``TEST_CONFIG[test_type]`` and makes sure a report folder
    named after the config's ``name`` exists in the temp directory. With
    arcpy available a file geodatabase is created there and the XML schema
    imported (schema only); otherwise the configured OGR geodatabase is
    used.

    Returns the tuple ``(in_gdb, out_report_folder, json_results)``.
    """
    settings_for_test = TEST_CONFIG[test_type]
    test_type = settings_for_test['name']

    out_report_folder = os.path.join(tempfile.gettempdir(), test_type)
    if not os.path.exists(out_report_folder):
        os.mkdir(out_report_folder)

    if not arcpy_found:
        in_gdb = settings_for_test['ogr_geodatabase']
    else:
        xml_schema = settings_for_test['xml_schema']
        creation = arcpy.CreateFileGDB_management(out_report_folder, test_type)
        in_gdb = creation.getOutput(0)
        arcpy.ImportXMLWorkspaceDocument_management(in_gdb, xml_schema, "SCHEMA_ONLY")

    return (in_gdb, out_report_folder, settings_for_test['json_results'])

#adding the project folder to support running test files individually and from the IDE
def zip_folder(folder_path):
    """
    Zip the entire folder and yield the name of the zip file.

    Use this inside a "with" statement so the zipfile is cleaned up after
    it is used: once the generator is resumed or closed, the zip file is
    removed if it still exists.

    :param folder_path: folder whose contents are zipped
    :return: Name of the zipfile
    """
    # Unique base name in the temp directory; make_zip decides the final
    # file name.
    base_name = os.path.join(tempfile.gettempdir(), "data-" + uuid.uuid4().hex)
    zipfile_name = make_zip(base_name, folder_path)

    try:
        yield zipfile_name
    finally:
        still_there = os.path.exists(zipfile_name)
        if still_there:
            os.remove(zipfile_name)
def rtask_save_proc(task=None):
    # Receives (i, n) messages and appends one "i: n" line per message to
    # the 'tickdata' file in the temp directory. A message whose second
    # item is None ends the loop.
    import os
    import tempfile

    # save data in /tmp/tickdata
    out_path = os.path.join(os.sep, tempfile.gettempdir(), 'tickdata')
    with open(out_path, 'w') as fd:
        index, value = yield task.receive()
        while value is not None:
            fd.write('%s: %s\n' % (index, value))
            index, value = yield task.receive()
    # NOTE(review): under PEP 479 (Python 3.7+) raising StopIteration inside
    # a generator surfaces as RuntimeError - confirm the targeted version.
    raise StopIteration(0)

# This task runs on client. It gets trend messages from remote task that
# computes moving window average.