Python os.path module: join() example source code

The following 49 code examples, extracted from open-source Python projects, illustrate how to use os.path.join().

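Before the project examples, here is a minimal standalone sketch of the behaviour being illustrated (the paths are made up for demonstration):

from os.path import join

# join() glues path components together with the platform separator
# ('/' on POSIX, '\' on Windows):
print(join('pages', 'categories', 'list.html'))  # pages/categories/list.html

# A later absolute component discards everything before it:
print(join('ignored', '/etc', 'hosts'))          # /etc/hosts

# An empty final component yields a trailing separator:
print(join('cache', 'search', ''))               # cache/search/
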
Project: pynini    Author: daffidilly
def load_data(self):
        # work in the parent of the pages directory, because we
        # want the filenames to begin "pages/...".
        chdir(dirname(self.setup.pages_dir))
        rel = relpath(self.setup.pages_dir)
        for root, dirs, files in walk(rel):
            for filename in files:
                start, ext = splitext(filename)
                if ext in self.setup.data_extensions:
                    #yield root, dirs, filename
                    loader = self.setup.data_loaders.get(ext)
                    path = join(root, filename)
                    if not loader:
                        raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext))

                    data_key = join(root, start)
                    loaded_dict = loader.loadf(path)
                    self.data[data_key] = loaded_dict

                    #self.setup.log.debug("data key [%s] ->" % (data_key, ), root, filename, ); pprint.pprint(loaded_dict, sys.stdout)

        #pprint.pprint(self.data, sys.stdout)
        #print("XXXXX data:", self.data)
Project: newsreap    Author: caronc
def test_uu_encoding(self):
        """
        Test the encoding of data; this is necessary prior to a post
        """

        # First we take a binary file
        binary_filepath = join(self.var_dir, 'joystick.jpg')
        assert isfile(binary_filepath)

        # Initialize Codec
        encoder = CodecUU(work_dir=self.test_dir)

        content = encoder.encode(binary_filepath)

        # We should have gotten an ASCII Content Object
        assert isinstance(content, NNTPAsciiContent) is True

        # We should actually have content associated with our data
        assert len(content) > 0
Project: newsreap    Author: caronc
def test_file_load(self):
        """
        Load the simple filters by file
        """
        entry = copy(self.template_entry)
        fb = NNTPFilterBase(paths=join(self.var_dir, 'simple.nrf'))

        # Our hash will start at 0 (Zero)
        assert len(fb._regex_hash) == 0

        # A benign subject should not be blacklisted
        entry['subject'] = 'A great video called "blah.avi"'
        assert fb.blacklist(**entry) == False

        entry['subject'] = 'A malicious file because it is "blah.avi.exe"'
        assert fb.blacklist(**entry) == True

        # Now load the directory; it should just find the same nrf file.
        fbd = NNTPFilterBase(paths=self.var_dir)
Project: newsreap    Author: caronc
def test_hexdump(self):
        """converts binary content to hexidecimal in a standard
        easy to read format
        """
        # Compare File
        hexdump_file = join(self.var_dir, 'hexdump.txt')
        assert isfile(hexdump_file)

        all_characters = ''.join(map(chr, range(0, 256)))
        with open(hexdump_file, 'r') as fd_in:
            ref_data = fd_in.read()

        # when reading in content, there is a newline appended
        # after the last line (even if one isn't otherwise present);
        # rstrip() simplifies the test by stripping off
        # all trailing whitespace
        assert hexdump(all_characters) == ref_data.rstrip()
Project: newsreap    Author: caronc
def database_reset(ctx):
    """
    Resets the database based on the current configuration
    """
    logger.info('Resetting database ...')
    ctx['NNTPSettings'].open(reset=True)
    __db_prep(ctx)

    db_path = join(ctx['NNTPSettings'].base_dir, 'cache', 'search')
    logger.debug('Scanning %s for databases...' % db_path)
    with pushd(db_path, create_if_missing=True):
        for entry in listdir(db_path):
            db_file = join(db_path, entry)
            if not isfile(db_file):
                continue

            try:
                unlink(db_file)
                logger.info('Removed %s ...' % entry)
            except:
                logger.warning('Failed to remove %s ...' % entry)
Project: newsreap    Author: caronc
def hexdump(src, length=16, sep='.'):
    """
    Displays a hex output of the content it is passed.

    This was based on https://gist.github.com/7h3rAm/5603718 with some
    minor modifications
    """
    allowed = digits + ascii_letters + punctuation + ' '

    print_map = ''.join(((x if x in allowed else '.')
                        for x in map(chr, range(256))))
    lines = []

    for c in xrange(0, len(src), length):
        chars = src[c:c + length]
        hex = ' '.join(["%02x" % ord(x) for x in chars])
        if len(hex) > 24:
            hex = "%s %s" % (hex[:24], hex[24:])
        printable = ''.join(["%s" % (
            (ord(x) <= 127 and print_map[ord(x)]) or sep) for x in chars])
        lines.append("%08x:  %-*s  |%s|" % (c, length * 3, hex, printable))
    return '\n'.join(lines)
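As a quick illustration of the output format (the input string here is made up), a single 16-byte line renders like this; note the extra space the code inserts after the eighth byte:

    >>> print(hexdump('Hello world, hi!'))
    00000000:  48 65 6c 6c 6f 20 77 6f  72 6c 64 2c 20 68 69 21  |Hello world, hi!|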
Project: pynini    Author: daffidilly
def get_all_pages(self):
        # work in the parent of the pages directory, because we
        # want the filenames to begin "pages/...".
        chdir(dirname(self.setup.pages_dir))
        rel = relpath(self.setup.pages_dir)

        for root, dirs, files in walk(rel):  # self.config.pages_dir):

            # examples:
            #
            #  root='pages'              root='pages/categories'
            #  dirs=['categories']       dirs=[]
            #  files=['index.html']      files=['list.html']

            # self.setup.log.debug("\nTEMPLATE ROOT: %s" % root)
            # self.setup.log.debug("TEMPLATE DIRS: %s" % dirs)
            # self.setup.log.debug("TEMPLATE FILENAMES: %s" % files)
            # #dir_context = global_context.new_child(data_tree[root])

            for filename in files:
                start, ext = splitext(filename)
                if ext in self.setup.template_extensions:
                    # if filename.endswith(".html"):  # TODO: should this filter be required at all?
                    yield Page(self.setup, filename, join(root, filename))
Project: Telebackup    Author: LonamiWebs
def enumerate_backups_entities():
        """Enumerates the entities of all the available backups"""
        if isdir(Backuper.backups_dir):

            # Look for subdirectories
            for directory in listdir(Backuper.backups_dir):
                entity_file = path.join(Backuper.backups_dir, directory, 'entity.tlo')

                # Ensure the entity.tlo file exists
                if isfile(entity_file):

                    # Load and yield it
                    with open(entity_file, 'rb') as file:
                        with BinaryReader(stream=file) as reader:
                            try:
                                yield reader.tgread_object()
                            except TypeNotFoundError:
                                # Old user, scheme got updated, don't care.
                                pass

    #endregion

    #region Backup exists and deletion
Project: picoCTF    Author: picoCTF
def get_processor(aid):
    """
    Returns the processor module for a given achievement.

    Args:
        aid: the achievement id
    Returns:
        The processor module
    """

    try:
        path = get_achievement(aid=aid, show_disabled=True)["processor"]
        base_path = api.config.get_settings()["achievements"]["processor_base_path"]
        return imp.load_source(path[:-3], join(base_path, path))
    except FileNotFoundError:
        raise InternalException("Achievement processor is offline.")
Project: picoCTF    Author: picoCTF
def get_problem_root(problem_name, absolute=False):
    """
    Installation location for a given problem.

    Args:
        problem_name: the problem name.
        absolute: should return an absolute path.

    Returns:
        The tentative installation location.
    """

    problem_root = join(PROBLEM_ROOT, sanitize_name(problem_name))

    assert problem_root.startswith(sep)
    if absolute:
        return problem_root

    return problem_root[len(sep):]
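Because the leading separator is stripped in the relative case, the result can be re-rooted under another directory. A sketch of the intended use (the PROBLEM_ROOT value and the sanitize_name behaviour below are assumptions for illustration):

    # Assuming PROBLEM_ROOT = '/opt/hacksports/problems' and that
    # sanitize_name('My Problem') returns 'my-problem':
    #   get_problem_root('My Problem', absolute=True)  # '/opt/hacksports/problems/my-problem'
    #   get_problem_root('My Problem')                 # 'opt/hacksports/problems/my-problem'
    #   join('/staging', get_problem_root('My Problem'))
    #                                  # '/staging/opt/hacksports/problems/my-problem'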
Project: picoCTF    Author: picoCTF
def get_problem(problem_path):
    """
    Retrieve a problem spec from a given problem directory.

    Args:
        problem_path: path to the root of the problem directory.

    Returns:
        A problem object.
    """

    json_path = join(problem_path, "problem.json")
    problem = json.loads(open(json_path, "r").read())

    try:
        problem_schema(problem)
    except MultipleInvalid as e:
        logger.critical("Error validating problem object at '%s'!", json_path)
        logger.critical(e)
        raise FatalException

    return problem
Project: picoCTF    Author: picoCTF
def get_bundle_root(bundle_name, absolute=False):
    """
    Installation location for a given bundle.

    Args:
        bundle_name: the bundle name.
        absolute: should return an absolute path.

    Returns:
        The tentative installation location.
    """

    bundle_root = join(BUNDLE_ROOT, sanitize_name(bundle_name))

    assert bundle_root.startswith(sep)
    if absolute:
        return bundle_root

    return bundle_root[len(sep):]
Project: picoCTF    Author: picoCTF
def get_all_problem_instances(problem_path):
    """ Returns a list of instances for a given problem """

    instances = []
    instances_dir = join(DEPLOYED_ROOT, problem_path)
    if os.path.isdir(instances_dir):
        for name in os.listdir(instances_dir):
            if name.endswith(".json"):
                try:
                    instance = json.loads(open(join(instances_dir, name)).read())
                except Exception as e:
                    continue

                instances.append(instance)

    return instances
Project: picoCTF    Author: picoCTF
def clean(args, config):
    """ Main entrypoint for clean """

    lock_file = join(HACKSPORTS_ROOT, "deploy.lock")

    # remove staging directories
    if os.path.isdir(STAGING_ROOT):
        logger.info("Removing the staging directories")
        shutil.rmtree(STAGING_ROOT)

    # remove lock file
    if os.path.isfile(lock_file):
        logger.info("Removing the stale lock file")
        os.remove(lock_file)

    #TODO: potentially perform more cleaning
Project: picoCTF    Author: picoCTF
def files_from_directory(directory, recurse=True, permissions=0o664):
    """
    Returns a list of File objects for every file in a directory. Can recurse optionally.

    Args:
        directory: The directory to add files from
        recurse: Whether or not to recursively add files. Defaults to true
        permissions: The default permissions for the files. Defaults to 0o664.
    """

    result = []

    for root, dirnames, filenames in os.walk(directory):
        for filename in filenames:
            result.append(File(join(root, filename), permissions))
        if not recurse:
            break

    return result
Project: distributional_perspective_on_RL    Author: Kiwoo
def dump_tabular():
    """
    Write all of the diagnostics from the current iteration
    """
    vals = []
    print("-"*37)
    for key in G.log_headers:
        val = G.log_current_row.get(key, "")
        if hasattr(val, "__float__"): valstr = "%8.3g"%val
        else: valstr = val
        print("| %15s | %15s |"%(key, valstr))
        vals.append(val)
    print("-"*37)
    if G.output_file is not None:
        if G.first_row:
            G.output_file.write("\t".join(G.log_headers))
            G.output_file.write("\n")
        G.output_file.write("\t".join(map(str,vals)))
        G.output_file.write("\n")
        G.output_file.flush()
    G.log_current_row.clear()
    G.first_row=False
Project: rain    Author: scizzorz
def find_rain(src, paths=[]):
  if src[0] == '/':
    paths = ['']
  elif src[0] != '.':
    paths = get_paths() + paths

  for path in paths:
    if isfile(join(path, src) + '.rn'):
      return join(path, src) + '.rn'
    elif isfile(join(path, src)) and src.endswith('.rn'):
      return join(path, src)
    elif isdir(join(path, src)) and isfile(join(path, src, '_pkg.rn')):
      return join(path, src, '_pkg.rn')


# find any file from a string
Project: rain    Author: scizzorz
def find_name(src):
  path = os.path.abspath(src)
  path, name = os.path.split(path)
  fname, ext = os.path.splitext(name)

  if fname == '_pkg':
    _, fname = os.path.split(path)

  mname = normalize_name(fname)

  proot = []
  while path and os.path.isfile(join(path, '_pkg.rn')):
    path, name = os.path.split(path)
    proot.insert(0, normalize_name(name))

  if not src.endswith('_pkg.rn'):
    proot.append(mname)

  qname = '.'.join(proot)

  return (qname, mname)
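A worked sketch of the name resolution (the directory layout is hypothetical, and normalize_name is assumed to pass simple names through unchanged):

    # Given packages marked by _pkg.rn files at:
    #   /src/app/_pkg.rn
    #   /src/app/util/_pkg.rn
    # find_name('/src/app/util/strings.rn') walks up through both package
    # markers, collecting ['app', 'util'], appends the module name, and
    # returns ('app.util.strings', 'strings').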
Project: rain    Author: scizzorz
def require(self, *tokens):
    if self.expect(*tokens):
      token = self.token
      self.next()
      return token

    if len(tokens) > 1:
      choices = ', '.join(str(x) for x in tokens)
      msg = 'Unexpected {!s}; expected one of {}'.format(self.token, choices)
    else:
      msg = 'Unexpected {!s}; expected {!s}'.format(self.token, tokens[0])

    Q.abort(msg, pos=self.token.pos(file=self.file))


# program :: (stmt NEWLINE)+ EOF
Project: rain    Author: scizzorz
def compile_so(libs):
  # I don't know how else to find these .so files other than just asking clang
  # to make a .so file out of all of them

  clang = os.getenv('CLANG', 'clang')

  tempdir = tempfile.gettempdir()
  libname = '.'.join(sorted(libs))
  target = join(tempdir, 'lib' + libname + '.so')

  if not os.path.exists(target):
    libs = ['-l' + lib for lib in libs]
    flags = ['-shared']

    cmd = [clang, '-o', target] + flags + libs
    subprocess.check_call(cmd)

  return target
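As a usage sketch (the library names are picked for illustration), requesting the math and pthread runtimes caches one combined shared object in the temp directory:

    # compile_so(['m', 'pthread']) builds the target name from the sorted
    # library names and runs, roughly (on Linux, where gettempdir() is /tmp):
    #   clang -o /tmp/libm.pthread.so -shared -lm -lpthread
    # It returns '/tmp/libm.pthread.so'; later calls with the same set of
    # libraries reuse the cached file.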
Project: kinect-2-libras    Author: inessadl
def traverse_imports(names):
    """
    Walks over all the names imported in a dotted_as_names node.
    """
    pending = [names]
    while pending:
        node = pending.pop()
        if node.type == token.NAME:
            yield node.value
        elif node.type == syms.dotted_name:
            yield "".join([ch.value for ch in node.children])
        elif node.type == syms.dotted_as_name:
            pending.append(node.children[0])
        elif node.type == syms.dotted_as_names:
            pending.extend(node.children[::-2])
        else:
            raise AssertionError("unknown node type")
Project: twentybn-dl    Author: TwentyBN
def __init__(self, name, version, chunks, md5sums, bigtgz_md5sum, count):
        self.name = name
        self.version = version
        self.chunks = chunks
        self.md5sums = md5sums
        self.bigtgz_md5sum = bigtgz_md5sum
        self.count = count
        self.tmp_dir = op.join(DEFAULT_STORAGE, 'tmp')
        self.final_dir = op.join(
            DEFAULT_STORAGE,
            "20bn-{}-{}".format(self.name, self.version)
        )
        self.big_tgz = op.join(
            self.tmp_dir,
            "20bn-{}-{}.tgz".format(self.name, self.version)
        )
        self.tar_dir = op.join(
            self.tmp_dir,
            "20bn-{}-{}".format(self.name, self.version)
        )
        self.ensure_directories_exist()
Project: whatstyle    Author: mikr
def modified_results(tool):
    workdir = dirname(dirname(abspath(__file__)))
    exeresult = run_executable('git', ['-C', workdir, 'ls-files', '-z', '-m',
                                       'tests/examples/*/result_*'])
    files = exeresult.stdout.split(b'\x00')
    ignoreexts = bytestr('.cfg .cfg_diff .conf .pro .pro_diff .txt').split()
    result_filenames = []
    for f in files:
        if not f:
            continue
        filename = join(bytestr(workdir), f)
        _, ext = os.path.splitext(filename)
        if not ext or ext in ignoreexts:
            continue
        if not os.path.exists(filename):
            continue
        result_filenames.append(filename)
    diff_for_files(tool, result_filenames)
Project: MercrediFiction    Author: Meewan
def clean_epub_directory():
    epubs = listdir(config.EPUB_DIRECTORY)
    if len(epubs) <= config.MAX_EPUB:
        return

    epubs.sort()

    number_to_delete = len(epubs) - config.MAX_EPUB + 2
    deleted = 0
    for t in epubs:
        f = path.join(config.EPUB_DIRECTORY, t)
        if not path.isfile(f):
            continue
        if deleted >= number_to_delete:
            break
        try:
            remove(f)
            deleted += 1
        except OSError:
            pass
Project: vae-npvc    Author: JeremyCCHsu
def extract_and_save_bin_to(dir_to_bin, dir_to_source):
    sets = [s for s in os.listdir(dir_to_source) if s in SETS]
    for d in sets:
        path = join(dir_to_source, d)
        speakers = [s for s in os.listdir(path) if s in SPEAKERS]
        for s in speakers:
            path = join(dir_to_source, d, s)
            output_dir = join(dir_to_bin, d, s)
            if not tf.gfile.Exists(output_dir):
                tf.gfile.MakeDirs(output_dir)
            for f in os.listdir(path):
                filename = join(path, f)
                print(filename)
                if not os.path.isdir(filename):
                    features = extract(filename)
                    labels = SPEAKERS.index(s) * np.ones(
                        [features.shape[0], 1],
                        np.float32,
                    )
                    b = os.path.splitext(f)[0]
                    features = np.concatenate([features, labels], 1)
                    with open(join(output_dir, '{}.bin'.format(b)), 'wb') as fp:
                        fp.write(features.tostring())
Project: HandDetection    Author: YunqiuXu
def load_all_url_files(_dir, file_name_prefix):
    url_list = []

    for file_name in os.listdir(_dir):
        if fnmatch.fnmatch(file_name, file_name_prefix +'*.txt'):
            file_name = osp.join(_dir, file_name)
            fp_urls = open(file_name, 'r')  # open the URL list file
            print 'load URLs from file: ' + file_name

            i = 0
            for line in fp_urls:
                line = line.strip()
                if len(line)>0:
                    splits = line.split('\t')
                    url_list.append(splits[0].strip())
                    i=i+1
            print str(i) + ' URLs loaded'
            fp_urls.close()

    return url_list         
########### End of Functions to Load downloaded urls ###########

############## Functions to get date/time strings ############
Project: cxflow-tensorflow    Author: Cognexa
def save(self, name_suffix: str) -> str:
        """
        Save current tensorflow graph to a checkpoint named with the given name suffix.

        The checkpoint will be located in the self.log_dir directory.
        :param name_suffix: saved checkpoint name suffix
        :return: path to the saved checkpoint
        """
        graph_path = path.join(self._log_dir, 'model_{}.graph'.format(name_suffix))
        checkpoint_path = path.join(self._log_dir, 'model_{}.ckpt'.format(name_suffix))
        frozen_graph_path = path.join(self._log_dir, 'model_{}.pb'.format(name_suffix))

        tf.train.write_graph(self._session.graph_def, '', graph_path, as_text=False)

        self._saver.save(self._session, checkpoint_path)

        if self._freeze_graph:
            with tf.Graph().as_default():
                freeze_graph(input_graph=graph_path,
                             input_checkpoint=checkpoint_path,
                             output_node_names=self.output_names,
                             output_graph=frozen_graph_path)

        return checkpoint_path
Project: encore.ai    Author: dyelax
def train(self):
        """
        Runs a training loop on the model.
        """
        while True:
            inputs, targets = self.data_reader.get_train_batch(c.BATCH_SIZE, c.SEQ_LEN)
            print 'Training model...'

            feed_dict = {self.model.inputs: inputs, self.model.targets: targets}
            global_step, loss, _ = self.sess.run([self.model.global_step,
                                                  self.model.loss,
                                                  self.model.train_op],
                                                 feed_dict=feed_dict)

            print 'Step: %d | loss: %f' % (global_step, loss)
            if global_step % c.MODEL_SAVE_FREQ == 0:
                print 'Saving model...'
                self.saver.save(self.sess, join(c.MODEL_SAVE_DIR, self.artist_name + '.ckpt'),
                                global_step=global_step)
Project: figbed    Author: wwj718
def upload_file(upload_file_name, temp):
    # Build an upload key from the user-supplied picture name plus a short
    # uuid suffix, then copy the image into the local save directory before
    # uploading it to qiniu.
    #key = md5(str(time.time())+''.join(random.sample(string.letters, 12))).hexdigest()
    print u"Picture name: ",
    pic_name = raw_input()
    uuid_6 = uuid.uuid4().get_hex()[:8]  # short unique suffix
    key = pic_name + "_" + uuid_6 + ".png"
    copyfile(upload_file_name, join(saveas, key))
    mime_type = 'image/png'
    token = q.upload_token(bucket, key)
    ret, info = put_file(token, key, upload_file_name, mime_type=mime_type, check_crc=True)
    print 'upload qiniu result:', info
    assert ret['key'] == key
    assert ret['hash'] == etag(upload_file_name)
    os.rename(upload_file_name, upload_file_name+'.old')
    return domain+'/'+key
Project: python-driver    Author: bblfsh
def _restart_data(self, format_: str='json') -> None:
        assert format_ == 'json'

        with open(join(CURDIR, 'data', 'helloworld.py')) as f:
            testcode = f.read()

        self.data = Request({
            'filepath': 'test.py',
            'action': 'ParseAST',
            'content': testcode,
            'language': 'python',
        })

        bufferclass = io.StringIO if format_ == 'json' else io.BytesIO

        # This will mock the python_driver stdin
        self.sendbuffer = bufferclass()
        # This will mock the python_driver stdout
        self.recvbuffer = bufferclass()
Project: shellgen    Author: MarioVilas
def meta_autodetect_platform(cls):
    """
    Dark magic to autodetect the platform for built-in shellcodes.

    User-defined shellcodes must define *arch* and *os*.
    """
    abspath = path.abspath
    join = path.join
    split = path.split
    splitext = path.splitext
    sep = path.sep
    module = cls.__module__
    if module != '__main__':
        tokens = cls.__module__.split('.')
        if len(tokens) < 2 or tokens[0] != base_package or \
                              tokens[1] == base_file:
            return
        tokens.insert(-1, 'any')
        tokens = tokens[1:3]
    else:
        module = abspath(sys.modules[module].__file__)
        if not module.startswith(base_dir):
            return
        tokens = module.split(sep)
        tokens = tokens[len(base_dir.split(sep)):-1]
        while len(tokens) < 2:
            tokens.append('any')
    cls.arch, cls.os = tokens
Project: shellgen    Author: MarioVilas
def find_bad_chars(bytes, bad_chars = None):
    """
    Test the given bytecode against a list of bad characters.

    :type  bytes: str
    :param bytes: Compiled bytecode to test for bad characters.

    :type  bad_chars: str
    :param bad_chars: Bad characters to test.
        Defaults to `default_bad_chars`.

    :rtype:  str
    :return: Bad characters present in the bytecode.
    """
    if bad_chars is None:
        bad_chars = default_bad_chars
    return ''.join( (c for c in bad_chars if c in bytes) )
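A quick sketch of the check (the exact contents of default_bad_chars are an assumption here; NUL and newline are typical members):

    # Assuming default_bad_chars == '\x00\n\r':
    #   find_bad_chars('a\x00b\nc')  # returns '\x00\n' (NUL and LF present)
    #   find_bad_chars('abc')        # returns '' (clean payload)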
Project: shellgen    Author: MarioVilas
def good_chars(bad_chars = None):
    """
    Take a bad chars list and generate the opposite good chars list.

    This can be useful for testing how the vulnerable program filters the
    characters we feed it.

    :type  bad_chars: str
    :param bad_chars: Bad characters to test.
        Defaults to `default_bad_chars`.

    :rtype:  str
    :return: Good characters.
    """
    if bad_chars is None:
        bad_chars = default_bad_chars
    bad_list = set( map(ord, bad_chars) )
    return ''.join( (chr(c) for c in xrange(256) if c not in bad_list) )
Project: shellgen    Author: MarioVilas
def random_chars(length, bad_chars = None):
    """
    Generate a string of random characters, avoiding bad characters.

    This can be useful to randomize the payload of our exploits.

    :type  length: int
    :param length: How many characters to generate.

    :type  bad_chars: str
    :param bad_chars: Bad characters to test.
        Defaults to `default_bad_chars`.

    :rtype:  str
    :return: String of random characters.
    """
    if bad_chars is None:
        bad_chars = default_bad_chars
    c = good_chars(bad_chars)
    if not c:
        raise ValueError("All characters are bad!")
    m = len(c) - 1
    randint = random.randint
    return ''.join( ( c[randint(0, m)] for i in xrange(length) ) )

#-----------------------------------------------------------------------------#
Project: newsreap    Author: caronc
def test_decoding_uuenc_single_part(self):
        """
        Decodes a single UUEncoded message
        """
        # Input File
        encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg')
        assert isfile(encoded_filepath)

        # Compare File
        decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg')
        assert isfile(decoded_filepath)

        # Initialize Codec
        ud_py = CodecUU(work_dir=self.test_dir)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            article = ud_py.decode(fd_in)

        # our content should be valid
        assert isinstance(article, NNTPBinaryContent)

        # Verify the actual article itself reports itself
        # as being okay (structurally)
        assert article.is_valid() is True

        with open(decoded_filepath, 'r') as fd_in:
            decoded = fd_in.read()

        # Compare our processed content with the expected results
        assert decoded == article.getvalue()
Project: newsreap    Author: caronc
def test_NNTPArticle_UU_encode_02(self):
        """
        Test the encoding of fresh new data
        """

        # Our private Key Location
        tmp_file = join(
            self.tmp_dir,
            'test_NNTPArticle_UU_encode_02.tmp',
        )

        # Create a larger file
        assert(self.touch(tmp_file, size='1M', random=True))

        # Create an NNTPContent Object pointing to our new data
        content = NNTPBinaryContent(tmp_file)

        # Create a UU Codec instance
        encoder = CodecUU(work_dir=self.test_dir)

        # This should produce our UU-encoded object now
        encoded = encoder.encode(content)
        assert isinstance(encoded, NNTPAsciiContent) is True

        # Now we want to decode the content we just encoded
        decoded = encoder.decode(encoded)

        # We should get a Binary Object in return
        assert isinstance(decoded, NNTPBinaryContent) is True

        # Our original content should be the same as our decoded
        # content
        assert(decoded.crc32() == content.crc32())
        assert(decoded.md5() == content.md5())
Project: newsreap    Author: caronc
def test_partial_download(self):
        """
        Test the handling of a download that is explicitly ordered to abort
        after only some content is retrieved.  A way of 'peeking' if you will.
        """

        # Input File
        encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg')
        assert isfile(encoded_filepath)

        # Compare File
        decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg')
        assert isfile(decoded_filepath)

        # Initialize Codec (restrict content to be no larger than 10 bytes)
        ud_py = CodecUU(work_dir=self.test_dir, max_bytes=10)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            article = ud_py.decode(fd_in)

        # our content should be valid
        assert isinstance(article, NNTPBinaryContent)

        # Our article should not be considered valid on an
        # early exit
        assert article.is_valid() is False

        with open(decoded_filepath, 'r') as fd_in:
            decoded = fd_in.read()

        # Compare our processed content with the expected results
        length = len(article.getvalue())

        # Even though we haven't decoded all of our content, what we have
        # still matches the expected result up to the point that has been
        # processed.
        assert decoded[0:length] == article.getvalue()
Project: newsreap    Author: caronc
def test_from_content(self):
        """
        Tests the from_content() function of the Mime Object
        """

        # Prepare our Mime Object
        m = Mime()

        response = m.from_content(None)
        assert(isinstance(response, MimeResponse))
        response = m.from_content("")
        assert(isinstance(response, MimeResponse))
        response = m.from_content(u"")
        assert(isinstance(response, MimeResponse))

        # First we take a binary file
        binary_filepath = join(self.var_dir, 'joystick.jpg')
        assert isfile(binary_filepath)
        with open(binary_filepath, 'rb') as f:
            buf = f.read()

        response = m.from_content(buf)
        assert(isinstance(response, MimeResponse))
        assert(response.type() == 'image/jpeg')
        assert(response.encoding() == 'binary')
        # A reverse lookup is done here
        assert(response.extension() == '.jpeg')
Project: newsreap    Author: caronc
def test_from_file(self):
        """
        Tests the from_file() function of the Mime Object
        """

        # Prepare our Mime Object
        m = Mime()

        response = m.from_file(None)
        assert(response is None)
        response = m.from_file("")
        assert(response is None)
        response = m.from_file(u"")
        assert(response is None)

        # First we take a binary file
        binary_filepath = join(self.var_dir, 'joystick.jpg')

        response = m.from_file(binary_filepath)
        assert(isinstance(response, MimeResponse))
        assert(response.type() == 'image/jpeg')
        assert(response.encoding() == 'binary')
        assert(response.extension() == '.jpg')

        response = m.from_file(binary_filepath, fullscan=True)
        assert(isinstance(response, MimeResponse))
        assert(response.type() == 'image/jpeg')
        assert(response.encoding() == 'binary')
        assert(response.extension() == '.jpg')
Project: newsreap    Author: caronc
def test_from_bestguess(self):
        """
        test from_bestguess()

        bestguess() does the best of both worlds: from_file() and
        from_filename().  It never returns None unless you give it
        bad data.
        """

        # Initialize our mime object
        m = Mime()

        # Empty or missing content just gives us None back
        assert(m.from_bestguess(None) is None)
        assert(m.from_bestguess("") is None)
        assert(m.from_bestguess(u"") is None)

        # First we take a binary file
        image = join(self.var_dir, 'joystick.jpg')
        c = NNTPContent(image, work_dir=self.tmp_dir)
        copy = c.copy()

        # since we have a filename, we can pick it up from that
        assert(m.from_bestguess(copy.filename).type() == 'image/jpeg')
        # We can also get it via from_file(): even though our temporary
        # file does not have an extension at all, the type can still be
        # detected from the file content
        assert(m.from_bestguess(copy.path()).type() == 'image/jpeg')
Project: newsreap    Author: caronc
def test_rar_multi_files(self):
        """
        Test that we can rar content into multiple files

        """
        # Generate temporary folder to work with
        work_dir = join(self.tmp_dir, 'CodecRar_Test.rar.multi', 'work')

        # Initialize Codec
        cr = CodecRar(work_dir=work_dir, volume_size='100K')

        # Now we want to prepare a folder filled with temporary content
        source_dir = join(self.tmp_dir, 'CodecRar_Test.rar', 'my_source')
        assert isdir(source_dir) is False

        # create some dummy file entries
        for i in range(0, 10):
            # Create some temporary files to work with in our source
            # directory
            tmp_file = join(source_dir, 'DSC_IMG%.3d.jpeg' % i)
            self.touch(tmp_file, size='100K', random=True)
            # Add our file to the encoding process
            cr.add(tmp_file)

        # Now we want to compress this content
        content = cr.encode()

        # We should have successfully encoded our content
        assert isinstance(content, sortedset)
        assert len(content) == 11
        for c in content:
            assert isinstance(c, NNTPBinaryContent)
            # Encoded content is attached by default
            assert c.is_attached() is True
Project: newsreap    Author: caronc
def test_7z_single_file(self):
        """
        Test that we can compress content into a single 7-Zip archive

        """
        # Generate temporary folder to work with
        work_dir = join(self.tmp_dir, 'Codec7Zip_Test.rar', 'work')

        # Initialize Codec
        cr = Codec7Zip(work_dir=work_dir)

        # Now we want to prepare a folder filled with temporary content
        source_dir = join(
            self.tmp_dir, 'Codec7Zip_Test.7z.single', 'my_source'
        )
        assert isdir(source_dir) is False

        # create some dummy file entries
        for i in range(0, 10):
            # Create some temporary files to work with in our source
            # directory
            tmp_file = join(source_dir, 'DSC_IMG%.3d.jpeg' % i)
            self.touch(tmp_file, size='120K', random=True)
            # Add our file to the encoding process
            cr.add(tmp_file)

        # Now we want to compress this content
        content = cr.encode()

        # We should have successfully encoded our content
        assert isinstance(content, sortedset)
        assert len(content) == 1
        assert isinstance(content[0], NNTPBinaryContent)

        # Encoded content is attached by default
        assert content[0].is_attached() is True
Project: newsreap    Author: caronc
def test_7z_multi_files(self):
        """
        Test that we can compress content into multiple 7-Zip volumes

        """
        # Generate temporary folder to work with
        work_dir = join(self.tmp_dir, 'Codec7Zip_Test.rar.multi', 'work')

        # Initialize Codec
        cr = Codec7Zip(work_dir=work_dir, volume_size='100K')

        # Now we want to prepare a folder filled with temporary content
        source_dir = join(self.tmp_dir, 'Codec7Zip_Test.rar', 'my_source')
        assert isdir(source_dir) is False

        # create some dummy file entries
        for i in range(0, 10):
            # Create some temporary files to work with in our source
            # directory
            tmp_file = join(source_dir, 'DSC_IMG%.3d.jpeg' % i)
            self.touch(tmp_file, size='100K', random=True)
            # Add our file to the encoding process
            cr.add(tmp_file)

        # Now we want to compress this content
        content = cr.encode()

        # We should have successfully encoded our content
        assert isinstance(content, sortedset)
        assert len(content) == 11
        for c in content:
            assert isinstance(c, NNTPBinaryContent)
            # Encoded content is attached by default
            assert c.is_attached() is True
Project: newsreap    Author: caronc
def test_nzbfile_generation(self):
        """
        Tests the creation of NZB Files
        """
        nzbfile = join(self.tmp_dir, 'test.nzbfile.nzb')
        payload = join(self.var_dir, 'uudecoded.tax.jpg')
        assert isfile(nzbfile) is False
        # Create our NZB Object
        nzbobj = NNTPnzb()

        # create a fake article
        segpost = NNTPSegmentedPost(basename(payload))
        content = NNTPBinaryContent(payload)

        article = NNTPArticle('testfile', groups='newsreap.is.awesome')

        # Note that our nzb object segment tracker is not marked as being
        # complete. This flag gets toggled when we add segments manually to
        # our nzb object or if we parse an NZB-File
        assert(nzbobj._segments_loaded is None)

        # Add our Content to the article
        article.add(content)
        # now add our article to the NZBFile
        segpost.add(article)
        # now add our Segmented Post to the NZBFile
        nzbobj.add(segpost)

        # Since .add() was called, this will be set to True now
        assert(nzbobj._segments_loaded is True)

        # Store our file
        assert nzbobj.save(nzbfile) is True
        assert isfile(nzbfile) is True
Project: newsreap    Author: caronc
def test_bad_files(self):
        """
        Test different variations of bad file inputs
        """
        # Point at a file that does not exist; loading it must not create one
        nzbfile = join(self.var_dir, 'missing.file.nzb')
        assert not isfile(nzbfile)

        nzbobj = NNTPnzb(nzbfile=nzbfile)
        assert nzbobj.is_valid() is False
        assert nzbobj.gid() is None

        # Test Length
        assert len(nzbobj) == 0
Project: newsreap    Author: caronc
def test_decoding_01(self):
        """
        Open a stream to a file we can read for decoding; This test
        specifically focuses on var/group.list
        """

        # Initialize Codec
        ch_py = CodecGroups()

        encoded_filepath = join(self.var_dir, 'group.list')
        assert isfile(encoded_filepath)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            # This codec always returns True (it expects more data), but
            # the decoded content can be retrieved at any time
            assert ch_py.decode(fd_in) is True

        # This is where the value is stored
        assert isinstance(ch_py.decoded, NNTPMetaContent)
        assert isinstance(ch_py.decoded.content, list)

        # The number of lines in group.list parsed should all be valid
        assert len(ch_py.decoded.content) == ch_py._total_lines

        # Test our reset
        ch_py.reset()

        assert isinstance(ch_py.decoded, NNTPMetaContent)
        assert isinstance(ch_py.decoded.content, list)
        assert len(ch_py.decoded.content) == 0
        assert len(ch_py.decoded.content) == ch_py._total_lines
Project: newsreap    Author: caronc
def test_decoding_03(self):
        """
        Open a stream to a file we can read for decoding; This test
        specifically focuses on var/headers.test03.msg
        """

        # Initialize Codec
        ch_py = CodecHeader()

        encoded_filepath = join(self.var_dir, 'headers.test03.msg')
        assert isfile(encoded_filepath)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            # Decodes content and stops when complete
            assert isinstance(ch_py.decode(fd_in), NNTPHeader)

            # Read the next line (leading whitespace and all); it is the
            # first body line after the end-of-headers delimiter
            assert fd_in.readline().strip() == 'First Line without spaces'

        #print '\n'.join(["assert ch_py['%s'] == '%s'" % (k, v) \
        #                 for k, v in ch_py.items()])

        assert len(ch_py) == 10

        # with the 10 lines processed, our internal line counter
        # should be set to 10
        assert ch_py._lines == 10

        # assert False
        assert ch_py.is_valid() == True
Project: newsreap    Author: caronc
def test_is_valid(self):
        """
        Tests different key combinations that would cause the different
        return types from is_valid()
        """

        # Initialize Codec
        ch_py = CodecHeader()

        encoded_filepath = join(self.var_dir, 'headers.test03.msg')
        assert isfile(encoded_filepath)

        # We haven't done any processing yet
        assert ch_py.is_valid() is None

        # Populate ourselves with some keys
        with open(encoded_filepath, 'r') as fd_in:
            # Decodes content and stops when complete
            assert isinstance(ch_py.decode(fd_in), NNTPHeader)

        # keys should be good!
        assert ch_py.is_valid() is True

        for k in ( 'DMCA', 'Removed', 'Cancelled', 'Blocked' ):
            # Intentionally create a bad key:
            ch_py['X-%s' % k] = 'True'

            # We should fail now
            assert ch_py.is_valid() is False

            # it will become valid again once we clear the key
            del ch_py['X-%s' % k]
            assert ch_py.is_valid() is True
Project: newsreap    Author: caronc
def test_invalid_split_cases(self):
        """
        Test errors that are generated out of the split function
        """
        work_dir = join(self.tmp_dir, 'NNTPContent_Test.chunk')
        # Now we want to load it into a NNTPContent object
        content = NNTPContent(work_dir=work_dir)

        # Nothing to split gives an error
        assert(content.split() is None)

        tmp_file = join(self.tmp_dir, 'NNTPContent_Test.chunk', '5K.rar')
        assert(isfile(tmp_file) is False)
        assert(self.touch(tmp_file, size='1MB', random=True) is True)
        assert(isfile(tmp_file) is True)

        # Now we want to load it into a NNTPContent object
        content = NNTPContent(filepath=tmp_file, work_dir=self.tmp_dir)

        # No size to split on gives an error
        assert(content.split(size=0) is None)
        assert(content.split(size=-1) is None)
        assert(content.split(size=None) is None)
        assert(content.split(size='bad_string') is None)

        # Invalid Memory Limit
        assert(content.split(mem_buf=0) is None)
        assert(content.split(mem_buf=-1) is None)
        assert(content.split(mem_buf=None) is None)
        assert(content.split(mem_buf='bad_string') is None)