我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用os.path.join()。
def load_data(self): # work in the parent of the pages directory, because we # want the filenames to begin "pages/...". chdir(dirname(self.setup.pages_dir)) rel = relpath(self.setup.pages_dir) for root, dirs, files in walk(rel): for filename in files: start, ext = splitext(filename) if ext in self.setup.data_extensions: #yield root, dirs, filename loader = self.setup.data_loaders.get(ext) path = join(root,filename) if not loader: raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext)) data_key = join(root, start) loaded_dict = loader.loadf(path) self.data[data_key] = loaded_dict #self.setup.log.debug("data key [%s] ->" % (data_key, ), root, filename, ); pprint.pprint(loaded_dict, sys.stdout) #pprint.pprint(self.data, sys.stdout) #print("XXXXX data:", self.data)
def test_uu_encoding(self): """ Test the encoding of data; this is nessisary prior to a post """ # First we take a binary file binary_filepath = join(self.var_dir, 'joystick.jpg') assert isfile(binary_filepath) # Initialize Codec encoder = CodecUU(work_dir=self.test_dir) content = encoder.encode(binary_filepath) # We should have gotten an ASCII Content Object assert isinstance(content, NNTPAsciiContent) is True # We should actually have content associated with out data assert len(content) > 0
def test_file_load(self): """ Load the simple filters by file """ entry = copy(self.template_entry) fb = NNTPFilterBase(paths=join(self.var_dir, 'simple.nrf')) # Our hash will start at 0 (Zero) assert len(fb._regex_hash) == 0 # But now we meet our score entry['subject'] = 'A great video called "blah.avi"' assert fb.blacklist(**entry) == False entry['subject'] = 'A malicious file because it is "blah.avi.exe"' assert fb.blacklist(**entry) == True # Now load the directory; it should just find the same nrf file. fbd = NNTPFilterBase(paths=self.var_dir)
def test_hexdump(self): """converts binary content to hexidecimal in a standard easy to read format """ # Compare File hexdump_file = join(self.var_dir, 'hexdump.txt') assert isfile(hexdump_file) all_characters = ''.join(map(chr, range(0, 256))) with open(hexdump_file, 'r') as fd_in: ref_data = fd_in.read() # when reading in content, there is a new line appeneded # after the last line (even if one isn't otherwise present) # rstrip() to just simplify the test by stripping off # all trailing whitespace assert hexdump(all_characters) == ref_data.rstrip()
def database_reset(ctx): """ Reset's the database based on the current configuration """ logger.info('Resetting database ...') ctx['NNTPSettings'].open(reset=True) __db_prep(ctx) db_path = join(ctx['NNTPSettings'].base_dir, 'cache', 'search') logger.debug('Scanning %s for databases...' % db_path) with pushd(db_path, create_if_missing=True): for entry in listdir(db_path): db_file = join(db_path, entry) if not isfile(db_file): continue try: unlink(db_file) logger.info('Removed %s ...' % entry) except: logger.warning('Failed to remove %s ...' % entry)
def hexdump(src, length=16, sep='.'): """ Displays a hex output of the content it is passed. This was based on https://gist.github.com/7h3rAm/5603718 with some minor modifications """ allowed = digits + ascii_letters + punctuation + ' ' print_map = ''.join(((x if x in allowed else '.') for x in map(chr, range(256)))) lines = [] for c in xrange(0, len(src), length): chars = src[c:c + length] hex = ' '.join(["%02x" % ord(x) for x in chars]) if len(hex) > 24: hex = "%s %s" % (hex[:24], hex[24:]) printable = ''.join(["%s" % ( (ord(x) <= 127 and print_map[ord(x)]) or sep) for x in chars]) lines.append("%08x: %-*s |%s|" % (c, length * 3, hex, printable)) return '\n'.join(lines)
def get_all_pages(self): # work in the parent of the pages directory, because we # want the filenames to begin "pages/...". chdir(dirname(self.setup.pages_dir)) rel = relpath(self.setup.pages_dir) for root, dirs, files in walk(rel): # self.config.pages_dir): # examples: # # root='pages' root='pages/categories' # dirs=['categories'] dirs=[] # files=['index.html'] files=['list.html'] # self.setup.log.debug("\nTEMPLATE ROOT: %s" % root) # self.setup.log.debug("TEMPLATE DIRS: %s" % dirs) # self.setup.log.debug("TEMPLATE FILENAMES: %s" % files) # #dir_context = global_context.new_child(data_tree[root]) for filename in files: start, ext = splitext(filename) if ext in self.setup.template_extensions: # if filename.endswith(".html"): # TODO: should this filter be required at all? yield Page(self.setup, filename, join(root, filename))
def enumerate_backups_entities(): """Enumerates the entities of all the available backups""" if isdir(Backuper.backups_dir): # Look for subdirectories for directory in listdir(Backuper.backups_dir): entity_file = path.join(Backuper.backups_dir, directory, 'entity.tlo') # Ensure the entity.pickle file exists if isfile(entity_file): # Load and yield it with open(entity_file, 'rb') as file: with BinaryReader(stream=file) as reader: try: yield reader.tgread_object() except TypeNotFoundError: # Old user, scheme got updated, don't care. pass #endregion #region Backup exists and deletion
def get_processor(aid): """ Returns the processor module for a given achievement. Args: aid: the achievement id Returns: The processor module """ try: path = get_achievement(aid=aid, show_disabled=True)["processor"] base_path = api.config.get_settings()["achievements"]["processor_base_path"] return imp.load_source(path[:-3], join(base_path, path)) except FileNotFoundError: raise InternalException("Achievement processor is offline.")
def get_problem_root(problem_name, absolute=False): """ Installation location for a given problem. Args: problem_name: the problem name. absolute: should return an absolute path. Returns: The tentative installation location. """ problem_root = join(PROBLEM_ROOT, sanitize_name(problem_name)) assert problem_root.startswith(sep) if absolute: return problem_root return problem_root[len(sep):]
def get_problem(problem_path): """ Retrieve a problem spec from a given problem directory. Args: problem_path: path to the root of the problem directory. Returns: A problem object. """ json_path = join(problem_path, "problem.json") problem = json.loads(open(json_path, "r").read()) try: problem_schema(problem) except MultipleInvalid as e: logger.critical("Error validating problem object at '%s'!", json_path) logger.critical(e) raise FatalException return problem
def get_bundle_root(bundle_name, absolute=False): """ Installation location for a given bundle. Args: bundle_name: the bundle name. absolute: should return an absolute path. Returns: The tentative installation location. """ bundle_root = join(BUNDLE_ROOT, sanitize_name(bundle_name)) assert bundle_root.startswith(sep) if absolute: return bundle_root return bundle_root[len(sep):]
def get_all_problem_instances(problem_path): """ Returns a list of instances for a given problem """ instances = [] instances_dir = join(DEPLOYED_ROOT, problem_path) if os.path.isdir(instances_dir): for name in os.listdir(instances_dir): if name.endswith(".json"): try: instance = json.loads(open(join(instances_dir, name)).read()) except Exception as e: continue instances.append(instance) return instances
def clean(args, config): """ Main entrypoint for clean """ lock_file = join(HACKSPORTS_ROOT, "deploy.lock") # remove staging directories if os.path.isdir(STAGING_ROOT): logger.info("Removing the staging directories") shutil.rmtree(STAGING_ROOT) # remove lock file if os.path.isfile(lock_file): logger.info("Removing the stale lock file") os.remove(lock_file) #TODO: potentially perform more cleaning
def files_from_directory(directory, recurse=True, permissions=0o664): """ Returns a list of File objects for every file in a directory. Can recurse optionally. Args: directory: The directory to add files from recurse: Whether or not to recursively add files. Defaults to true permissions: The default permissions for the files. Defaults to 0o664. """ result = [] for root, dirnames, filenames in os.walk(directory): for filename in filenames: result.append(File(join(root, filename), permissions)) if not recurse: break return result
def dump_tabular(): """ Write all of the diagnostics from the current iteration """ vals = [] print("-"*37) for key in G.log_headers: val = G.log_current_row.get(key, "") if hasattr(val, "__float__"): valstr = "%8.3g"%val else: valstr = val print("| %15s | %15s |"%(key, valstr)) vals.append(val) print("-"*37) if G.output_file is not None: if G.first_row: G.output_file.write("\t".join(G.log_headers)) G.output_file.write("\n") G.output_file.write("\t".join(map(str,vals))) G.output_file.write("\n") G.output_file.flush() G.log_current_row.clear() G.first_row=False
def find_rain(src, paths=[]): if src[0] == '/': paths = [''] elif src[0] != '.': paths = get_paths() + paths for path in paths: if isfile(join(path, src) + '.rn'): return join(path, src) + '.rn' elif isfile(join(path, src)) and src.endswith('.rn'): return join(path, src) elif isdir(join(path, src)) and isfile(join(path, src, '_pkg.rn')): return join(path, src, '_pkg.rn') # find any file from a string
def find_name(src): path = os.path.abspath(src) path, name = os.path.split(path) fname, ext = os.path.splitext(name) if fname == '_pkg': _, fname = os.path.split(path) mname = normalize_name(fname) proot = [] while path and os.path.isfile(join(path, '_pkg.rn')): path, name = os.path.split(path) proot.insert(0, normalize_name(name)) if not src.endswith('_pkg.rn'): proot.append(mname) qname = '.'.join(proot) return (qname, mname)
def require(self, *tokens): if self.expect(*tokens): token = self.token self.next() return token if len(tokens) > 1: choices = ', '.join(str(x) for x in tokens) msg = 'Unexpected {!s}; expected one of {}'.format(self.token, choices) else: msg = 'Unexpected {!s}; expected {!s}'.format(self.token, tokens[0]) Q.abort(msg, pos=self.token.pos(file=self.file)) # program :: (stmt NEWLINE)+ EOF
def compile_so(libs): # I don't know how else to find these .so files other than just asking clang # to make a .so file out of all of them clang = os.getenv('CLANG', 'clang') tempdir = tempfile.gettempdir() libname = '.'.join(sorted(libs)) target = join(tempdir, 'lib' + libname + '.so') if not os.path.exists(target): libs = ['-l' + lib for lib in libs] flags = ['-shared'] cmd = [clang, '-o', target] + flags + libs subprocess.check_call(cmd) return target
def traverse_imports(names): """ Walks over all the names imported in a dotted_as_names node. """ pending = [names] while pending: node = pending.pop() if node.type == token.NAME: yield node.value elif node.type == syms.dotted_name: yield "".join([ch.value for ch in node.children]) elif node.type == syms.dotted_as_name: pending.append(node.children[0]) elif node.type == syms.dotted_as_names: pending.extend(node.children[::-2]) else: raise AssertionError("unkown node type")
def __init__(self, name, version, chunks, md5sums, bigtgz_md5sum, count): self.name = name self.version = version self.chunks = chunks self.md5sums = md5sums self.bigtgz_md5sum = bigtgz_md5sum self.count = count self.tmp_dir = op.join(DEFAULT_STORAGE, 'tmp') self.final_dir = op.join( DEFAULT_STORAGE, "20bn-{}-{}".format(self.name, self.version) ) self.big_tgz = op.join( self.tmp_dir, "20bn-{}-{}.tgz".format(self.name, self.version) ) self.tar_dir = op.join( self.tmp_dir, "20bn-{}-{}".format(self.name, self.version) ) self.ensure_directories_exist()
def modified_results(tool): workdir = dirname(dirname(abspath(__file__))) exeresult = run_executable('git', ['-C', workdir, 'ls-files', '-z', '-m', 'tests/examples/*/result_*']) files = exeresult.stdout.split(b'\x00') ignoreexts = bytestr('.cfg .cfg_diff .conf .pro .pro_diff .txt').split() result_filenames = [] for f in files: if not f: continue filename = join(bytestr(workdir), f) _, ext = os.path.splitext(filename) if not ext or ext in ignoreexts: continue if not os.path.exists(filename): continue result_filenames.append(filename) diff_for_files(tool, result_filenames)
def clean_epub_directory(): epubs = listdir(config.EPUB_DIRECTORY) if len(epubs) <= config.MAX_EPUB: return epubs.sort() number_to_delete = len(epubs) - config.MAX_EPUB + 2 deleted = 0 for t in epubs: f = path.join(config.EPUB_DIRECTORY, t) if not path.isfile(f): continue if deleted >= number_to_delete: break try: remove(f) deleted += 1 except OSError: pass
def extract_and_save_bin_to(dir_to_bin, dir_to_source): sets = [s for s in os.listdir(dir_to_source) if s in SETS] for d in sets: path = join(dir_to_source, d) speakers = [s for s in os.listdir(path) if s in SPEAKERS] for s in speakers: path = join(dir_to_source, d, s) output_dir = join(dir_to_bin, d, s) if not tf.gfile.Exists(output_dir): tf.gfile.MakeDirs(output_dir) for f in os.listdir(path): filename = join(path, f) print(filename) if not os.path.isdir(filename): features = extract(filename) labels = SPEAKERS.index(s) * np.ones( [features.shape[0], 1], np.float32, ) b = os.path.splitext(f)[0] features = np.concatenate([features, labels], 1) with open(join(output_dir, '{}.bin'.format(b)), 'wb') as fp: fp.write(features.tostring())
def load_all_url_files(_dir, file_name_prefix): url_list = [] for file_name in os.listdir(_dir): if fnmatch.fnmatch(file_name, file_name_prefix +'*.txt'): file_name = osp.join(_dir, file_name) fp_urls = open(file_name, 'r') #Open the text file called database.txt print 'load URLs from file: ' + file_name i = 0 for line in fp_urls: line = line.strip() if len(line)>0: splits = line.split('\t') url_list.append(splits[0].strip()) i=i+1 print str(i) + ' URLs loaded' fp_urls.close() return url_list ########### End of Functions to Load downloaded urls ########### ############## Functions to get date/time strings ############
def save(self, name_suffix: str) -> str: """ Save current tensorflow graph to a checkpoint named with the given name suffix. The checkpoint will be locaced in self.log_dir directory. :param name_suffix: saved checkpoint name suffix :return: path to the saved checkpoint """ graph_path = path.join(self._log_dir, 'model_{}.graph'.format(name_suffix)) checkpoint_path = path.join(self._log_dir, 'model_{}.ckpt'.format(name_suffix)) frozen_graph_path = path.join(self._log_dir, 'model_{}.pb'.format(name_suffix)) tf.train.write_graph(self._session.graph_def, '', graph_path, as_text=False) self._saver.save(self._session, checkpoint_path) if self._freeze_graph: with tf.Graph().as_default(): freeze_graph(input_graph=graph_path, input_checkpoint=checkpoint_path, output_node_names=self.output_names, output_graph=frozen_graph_path) return checkpoint_path
def train(self): """ Runs a training loop on the model. """ while True: inputs, targets = self.data_reader.get_train_batch(c.BATCH_SIZE, c.SEQ_LEN) print 'Training model...' feed_dict = {self.model.inputs: inputs, self.model.targets: targets} global_step, loss, _ = self.sess.run([self.model.global_step, self.model.loss, self.model.train_op], feed_dict=feed_dict) print 'Step: %d | loss: %f' % (global_step, loss) if global_step % c.MODEL_SAVE_FREQ == 0: print 'Saving model...' self.saver.save(self.sess, join(c.MODEL_SAVE_DIR, self.artist_name + '.ckpt'), global_step=global_step)
def upload_file(upload_file_name, temp): # upload_file_name????? # ??? saveas??? # ?????????,??git???saveas #key = md5(str(time.time())+''.join(random.sample(string.letters, 12))).hexdigest() # key ?????? print u"??????: ", pic_name = raw_input() uuid_6 = uuid.uuid4().get_hex()[:8] #????? key = pic_name+"_"+uuid_6+".png" copyfile(upload_file_name,join(saveas,key)) mime_type = 'image/png' token = q.upload_token(bucket, key) ret, info = put_file(token, key, upload_file_name, mime_type=mime_type, check_crc=True) print 'upload qiniu result:', info assert ret['key'] == key assert ret['hash'] == etag(upload_file_name) os.rename(upload_file_name, upload_file_name+'.old') return domain+'/'+key
def _restart_data(self, format_: str='json') -> None: assert format_ == 'json' with open(join(CURDIR, 'data', 'helloworld.py')) as f: testcode = f.read() self.data = Request({ 'filepath': 'test.py', 'action': 'ParseAST', 'content': testcode, 'language': 'python', }) bufferclass = io.StringIO if format_ == 'json' else io.BytesIO # This will mock the python_driver stdin self.sendbuffer = bufferclass() # This will mock the python_driver stdout self.recvbuffer = bufferclass()
def meta_autodetect_platform(cls): """ Dark magic to autodetect the platform for built-in shellcodes. User-defined shellcodes must define *arch* and *os*. """ abspath = path.abspath join = path.join split = path.split splitext = path.splitext sep = path.sep module = cls.__module__ if module != '__main__': tokens = cls.__module__.split('.') if len(tokens) < 2 or tokens[0] != base_package or \ tokens[1] == base_file: return tokens.insert(-1, 'any') tokens = tokens[1:3] else: module = abspath(sys.modules[module].__file__) if not module.startswith(base_dir): return tokens = module.split(sep) tokens = tokens[len(base_dir.split(sep)):-1] while len(tokens) < 2: tokens.append('any') cls.arch, cls.os = tokens
def find_bad_chars(bytes, bad_chars = None): """ Test the given bytecode against a list of bad characters. :type bytes: str :param bytes: Compiled bytecode to test for bad characters. :type bad_chars: str :param bad_chars: Bad characters to test. Defaults to `default_bad_chars`. :rtype: str :return: Bad characters present in the bytecode. """ if bad_chars is None: bad_chars = default_bad_chars return ''.join( (c for c in bad_chars if c in bytes) )
def good_chars(bad_chars = None): """ Take a bad chars list and generate the opposite good chars list. This can be useful for testing how the vulnerable program filters the characters we feed it. :type bad_chars: str :param bad_chars: Bad characters to test. Defaults to `default_bad_chars`. :rtype: str :return: Good characters. """ if bad_chars is None: bad_chars = default_bad_chars bad_list = set( map(ord, bad_chars) ) return ''.join( (chr(c) for c in xrange(256) if c not in bad_list) )
def random_chars(length, bad_chars = None): """ Generate a string of random characters, avoiding bad characters. This can be useful to randomize the payload of our exploits. :type length: int :param length: How many characters to generate. :type bad_chars: str :param bad_chars: Bad characters to test. Defaults to `default_bad_chars`. :rtype: str :return: String of random characters. """ if bad_chars is None: bad_chars = default_bad_chars c = good_chars(bad_chars) if not c: raise ValueError("All characters are bad!") m = len(c) - 1 randint = random.randint return ''.join( ( c[randint(0, m)] for i in xrange(length) ) ) #-----------------------------------------------------------------------------#
def test_decoding_uuenc_single_part(self): """ Decodes a single UUEncoded message """ # Input File encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg') assert isfile(encoded_filepath) # Compare File decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg') assert isfile(decoded_filepath) # Initialize Codec ud_py = CodecUU(work_dir=self.test_dir) # Read data and decode it with open(encoded_filepath, 'r') as fd_in: article = ud_py.decode(fd_in) # our content should be valid assert isinstance(article, NNTPBinaryContent) # Verify the actual article itself reports itself # as being okay (structurally) assert article.is_valid() is True with open(decoded_filepath, 'r') as fd_in: decoded = fd_in.read() # Compare our processed content with the expected results assert decoded == article.getvalue()
def test_NNTPArticle_UU_encode_02(self): """ Test the encoding of fresh new data """ # Our private Key Location tmp_file = join( self.tmp_dir, 'test_NNTPArticle_UU_encode_02.tmp', ) # Create a larger file assert(self.touch(tmp_file, size='1M', random=True)) # Create an NNTPContent Object pointing to our new data content = NNTPBinaryContent(tmp_file) # Create a Yenc Codec instance encoder = CodecUU(work_dir=self.test_dir) # This should produce our yEnc object now encoded = encoder.encode(content) assert isinstance(encoded, NNTPAsciiContent) is True # Now we want to decode the content we just encoded decoded = encoder.decode(encoded) # We should get a Binary Object in return assert isinstance(decoded, NNTPBinaryContent) is True # Our original content should be the same as our decoded # content assert(decoded.crc32() == content.crc32()) assert(decoded.md5() == content.md5())
def test_partial_download(self): """ Test the handling of a download that is explicitly ordered to abort after only some content is retrieved. A way of 'peeking' if you will. """ # Input File encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg') assert isfile(encoded_filepath) # Compare File decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg') assert isfile(decoded_filepath) # Initialize Codec (restrict content to be no larger then 10 bytes) ud_py = CodecUU(work_dir=self.test_dir, max_bytes=10) # Read data and decode it with open(encoded_filepath, 'r') as fd_in: article = ud_py.decode(fd_in) # our content should be valid assert isinstance(article, NNTPBinaryContent) # Our article should not be considered valid on an # early exit assert article.is_valid() is False with open(decoded_filepath, 'r') as fd_in: decoded = fd_in.read() # Compare our processed content with the expected results length = len(article.getvalue()) # Even though we have't decoded all of our content, we're # still the same as the expected result up to what has been # processed. assert decoded[0:length] == article.getvalue()
def test_from_content(self): """ Tests the from_content() function of the Mime Object """ # Prepare our Mime Object m = Mime() response = m.from_content(None) assert(isinstance(response, MimeResponse)) response = m.from_content("") assert(isinstance(response, MimeResponse)) response = m.from_content(u"") assert(isinstance(response, MimeResponse)) # First we take a binary file binary_filepath = join(self.var_dir, 'joystick.jpg') assert isfile(binary_filepath) with open(binary_filepath, 'rb') as f: buf = f.read() response = m.from_content(buf) assert(isinstance(response, MimeResponse)) assert(response.type() == 'image/jpeg') assert(response.encoding() == 'binary') # A reverse lookup is done here assert(response.extension() == '.jpeg')
def test_from_file(self): """ Tests the from_file() function of the Mime Object """ # Prepare our Mime Object m = Mime() response = m.from_file(None) assert(response is None) response = m.from_file("") assert(response is None) response = m.from_file(u"") assert(response is None) # First we take a binary file binary_filepath = join(self.var_dir, 'joystick.jpg') response = m.from_file(binary_filepath) assert(isinstance(response, MimeResponse)) assert(response.type() == 'image/jpeg') assert(response.encoding() == 'binary') assert(response.extension() == '.jpg') response = m.from_file(binary_filepath, fullscan=True) assert(isinstance(response, MimeResponse)) assert(response.type() == 'image/jpeg') assert(response.encoding() == 'binary') assert(response.extension() == '.jpg')
def test_from_bestguess(self): """ test from_bestguess() bestguess() does the best of both worlds: from_file() and from_filename(). It never returns None unless you give it bad data. """ # Initialize our mime object m = Mime() # Empty content just gives us an empty response assert(m.from_bestguess(None) is None) assert(m.from_bestguess("") is None) assert(m.from_bestguess(u"") is None) # First we take a binary file image = join(self.var_dir, 'joystick.jpg') c = NNTPContent(image, work_dir=self.tmp_dir) copy = c.copy() # since we have a filename, we can pick it up from that assert(m.from_bestguess(copy.filename).type() == 'image/jpeg') # We can also get it from_file() because even though our temporary # file does not have an extension at all, we can still assert(m.from_bestguess(copy.path()).type() == 'image/jpeg')
def test_rar_multi_files(self): """ Test that we can rar content into multiple files """ # Generate temporary folder to work with work_dir = join(self.tmp_dir, 'CodecRar_Test.rar.multi', 'work') # Initialize Codec cr = CodecRar(work_dir=work_dir, volume_size='100K') # Now we want to prepare a folder filled with temporary content source_dir = join(self.tmp_dir, 'CodecRar_Test.rar', 'my_source') assert isdir(source_dir) is False # create some dummy file entries for i in range(0, 10): # Create some temporary files to work with in our source # directory tmp_file = join(source_dir, 'DSC_IMG%.3d.jpeg' % i) self.touch(tmp_file, size='100K', random=True) # Add our file to the encoding process cr.add(tmp_file) # Now we want to compress this content content = cr.encode() # We should have successfully encoded our content assert isinstance(content, sortedset) assert len(content) == 11 for c in content: assert isinstance(c, NNTPBinaryContent) # Encoded content is attached by default assert c.is_attached() is True
def test_7z_single_file(self): """ Test that we can compress content """ # Generate temporary folder to work with work_dir = join(self.tmp_dir, 'Codec7Zip_Test.rar', 'work') # Initialize Codec cr = Codec7Zip(work_dir=work_dir) # Now we want to prepare a folder filled with temporary content source_dir = join( self.tmp_dir, 'Codec7Zip_Test.7z.single', 'my_source' ) assert isdir(source_dir) is False # create some dummy file entries for i in range(0, 10): # Create some temporary files to work with in our source # directory tmp_file = join(source_dir, 'DSC_IMG%.3d.jpeg' % i) self.touch(tmp_file, size='120K', random=True) # Add our file to the encoding process cr.add(tmp_file) # Now we want to compress this content content = cr.encode() # We should have successfully encoded our content assert isinstance(content, sortedset) assert len(content) == 1 assert isinstance(content[0], NNTPBinaryContent) # Encoded content is attached by default assert content[0].is_attached() is True
def test_7z_multi_files(self): """ Test that we can rar content into multiple files """ # Generate temporary folder to work with work_dir = join(self.tmp_dir, 'Codec7Zip_Test.rar.multi', 'work') # Initialize Codec cr = Codec7Zip(work_dir=work_dir, volume_size='100K') # Now we want to prepare a folder filled with temporary content source_dir = join(self.tmp_dir, 'Codec7Zip_Test.rar', 'my_source') assert isdir(source_dir) is False # create some dummy file entries for i in range(0, 10): # Create some temporary files to work with in our source # directory tmp_file = join(source_dir, 'DSC_IMG%.3d.jpeg' % i) self.touch(tmp_file, size='100K', random=True) # Add our file to the encoding process cr.add(tmp_file) # Now we want to compress this content content = cr.encode() # We should have successfully encoded our content assert isinstance(content, sortedset) assert len(content) == 11 for c in content: assert isinstance(c, NNTPBinaryContent) # Encoded content is attached by default assert c.is_attached() is True
def test_nzbfile_generation(self): """ Tests the creation of NZB Files """ nzbfile = join(self.tmp_dir, 'test.nzbfile.nzb') payload = join(self.var_dir, 'uudecoded.tax.jpg') assert isfile(nzbfile) is False # Create our NZB Object nzbobj = NNTPnzb() # create a fake article segpost = NNTPSegmentedPost(basename(payload)) content = NNTPBinaryContent(payload) article = NNTPArticle('testfile', groups='newsreap.is.awesome') # Note that our nzb object segment tracker is not marked as being # complete. This flag gets toggled when we add segments manually to # our nzb object or if we parse an NZB-File assert(nzbobj._segments_loaded is None) # Add our Content to the article article.add(content) # now add our article to the NZBFile segpost.add(article) # now add our Segmented Post to the NZBFile nzbobj.add(segpost) # Since .add() was called, this will be set to True now assert(nzbobj._segments_loaded is True) # Store our file assert nzbobj.save(nzbfile) is True assert isfile(nzbfile) is True
def test_bad_files(self): """ Test different variations of bad file inputs """ # No parameters should create a file nzbfile = join(self.var_dir, 'missing.file.nzb') assert not isfile(nzbfile) nzbobj = NNTPnzb(nzbfile=nzbfile) assert nzbobj.is_valid() is False assert nzbobj.gid() is None # Test Length assert len(nzbobj) == 0
def test_decoding_01(self): """ Open a stream to a file we can read for decoding; This test specifically focuses on var/group.list """ # Initialize Codec ch_py = CodecGroups() encoded_filepath = join(self.var_dir, 'group.list') assert isfile(encoded_filepath) # Read data and decode it with open(encoded_filepath, 'r') as fd_in: # This module always returns 'True' expecting more # but content can be retrieved at any time assert ch_py.decode(fd_in) is True # This is where the value is stored assert isinstance(ch_py.decoded, NNTPMetaContent) assert isinstance(ch_py.decoded.content, list) # The number of lines in group.list parsed should all be valid assert len(ch_py.decoded.content) == ch_py._total_lines # Test our reset ch_py.reset() assert isinstance(ch_py.decoded, NNTPMetaContent) assert isinstance(ch_py.decoded.content, list) assert len(ch_py.decoded.content) == 0 assert len(ch_py.decoded.content) == ch_py._total_lines
def test_decoding_03(self): """ Open a stream to a file we can read for decoding; This test specifically focuses on var/headers.test03.msg """ # Initialize Codec ch_py = CodecHeader() encoded_filepath = join(self.var_dir, 'headers.test03.msg') assert isfile(encoded_filepath) # Read data and decode it with open(encoded_filepath, 'r') as fd_in: # Decodes content and stops when complete assert isinstance(ch_py.decode(fd_in), NNTPHeader) # Read in the white space since it is actually the first line # after the end of headers delimiter assert fd_in.readline().strip() == 'First Line without spaces' #print '\n'.join(["assert ch_py['%s'] == '%s'" % (k, v) \ # for k, v in ch_py.items()]) assert len(ch_py) == 10 # with the 10 lines processed, our line_count # should be set to 10 assert ch_py._lines == 10 # assert False assert ch_py.is_valid() == True
def test_is_valid(self): """ Tests different key combinations that would cause the different return types from is_valid() """ # Initialize Codec ch_py = CodecHeader() encoded_filepath = join(self.var_dir, 'headers.test03.msg') assert isfile(encoded_filepath) # We haven't done any processing yet assert ch_py.is_valid() is None # Populate ourselves with some keys with open(encoded_filepath, 'r') as fd_in: # Decodes content and stops when complete assert isinstance(ch_py.decode(fd_in), NNTPHeader) # keys should be good! assert ch_py.is_valid() is True for k in ( 'DMCA', 'Removed', 'Cancelled', 'Blocked' ): # Intentially create a bad key: ch_py['X-%s' % k] = 'True' # We should fail now assert ch_py.is_valid() is False # it will become valid again once we clear the key del ch_py['X-%s' % k] assert ch_py.is_valid() is True
def test_invalid_split_cases(self): """ Test errors that are generated out of the split function """ work_dir = join(self.tmp_dir, 'NNTPContent_Test.chunk') # Now we want to load it into a NNTPContent object content = NNTPContent(work_dir=work_dir) # Nothing to split gives an error assert(content.split() is None) tmp_file = join(self.tmp_dir, 'NNTPContent_Test.chunk', '5K.rar') assert(isfile(tmp_file) is False) assert(self.touch(tmp_file, size='1MB', random=True) is True) assert(isfile(tmp_file) is True) # Now we want to load it into a NNTPContent object content = NNTPContent(filepath=tmp_file, work_dir=self.tmp_dir) # No size to split on gives an error assert(content.split(size=0) is None) assert(content.split(size=-1) is None) assert(content.split(size=None) is None) assert(content.split(size='bad_string') is None) # Invalid Memory Limit assert(content.split(mem_buf=0) is None) assert(content.split(mem_buf=-1) is None) assert(content.split(mem_buf=None) is None) assert(content.split(mem_buf='bad_string') is None)