We extracted the following 50 code examples from open-source Python projects to illustrate how to use os.path.isfile().
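Before the project examples, here is a minimal, self-contained sketch of the call itself; the path used below is purely illustrative:

from os.path import isfile

config_path = '/etc/ansible/hosts'  # hypothetical path, for illustration only

if isfile(config_path):
    # The path exists and refers to a regular file (symlinks are followed)
    print('found a regular file at %s' % config_path)
else:
    # Either the path does not exist, or it is a directory/special file
    print('no regular file at %s' % config_path)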
def test_uu_encoding(self):
    """
    Test the encoding of data; this is necessary prior to a post
    """
    # First we take a binary file
    binary_filepath = join(self.var_dir, 'joystick.jpg')
    assert isfile(binary_filepath)

    # Initialize Codec
    encoder = CodecUU(work_dir=self.test_dir)

    content = encoder.encode(binary_filepath)

    # We should have gotten an ASCII Content Object
    assert isinstance(content, NNTPAsciiContent) is True

    # We should actually have content associated with our data
    assert len(content) > 0
def test_hexdump(self):
    """
    Converts binary content to hexadecimal in a standard, easy to read
    format
    """
    # Compare File
    hexdump_file = join(self.var_dir, 'hexdump.txt')
    assert isfile(hexdump_file)

    all_characters = ''.join(map(chr, range(0, 256)))
    with open(hexdump_file, 'r') as fd_in:
        ref_data = fd_in.read()

    # when reading in content, there is a new line appended
    # after the last line (even if one isn't otherwise present)
    # rstrip() to just simplify the test by stripping off
    # all trailing whitespace
    assert hexdump(all_characters) == ref_data.rstrip()
def session(self, reset=False):
    """
    Returns a database session
    """
    if not self._loaded:
        return False

    if not isfile(self.db_path):
        reset = True

    if self._db and reset is True:
        self._db = None

    if self._db is None:
        # Reset our database
        self._db = NNTPGetDatabase(engine=self.engine, reset=reset)

    # Acquire our session
    return self._db.session()
def session(self, reset=False):
    """
    Returns a database session
    """
    if not self._loaded:
        return False

    if not isfile(self.db_path):
        reset = True

    if self._db and reset is True:
        self._db = None

    if self._db is None:
        # Reset our database
        self._db = NNTPPostDatabase(engine=self.engine, reset=reset)

    # Acquire our session
    return self._db.session()
def database_reset(ctx):
    """
    Resets the database based on the current configuration
    """
    logger.info('Resetting database ...')
    ctx['NNTPSettings'].open(reset=True)
    __db_prep(ctx)

    db_path = join(ctx['NNTPSettings'].base_dir, 'cache', 'search')
    logger.debug('Scanning %s for databases...' % db_path)
    with pushd(db_path, create_if_missing=True):
        for entry in listdir(db_path):
            db_file = join(db_path, entry)
            if not isfile(db_file):
                continue

            try:
                unlink(db_file)
                logger.info('Removed %s ...' % entry)
            except:
                logger.warning('Failed to remove %s ...' % entry)
def dirsize(src):
    """
    Takes a source directory and returns the total size of all of its
    contents in bytes. The function returns None if the size can't be
    properly calculated.
    """
    if not isdir(src):
        # Nothing to return
        return 0

    try:
        with pushd(src, create_if_missing=False):
            size = sum(getsize(f) for f in listdir('.') if isfile(f))

    except (OSError, IOError):
        return None

    # Return our total size
    return size
def enumerate_backups_entities():
    """Enumerates the entities of all the available backups"""
    if isdir(Backuper.backups_dir):

        # Look for subdirectories
        for directory in listdir(Backuper.backups_dir):
            entity_file = path.join(Backuper.backups_dir, directory, 'entity.tlo')

            # Ensure the entity.tlo file exists
            if isfile(entity_file):

                # Load and yield it
                with open(entity_file, 'rb') as file:
                    with BinaryReader(stream=file) as reader:
                        try:
                            yield reader.tgread_object()
                        except TypeNotFoundError:
                            # Old user, scheme got updated, don't care.
                            pass

#endregion

#region Backup exists and deletion
def backup_propic(self):
    """Backups the profile picture for the given entity
       as the current peer profile picture, returning its path"""

    # Allow multiple versions of the profile picture
    # TODO Maybe this should be another method, because when downloading
    # media... We also have multiple versions
    filename = self.media_handler.get_propic_path(self.entity, allow_multiple=True)
    generic_filename = self.media_handler.get_propic_path(self.entity)

    if filename:
        # User may not have a profile picture
        if not isfile(filename):
            # Only download the file if it doesn't exist yet
            self.client.download_profile_photo(self.entity.photo,
                                               file_path=filename,
                                               add_extension=False)

        # If we downloaded a new version, copy it to the "default" generic file
        if isfile(generic_filename):
            remove(generic_filename)
        shutil.copy(filename, generic_filename)

    # The user may not have a profile picture
    return generic_filename
def storageindex(self):
    # get the filelist
    onlyfiles = [f for f in listdir(self.indexdata)
                 if isfile(join(self.indexdata, f))]

    # read from using pandas
    for f in onlyfiles:
        df = pd.read_csv(self.indexdata + "/" + f)
        s = f.split('.')
        name = s[0][2:8]
        records = json.loads(df.T.to_json()).values()
        for row in records:
            row['date'] = datetime.datetime.strptime(row['date'], "%Y-%m-%d")
        print(name)
        self.index[name].insert_many(records)

# storage stock pool into database
def find_rain(src, paths=[]):
    if src[0] == '/':
        paths = ['']
    elif src[0] != '.':
        paths = get_paths() + paths

    for path in paths:
        if isfile(join(path, src) + '.rn'):
            return join(path, src) + '.rn'
        elif isfile(join(path, src)) and src.endswith('.rn'):
            return join(path, src)
        elif isdir(join(path, src)) and isfile(join(path, src, '_pkg.rn')):
            return join(path, src, '_pkg.rn')

# find any file from a string
def find_name(src):
    path = os.path.abspath(src)
    path, name = os.path.split(path)
    fname, ext = os.path.splitext(name)

    if fname == '_pkg':
        _, fname = os.path.split(path)

    mname = normalize_name(fname)

    proot = []
    while path and os.path.isfile(join(path, '_pkg.rn')):
        path, name = os.path.split(path)
        proot.insert(0, normalize_name(name))

    if not src.endswith('_pkg.rn'):
        proot.append(mname)

    qname = '.'.join(proot)

    return (qname, mname)
def clean_epub_directory():
    epubs = listdir(config.EPUB_DIRECTORY)
    if len(epubs) <= config.MAX_EPUB:
        return
    epubs.sort()
    number_to_delete = len(epubs) - config.MAX_EPUB + 2
    deleted = 0
    for t in epubs:
        f = path.join(config.EPUB_DIRECTORY, t)
        if not path.isfile(f):
            continue
        if deleted >= number_to_delete:
            break
        try:
            remove(f)
            deleted += 1
        except OSError:
            pass
def loadConfig(self, config):
    configFile = config.getConfigFile()
    if not isfile(configFile):
        self.logger.warn("Config file %s does not exist. Using defaults." % configFile)
        return config

    self.logger.debug("Loading config from %s." % configFile)
    loader = ConfigParser.SafeConfigParser(allow_no_value=True)
    loader.add_section("main")
    loader.set("main", "log_file", config.logFile)
    loader.set("main", "migration_dirs", join(config.migrationDirs, ":"))
    loader.set("main", "pre_migration_dirs", join(config.preMigrationDirs, ":"))
    loader.set("main", "post_migration_dirs", join(config.postMigrationDirs, ":"))
    loader.set("main", "state_dir", config.stateDir)
    loader.set("main", "run_dir", config.runDir)
    loader.read(configFile)

    config.logFile = loader.get("main", "log_file")
    config.migrationDirs = split(loader.get("main", "migration_dirs"), ":")
    config.preMigrationDirs = split(loader.get("main", "pre_migration_dirs"), ":")
    config.postMigrationDirs = split(loader.get("main", "post_migration_dirs"), ":")
    config.stateDir = loader.get("main", "state_dir")
    config.runDir = loader.get("main", "run_dir")

    return config
def _record_filter(args, base_dir):
    """ Save the filter provided """
    filter_file = '{}/.filter'.format(base_dir)
    if not isfile(filter_file):
        # touch the filter_file
        open(filter_file, 'a').close()

    current_filter = {}
    with open(filter_file) as filehandle:
        current_filter = yaml.load(filehandle)
    if current_filter is None:
        current_filter = {}
    pprint.pprint(current_filter)

    # filter a bunch of salt content and the target key before writing
    rec_args = {k: v for k, v in args.items()
                if k != 'target' and not k.startswith('__')}
    current_filter[args['target']] = rec_args
    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
def __init__(self, title=None, pathfile=None, debug_mode=True, debug_level=0):
    self.path_file = pathfile
    self.debug_mode = debug_mode
    self.starttime = time.perf_counter()
    self.nowtime = time.perf_counter()
    self.lastcall = time.perf_counter()
    self.debug_level = debug_level

    # create file?
    # if not isfile(self.path_file):
    #     with open(self.path_file, 'w') as f:
    #         f.write("-init log file-")

    if title is not None:
        today = datetime.datetime.now()
        s = title + " program started the " + today.strftime("%d of %b %Y at %H:%M")
        self.log("=============================================================\n"
                 + s +
                 "\n=============================================================")
def download_model(lang, paths):
    model_folder = join(paths.user_config, 'model')
    model_en_folder = join(model_folder, lang)

    if not isdir(model_folder):
        mkdir(model_folder)
    if not isdir(model_en_folder):
        mkdir(model_en_folder)

    file_name = paths.model_dir + '.tar.gz'
    if not isfile(file_name):
        import urllib.request
        import shutil
        url = 'https://github.com/MatthewScholefield/pocketsphinx-models/raw/master/' + lang + '.tar.gz'
        with urllib.request.urlopen(url) as response, open(file_name, 'wb') as file:
            shutil.copyfileobj(response, file)

        import tarfile
        tar = tarfile.open(file_name)
        tar.extractall(path=model_en_folder)
        tar.close()
def generate(self, name, data):
    """
    Translate the data into different formats
    Depending on the format, this can be accessed different ways

    Args:
        name (IntentName): full intent name
        data (dict): dict containing all data from the skill

    Returns:
        bool: Whether the data could be generated
    """
    for dir_fn in [self.rt.paths.skill_vocab, self.rt.paths.skill_formats]:
        file_name = join(dir_fn(name.skill), name.intent + self._extension)
        if isfile(file_name):
            with open(file_name, 'r') as file:
                log.debug('Generating', self.__class__.__name__ + '...')
                self.generate_format(file, data)
            return True
    return False
def create_file(file_path):
    """
    Create a file and create parent folder if missing
    """
    if not (path.isfile(file_path) and path.exists(file_path)):
        dirs = file_path.split("/")
        i = 0
        while i < len(dirs) - 1:
            directory = "/".join(dirs[0:i + 1]).strip()
            if not path.exists(directory) and len(directory) != 0:
                makedirs(directory)
                logging.debug("Creating directory %s " % directory)
            i += 1
        mknod(file_path)
        return True
    else:
        return False
def screenshot_area():
    """
    Screenshot an area of the screen using gnome-screenshot,
    used to QR scan
    """
    ink_flag = call(['which', 'gnome-screenshot'], stdout=PIPE, stderr=PIPE)
    if ink_flag == 0:
        file_name = path.join(GLib.get_tmp_dir(), NamedTemporaryFile().name)
        p = Popen(["gnome-screenshot", "-a", "-f", file_name],
                  stdout=PIPE, stderr=PIPE)
        output, error = p.communicate()
        if error:
            error = error.decode("utf-8").split("\n")
            logging.error("\n".join([e for e in error]))
        if not path.isfile(file_name):
            logging.debug("The screenshot was not taken")
            return False
        return file_name
    else:
        logging.error(
            "Couldn't find gnome-screenshot, please install it first")
        return False
def feed(self, url_list, offset=0, max_num=0):
    if isinstance(url_list, str):
        if path.isfile(url_list):
            with open(url_list, 'r') as fin:
                url_list = [line.rstrip('\n') for line in fin]
        else:
            raise IOError('url list file {} not found'.format(url_list))
    elif not isinstance(url_list, list):
        raise TypeError('"url_list" can only be a filename or a str list')

    if offset < 0 or offset >= len(url_list):
        raise ValueError('"offset" exceed the list length')
    else:
        if max_num > 0:
            end_idx = min(len(url_list), offset + max_num)
        else:
            end_idx = len(url_list)
        for i in range(offset, end_idx):
            url = url_list[i]
            self.out_queue.put(url)
            self.logger.debug('put url to url_queue: {}'.format(url))
def rev_parse_manifest_path(self, cwd):
    """
    Search parent directories for package.json.

    Starting at the current working directory. Go up one directory
    at a time checking if that directory contains a package.json
    file. If it does, return that directory.
    """
    name = 'package.json'
    manifest_path = path.normpath(path.join(cwd, name))
    bin_path = path.join(cwd, 'node_modules/.bin/')

    if path.isfile(manifest_path) and path.isdir(bin_path):
        return manifest_path

    parent = path.normpath(path.join(cwd, '../'))

    if parent == '/' or parent == cwd:
        return None

    return self.rev_parse_manifest_path(parent)
def clear_cache(force=False):
    """
    If the folder exists, and has more than 5MB of icons in the cache,
    delete it to clear all the icons then recreate it.
    """
    from os.path import getsize, join, isfile, exists
    from os import makedirs, listdir
    from sublime import cache_path
    from shutil import rmtree

    # The icon cache path
    icon_path = join(cache_path(), "GutterColor")

    # The maximum amount of space to take up
    limit = 5242880  # 5 MB

    if exists(icon_path):
        size = sum(getsize(join(icon_path, f))
                   for f in listdir(icon_path)
                   if isfile(join(icon_path, f)))
        if force or (size > limit):
            rmtree(icon_path)

    if not exists(icon_path):
        makedirs(icon_path)
def fix_scheme_in_settings(settings_file, current_scheme, new_scheme, regenerate=False):
    """Change the color scheme in the given Settings to a background-corrected one"""
    from os.path import join, normpath, isfile

    settings = load_settings(settings_file)
    settings_scheme = settings.get("color_scheme")
    if current_scheme == settings_scheme:
        new_scheme_path = join(packages_path(),
                               normpath(new_scheme[len("Packages/"):]))
        if isfile(new_scheme_path) and not regenerate:
            settings.set("color_scheme", new_scheme)
        else:
            generate_scheme_fix(current_scheme, new_scheme_path)
            settings.set("color_scheme", new_scheme)
        save_settings(settings_file)
        return True
    return False
def PrepareDataList(BASE, length):
    List = []
    for M in range(0, min(length, len(BASE))):
        img, text = BASE[M]
        image = misc.imread(img, mode='RGB')
        # image = misc.imresize(image, [227, 227])
        r1 = []
        if isfile(text):
            f = open(text, 'r')
            s = f.readline()
            st = s.split(' ')
            for i in range(0, 2):
                r1.append(int(st[i]))
            f.close()
        else:
            # If there is no txt file - the "no bird" situation
            r1.append(0)
            r1.append(0)
        List.append([image, r1])
    return List

# Random test and train list
def findAllModules(self, project_path):
    modules = []
    if path.isfile(path.join(project_path, 'package.json')):
        packageJsonFile = open(path.join(project_path, 'package.json'))
        try:
            packageJson = json.load(packageJsonFile)
            for key in ["dependencies", "devDependencies",
                        "peerDependencies", "optionalDependencies"]:
                if key in packageJson:
                    modules += packageJson[key].keys()
        except ValueError:
            SimpleImport.log_error("Failed to load package.json at {0}".format(
                self.project_path
            ))

        # Close file
        packageJsonFile.close()
    return modules
def getProjectSettings(self):
    SETTINGS_FILE = SimpleImportCommand.SETTINGS_FILE
    if path.isfile(path.join(self.project_path, SETTINGS_FILE)):
        with open(path.join(self.project_path, SETTINGS_FILE)) as raw_json:
            try:
                settings_json = json.load(raw_json)
                if self.interpreter.syntax in settings_json:
                    settings = settings_json[self.interpreter.syntax]
                    if "$path" in settings:
                        settings_on_file = {}
                        for match in settings["$path"]:
                            if len(match) == 1:
                                settings_on_file.update(match[0])
                            else:
                                pattern = '|'.join(match[:-1])
                                if re.search("^{0}".format(pattern), self.view_relpath):
                                    settings_on_file.update(match[-1])
                        return settings_on_file
                    else:
                        return settings_json[self.interpreter.syntax]
            except ValueError:
                print("Failed to load .simple-import.json at {0}".format(self.project_path))
def download_component(component_name):
    try:
        component = [component for component in components
                     if component["name"] == component_name][0]
        try:
            folder = dirname(dirname(__file__))
            file_name = join(folder, join(*component["destination"]))
            if isfile(file_name):
                print("Component '{}' already exists.".format(component["name"]))
            else:
                print("Start download of component '{}'".format(component["name"]))
                print(file_name)
                download_file(component["url"], file_name)
                print("Finished download of component '{}'".format(component["name"]))
        except Exception as e:
            print(e)
            print("Cannot download component '{}'".format(component["name"]))
    except:
        message = "Error: Component with name '{}' does not exist.".format(
            component_name)
        print(message)
def clean_list(self):
    '''
    Manage the list of items to backup
    '''
    # Case <backup_itens> is empty
    if self.backup_itens == []:
        msg = "After version 0.0.4 <backup_itens> cannot be empty"
        self.log.update_log(msg)
        raise BaseException(msg) from None

    # Add items
    for item in self.backup_itens:
        if path.isfile(path.abspath(item)) or path.isdir(path.abspath(item)):
            self.backup_list.append(path.abspath(item))
        else:
            log.update_log("Invalid item. It'll be wiped from backup list: <%s>" % item, 'INFO')
def renameCB(self, newname):
    if newname and newname != 'bootname' and newname != self.oldname:
        if not path.exists('/boot/%s' % newname) and path.isfile('/boot/%s' % self.oldname):
            ret = system("mv -fn '/boot/%s' '/boot/%s'" % (self.oldname, newname))
            if ret:
                self.session.open(MessageBox, _('Rename failed!'), MessageBox.TYPE_ERROR)
            else:
                bootname = self.readlineFile('/boot/bootname').split('=')
                if len(bootname) == 2 and bootname[1] == self.oldname:
                    self.writeFile('/boot/bootname', '%s=%s' % (bootname[0], newname))
                    self.getCurrent()
                    return
                elif self.bootname == self.oldname:
                    self.getCurrent()
                    return
                self.list[self.selection] = newname
                self["config"].setText(_("Select Image: %s") % newname)
        else:
            if not path.exists('/boot/%s' % self.oldname):
                self.getCurrent()
                txt = _("File not found - rename failed!")
            else:
                txt = _("Name already exists - rename failed!")
            self.session.open(MessageBox, txt, MessageBox.TYPE_ERROR)
def get_host_file(_path=None):
    """
    If ``_path`` is passed in, return that (makes it easier for the caller
    if it can pass a ``None``), otherwise look in the most common locations
    for a hosts file and, if found, return that.
    """
    if _path:
        return _path

    # if no path is passed onto us, try to look in the cwd for a hosts file
    if os.path.isfile('hosts'):
        logger.info(
            'found and loaded the hosts file from the current working directory: %s',
            os.getcwd()
        )
        return path.abspath('hosts')

    # if that is not the case, try for /etc/ansible/hosts
    if path.isfile('/etc/ansible/hosts'):
        return '/etc/ansible/hosts'

    logger.warning('unable to find an Ansible hosts file to work with')
    logger.warning('tried locations: %s, %s', os.getcwd(), '/etc/ansible/hosts')
def _generate_sparse_format_file(self, feature_file):
    sparse_file = insert_modifier_in_filename(feature_file, 'sparse_format')
    if path.isfile(sparse_file):
        self.logger.info("Re-using previously generated sparse format file: %s" % sparse_file)
    else:
        self.logger.info('Generating a sparse version of the feature file (zeros replaced with empty columns '
                         'which the ranker knows how to deal with)')
        temp_file = get_temp_file(sparse_file)
        with smart_file_open(temp_file, 'w') as outfile:
            writer = csv.writer(outfile)
            with smart_file_open(feature_file) as infile:
                reader = csv.reader(infile)
                for row in reader:
                    writer.writerow(row[:1] + row[2:])
        move(temp_file, sparse_file)
        self.logger.info('Done generating file: %s' % sparse_file)

    return self._get_file_size(sparse_file), sparse_file
def _drop_answer_id_col_from_feature_file(self, train_file_location):
    file_without_aid = insert_modifier_in_filename(train_file_location, 'no_aid')
    if path.isfile(file_without_aid):
        self.logger.info('Found a previously generated version of the training file without answer id column, '
                         're-using it: %s' % file_without_aid)
    else:
        self.logger.info('Generating a version of the feature file without answer id (which is what ranker'
                         ' training expects)')
        temp_file = get_temp_file(file_without_aid)
        with smart_file_open(temp_file, 'w') as outfile:
            writer = csv.writer(outfile)
            with smart_file_open(train_file_location) as infile:
                reader = csv.reader(infile)
                for row in reader:
                    writer.writerow(row[:1] + row[2:])
        move(temp_file, file_without_aid)
        self.logger.info('Done generating file: %s' % file_without_aid)

    return file_without_aid
def str_is_existing_path(path: str) -> str:
    """
    >>> import tempfile
    >>> with tempfile.TemporaryFile() as f:
    ...     str_is_existing_path(f.name) == f.name
    True
    >>> with tempfile.TemporaryDirectory() as path:
    ...     str_is_existing_path(path) == path
    True
    >>> str_is_existing_path('')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file or directory.
    >>> str_is_existing_path('/non/existing/dir')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file or directory.
    """
    if isfile(path) or isdir(path):
        return path
    else:
        raise ArgumentTypeError("Given path is not an existing file or directory.")
def str_is_existing_file(path: str) -> str:
    """
    >>> import tempfile
    >>> with tempfile.TemporaryFile() as f:
    ...     str_is_existing_file(f.name) == f.name
    True
    >>> str_is_existing_file('/home')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.
    >>> str_is_existing_file('')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.
    >>> str_is_existing_file('/non/existing/file')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.
    """
    if isfile(path):
        return path
    else:
        raise ArgumentTypeError("Given path is not an existing file.")
def ensure_file(name, url=None, force=False, logger=logging.getLogger(), postprocess=None):
    """
    Ensures that the file requested exists in the cache, downloading it if it does not exist.

    Args:
        name (str): name of the file.
        url (str): url to download the file from, if it doesn't exist.
        force (bool): whether to force the download, regardless of the existence of the file.
        logger (logging.Logger): logger to log results.
        postprocess (function): a function that, if given, will be applied after the
            file is downloaded. The function has the signature ``f(fname)``

    Returns:
        str: file name of the downloaded file.
    """
    fname = Embedding.path(name)
    if not path.isfile(fname) or force:
        if url:
            logger.critical('Downloading from {} to {}'.format(url, fname))
            Embedding.download_file(url, fname)
            if postprocess:
                postprocess(fname)
        else:
            raise Exception('{} does not exist!'.format(fname))
    return fname
def __init__(self, nb_classes, resnet_layers, input_shape, weights):
    """Instantiate a PSPNet."""
    self.input_shape = input_shape

    json_path = join("weights", "keras", weights + ".json")
    h5_path = join("weights", "keras", weights + ".h5")
    if isfile(json_path) and isfile(h5_path):
        print("Keras model & weights found, loading...")
        with open(json_path, 'r') as file_handle:
            self.model = model_from_json(file_handle.read())
        self.model.load_weights(h5_path)
    else:
        print("No Keras model & weights found, import from npy weights.")
        self.model = layers.build_pspnet(nb_classes=nb_classes,
                                         resnet_layers=resnet_layers,
                                         input_shape=self.input_shape)
        self.set_npy_weights(weights)
def run_makefile(make_directory):
    """
    Runs a makefile in a given directory.

    Args:
        make_directory: directory where the Makefile is located.
    """
    make_path = path.join(make_directory, "Makefile")

    if not path.isfile(make_path):
        raise InternalException(make_path + " does not exist.")

    shell = spur.LocalShell()
    try:
        shell.run(["make", "-C", make_directory])
    except Exception as e:
        raise InternalException(str(e))
def analyze_problems():
    """
    Checks the sanity of inserted problems.
    Includes weightmap and grader verification.

    Returns:
        A list of error strings describing the problems.
    """
    grader_missing_error = "{}: Missing grader at '{}'."
    unknown_weightmap_pid = "{}: Has weightmap entry '{}' which does not exist."

    problems = get_all_problems()

    errors = []
    for problem in problems:
        if not isfile(join(grader_base_path, problem["grader"])):
            errors.append(grader_missing_error.format(problem["name"], problem["grader"]))
        for pid in problem["weightmap"].keys():
            if safe_fail(get_problem, pid=pid) is None:
                errors.append(unknown_weightmap_pid.format(problem["name"], pid))

    return errors
def get_generator(pid):
    """
    Gets a handle on a problem generator module.

    Args:
        pid: the problem pid

    Returns:
        The loaded module
    """
    generator_path = get_generator_path(pid)

    if not path.isfile(generator_path):
        raise InternalException("Could not find {}.".format(generator_path))

    return imp.load_source(generator_path[:-3], generator_path)
def getgraphsfromdir(self, path=None):
    """Get the files that are part of the repository (tracked or not).

    Returns:
        A dict mapping each file name to its guessed serialization format.
    """
    if path is None:
        path = self.getRepoPath()

    files = [f for f in listdir(path) if isfile(join(path, f))]
    graphfiles = {}
    for file in files:
        format = guess_format(file)
        if format is not None:
            graphfiles[file] = format

    return graphfiles
def _parse(self):
    """
    parse the history file and return a list of
    tuples(datetime strings, set of distributions/diffs, comments)
    """
    res = []
    if not isfile(self.path):
        return res
    sep_pat = re.compile(r'==>\s*(.+?)\s*<==')
    with open(self.path, 'r') as f:
        lines = f.read().splitlines()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        m = sep_pat.match(line)
        if m:
            res.append((m.group(1), set(), []))
        elif line.startswith('#'):
            res[-1][2].append(line)
        else:
            res[-1][1].add(line)
    return res
def delete(self, filename=''):
    """Deletes given file or directory. If no filename is passed,
    current directory is removed.
    """
    self._raise_if_none()
    fn = path_join(self.path, filename)
    try:
        if isfile(fn):
            remove(fn)
        else:
            removedirs(fn)
    except OSError as why:
        if why.errno == errno.ENOENT:
            pass
        else:
            raise why
def test_decoding_uuenc_single_part(self):
    """
    Decodes a single UUEncoded message
    """
    # Input File
    encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg')
    assert isfile(encoded_filepath)

    # Compare File
    decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg')
    assert isfile(decoded_filepath)

    # Initialize Codec
    ud_py = CodecUU(work_dir=self.test_dir)

    # Read data and decode it
    with open(encoded_filepath, 'r') as fd_in:
        article = ud_py.decode(fd_in)

    # our content should be valid
    assert isinstance(article, NNTPBinaryContent)

    # Verify the actual article itself reports itself
    # as being okay (structurally)
    assert article.is_valid() is True

    with open(decoded_filepath, 'r') as fd_in:
        decoded = fd_in.read()

    # Compare our processed content with the expected results
    assert decoded == article.getvalue()
def test_partial_download(self):
    """
    Test the handling of a download that is explicitly ordered to abort
    after only some content is retrieved.  A way of 'peeking' if you
    will.
    """
    # Input File
    encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg')
    assert isfile(encoded_filepath)

    # Compare File
    decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg')
    assert isfile(decoded_filepath)

    # Initialize Codec (restrict content to be no larger than 10 bytes)
    ud_py = CodecUU(work_dir=self.test_dir, max_bytes=10)

    # Read data and decode it
    with open(encoded_filepath, 'r') as fd_in:
        article = ud_py.decode(fd_in)

    # our content should be valid
    assert isinstance(article, NNTPBinaryContent)

    # Our article should not be considered valid on an
    # early exit
    assert article.is_valid() is False

    with open(decoded_filepath, 'r') as fd_in:
        decoded = fd_in.read()

    # Compare our processed content with the expected results
    length = len(article.getvalue())

    # Even though we haven't decoded all of our content, we're
    # still the same as the expected result up to what has been
    # processed.
    assert decoded[0:length] == article.getvalue()
def test_nzbfile_generation(self):
    """
    Tests the creation of NZB Files
    """
    nzbfile = join(self.tmp_dir, 'test.nzbfile.nzb')
    payload = join(self.var_dir, 'uudecoded.tax.jpg')
    assert isfile(nzbfile) is False

    # Create our NZB Object
    nzbobj = NNTPnzb()

    # create a fake article
    segpost = NNTPSegmentedPost(basename(payload))
    content = NNTPBinaryContent(payload)

    article = NNTPArticle('testfile', groups='newsreap.is.awesome')

    # Note that our nzb object segment tracker is not marked as being
    # complete. This flag gets toggled when we add segments manually to
    # our nzb object or if we parse an NZB-File
    assert(nzbobj._segments_loaded is None)

    # Add our Content to the article
    article.add(content)

    # now add our article to the NZBFile
    segpost.add(article)

    # now add our Segmented Post to the NZBFile
    nzbobj.add(segpost)

    # Since .add() was called, this will be set to True now
    assert(nzbobj._segments_loaded is True)

    # Store our file
    assert nzbobj.save(nzbfile) is True
    assert isfile(nzbfile) is True
def test_bad_files(self):
    """
    Test different variations of bad file inputs
    """
    # No parameters should create a file
    nzbfile = join(self.var_dir, 'missing.file.nzb')
    assert not isfile(nzbfile)

    nzbobj = NNTPnzb(nzbfile=nzbfile)
    assert nzbobj.is_valid() is False
    assert nzbobj.gid() is None

    # Test Length
    assert len(nzbobj) == 0
def test_decoding_01(self):
    """
    Open a stream to a file we can read for decoding; This test
    specifically focuses on var/group.list
    """
    # Initialize Codec
    ch_py = CodecGroups()

    encoded_filepath = join(self.var_dir, 'group.list')
    assert isfile(encoded_filepath)

    # Read data and decode it
    with open(encoded_filepath, 'r') as fd_in:
        # This module always returns 'True' expecting more
        # but content can be retrieved at any time
        assert ch_py.decode(fd_in) is True

    # This is where the value is stored
    assert isinstance(ch_py.decoded, NNTPMetaContent)
    assert isinstance(ch_py.decoded.content, list)

    # The number of lines in group.list parsed should all be valid
    assert len(ch_py.decoded.content) == ch_py._total_lines

    # Test our reset
    ch_py.reset()
    assert isinstance(ch_py.decoded, NNTPMetaContent)
    assert isinstance(ch_py.decoded.content, list)
    assert len(ch_py.decoded.content) == 0
    assert len(ch_py.decoded.content) == ch_py._total_lines
def test_decoding_03(self):
    """
    Open a stream to a file we can read for decoding; This test
    specifically focuses on var/headers.test03.msg
    """
    # Initialize Codec
    ch_py = CodecHeader()

    encoded_filepath = join(self.var_dir, 'headers.test03.msg')
    assert isfile(encoded_filepath)

    # Read data and decode it
    with open(encoded_filepath, 'r') as fd_in:
        # Decodes content and stops when complete
        assert isinstance(ch_py.decode(fd_in), NNTPHeader)

        # Read in the white space since it is actually the first line
        # after the end of headers delimiter
        assert fd_in.readline().strip() == 'First Line without spaces'

    # print '\n'.join(["assert ch_py['%s'] == '%s'" % (k, v) \
    #     for k, v in ch_py.items()])
    assert len(ch_py) == 10

    # with the 10 lines processed, our line_count
    # should be set to 10
    assert ch_py._lines == 10

    # assert False
    assert ch_py.is_valid() == True
def test_is_valid(self):
    """
    Tests different key combinations that would cause the different
    return types from is_valid()
    """
    # Initialize Codec
    ch_py = CodecHeader()

    encoded_filepath = join(self.var_dir, 'headers.test03.msg')
    assert isfile(encoded_filepath)

    # We haven't done any processing yet
    assert ch_py.is_valid() is None

    # Populate ourselves with some keys
    with open(encoded_filepath, 'r') as fd_in:
        # Decodes content and stops when complete
        assert isinstance(ch_py.decode(fd_in), NNTPHeader)

    # keys should be good!
    assert ch_py.is_valid() is True

    for k in ('DMCA', 'Removed', 'Cancelled', 'Blocked'):
        # Intentionally create a bad key:
        ch_py['X-%s' % k] = 'True'

        # We should fail now
        assert ch_py.is_valid() is False

        # it will become valid again once we clear the key
        del ch_py['X-%s' % k]
        assert ch_py.is_valid() is True