Python six 模块,next() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.next()

项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def _fill_result_cache_to_idx(self, idx):
        self._execute_query()
        if self._result_idx is None:
            self._result_idx = -1

        qty = idx - self._result_idx
        if qty < 1:
            return
        else:
            for idx in range(qty):
                self._result_idx += 1
                while True:
                    try:
                        self._result_cache[self._result_idx] = self._construct_result(self._result_cache[self._result_idx])
                        break
                    except IndexError:
                        self._result_cache.append(next(self._result_generator))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __iter__(self):
        self._execute_query()

        idx = 0
        while True:
            if len(self._result_cache) <= idx:
                try:
                    self._result_cache.append(next(self._result_generator))
                except StopIteration:
                    break

            instance = self._result_cache[idx]
            if isinstance(instance, dict):
                self._fill_result_cache_to_idx(idx)
            yield self._result_cache[idx]

            idx += 1
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def read(self, buf_len):
        """
        Implementation note: due to a constraint of the requests library, the
        buf_len that is used the first time this method is called will cause
        all future requests to ``read`` to have the same ``buf_len`` even if
        a different ``buf_len`` is passed in on subsequent requests.
        """
        if self._iter is None:  # lazy load response body iterator
            method = self.input_spec.get('method', 'GET').upper()
            headers = self.input_spec.get('headers', {})
            params = self.input_spec.get('params', {})
            req = requests.request(
                method, self.input_spec['url'], headers=headers, params=params,
                stream=True, allow_redirects=True)
            req.raise_for_status()  # we have the response headers already
            self._iter = req.iter_content(buf_len, decode_unicode=False)

        try:
            return six.next(self._iter)
        except StopIteration:
            return b''
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def __getitem__(self, s):
        if isinstance(s, six.integer_types):
            # Single index
            assert s >= 0, 'negative indexes are not supported'
            qs = copy(self)
            qs._limits = (s, 1)
            return six.next(iter(qs))
        else:
            # Slice
            assert s.step in (None, 1), 'step is not supported in slices'
            start = s.start or 0
            stop = s.stop or 2**63 - 1
            assert start >= 0 and stop >= 0, 'negative indexes are not supported'
            assert start <= stop, 'start of slice cannot be smaller than its end'
            qs = copy(self)
            qs._limits = (start, stop - start)
            return qs
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def select(self, query, model_class=None, settings=None):
        '''
        Performs a query and returns a generator of model instances.

        - `query`: the SQL query to execute.
        - `model_class`: the model class matching the query's table,
          or `None` for getting back instances of an ad-hoc model.
        - `settings`: query settings to send as HTTP GET parameters
        '''
        query += ' FORMAT TabSeparatedWithNamesAndTypes'
        query = self._substitute(query, model_class)
        r = self._send(query, settings, True)
        lines = r.iter_lines()
        field_names = parse_tsv(next(lines))
        field_types = parse_tsv(next(lines))
        model_class = model_class or ModelBase.create_ad_hoc_model(zip(field_names, field_types))
        for line in lines:
            # skip blank line left by WITH TOTALS modifier
            if line:
                yield model_class.from_tsv(line, field_names, self.server_timezone, self)
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
        '''
        Create a model instance from a tab-separated line. The line may or may not include a newline.
        The `field_names` list must match the fields defined in the model, but does not have to include all of them.
        If omitted, it is assumed to be the names of all fields in the model, in order of definition.

        - `line`: the TSV-formatted data.
        - `field_names`: names of the model fields in the data.
        - `timezone_in_use`: the timezone to use when parsing dates and datetimes.
        - `database`: if given, sets the database that this instance belongs to.
        '''
        from six import next
        field_names = field_names or [name for name, field in cls._fields]
        values = iter(parse_tsv(line))
        kwargs = {}
        for name in field_names:
            field = getattr(cls, name)
            kwargs[name] = field.to_python(next(values), timezone_in_use)

        obj = cls(**kwargs)
        if database is not None:
            obj.set_database(database)

        return obj
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def iter(self, conn):
        """ Return an iterator over all of the objects return by the query.

        :param conn: Cassandra connection wrapper used to execute the query.
        :type: cqlengine.ConnectionInterface subclass
        """
        self._execute_query(conn)

        idx = 0
        while True:
            if len(self._result_cache) <= idx:
                try:
                    self._result_cache.append(next(self._result_generator))
                except StopIteration:
                    break

            instance = self._result_cache[idx]
            if isinstance(instance, dict):
                self._fill_result_cache_to_idx(conn, idx)
            yield self._result_cache[idx]

            idx += 1
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def test_second_completion(self):
        self.init3()
        doc = Document(u'crea ')
        gen = self.completer.get_completions(doc, None)
        with self.assertRaises(StopIteration):
            six.next(gen)

        doc = Document(u'create --fun ')
        gen = self.completer.get_completions(doc, None)
        with self.assertRaises(StopIteration):
            six.next(gen)

        doc = Document(u'command d ')
        gen = self.completer.get_completions(doc, None)
        with self.assertRaises(StopIteration):
            six.next(gen)

        doc = Document(u'create --funtimes "life" --hello')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion(
            "--helloworld", -7))
项目:pyramid_webpack    作者:stevearc    | 项目源码 | 文件源码
def parse(self, parser):
        # the first token is the token that started the tag.  In our case
        # we only listen to ``'webpack'`` so this will be a name token with
        # `webpack` as value.  We get the line number so that we can give
        # that line number to the nodes we create by hand.
        lineno = six.next(parser.stream).lineno
        ctx_ref = nodes.ContextReference()

        # Parse a single expression that is the 'bundle' or 'config:bundle'
        args = [ctx_ref, parser.parse_expression()]

        # if there is a comma, the user provided an 'extensions' arg
        if parser.stream.skip_if('comma'):
            args.append(parser.parse_expression())
        else:
            args.append(nodes.Const(None))

        # now we parse the body of the cache block up to `endwebpack` and
        # drop the needle (which would always be `endwebpack` in that case)
        body = parser.parse_statements(['name:endwebpack'], drop_needle=True)

        call_args = [nodes.Name('ASSET', 'param')]

        return nodes.CallBlock(self.call_method('_get_graph', args), call_args,
                               [], body).set_lineno(lineno)
项目:magenta    作者:tensorflow    | 项目源码 | 文件源码
def testRandomPartition(self):
    random_partition = pipelines_common.RandomPartition(
        str, ['a', 'b', 'c'], [0.1, 0.4])
    random_nums = [0.55, 0.05, 0.34, 0.99]
    choices = ['c', 'a', 'b', 'c']
    random_partition.rand_func = functools.partial(six.next, iter(random_nums))
    self.assertEqual(random_partition.input_type, str)
    self.assertEqual(random_partition.output_type,
                     {'a': str, 'b': str, 'c': str})
    for i, s in enumerate(['hello', 'qwerty', '1234567890', 'zxcvbnm']):
      results = random_partition.transform(s)
      self.assertTrue(isinstance(results, dict))
      self.assertEqual(set(results.keys()), set(['a', 'b', 'c']))
      self.assertEqual(len(results.values()), 3)
      self.assertEqual(len([l for l in results.values() if l == []]), 2)  # pylint: disable=g-explicit-bool-comparison
      self.assertEqual(results[choices[i]], [s])
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def __next__(self):
        """
        Returns a new minibatch of data with each call.

        Yields:
            tuple: The next minibatch which includes both features and labels.
        """

        if self.index >= self.total_iterations:
            raise StopIteration

        i1 = (self.start + self.index * self.batch_size) % self.ndata
        bsz = min(self.batch_size, self.ndata - i1)
        oslice1 = slice(i1, i1 + bsz)
        self.index += 1

        if self.batch_size > bsz:
            batch_bufs = {k: np.concatenate([src[oslice1], src[:self.batch_size - bsz]])
                          for k, src in self.data_arrays.items()}
        else:
            batch_bufs = {k: src[oslice1] for k, src in self.data_arrays.items()}

        batch_bufs['iteration'] = self.index
        return batch_bufs
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def get_market(self, symbol):
        """
        The CCXT market.

        Parameters
        ----------
        symbol:
            The CCXT symbol.

        Returns
        -------
        dict[str, Object]

        """
        s = self.get_symbol(symbol)
        market = next(
            (market for market in self.markets if market['symbol'] == s),
            None,
        )
        return market
项目:django-spectator    作者:philgyford    | 项目源码 | 文件源码
def find_unique(self, model_instance, field, iterator, *args):
        # exclude the current model instance from the queryset used in finding
        # next valid hash
        queryset = self.get_queryset(model_instance.__class__, field)
        if model_instance.pk:
            queryset = queryset.exclude(pk=model_instance.pk)

        # form a kwarg dict used to impliment any unique_together contraints
        kwargs = {}
        for params in model_instance._meta.unique_together:
            if self.attname in params:
                for param in params:
                    kwargs[param] = getattr(model_instance, param, None)

        new = six.next(iterator)
        kwargs[self.attname] = new
        while not new or queryset.filter(**kwargs):
            new = six.next(iterator)
            kwargs[self.attname] = new
        setattr(model_instance, self.attname, new)
        return new
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def scan(self, url=None, recurse=None, inscopeonly=None, scanpolicyname=None, method=None, postdata=None, contextid=None, apikey=''):
        """
        Runs the active scanner against the given URL and/or Context. Optionally, the 'recurse' parameter can be used to scan URLs under the given URL, the parameter 'inScopeOnly' can be used to constrain the scan to URLs that are in scope (ignored if a Context is specified), the parameter 'scanPolicyName' allows to specify the scan policy (if none is given it uses the default scan policy), the parameters 'method' and 'postData' allow to select a given request in conjunction with the given URL.
        """
        params = {'apikey': apikey}
        if url is not None:
            params['url'] = url
        if recurse is not None:
            params['recurse'] = recurse
        if inscopeonly is not None:
            params['inScopeOnly'] = inscopeonly
        if scanpolicyname is not None:
            params['scanPolicyName'] = scanpolicyname
        if method is not None:
            params['method'] = method
        if postdata is not None:
            params['postData'] = postdata
        if contextid is not None:
            params['contextId'] = contextid
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'ascan/action/scan/', params)))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def scan_as_user(self, url=None, contextid=None, userid=None, recurse=None, scanpolicyname=None, method=None, postdata=None, apikey=''):
        """
        Active Scans from the perspective of a User, obtained using the given Context ID and User ID. See 'scan' action for more details.
        """
        params = {'apikey': apikey}
        if url is not None:
            params['url'] = url
        if contextid is not None:
            params['contextId'] = contextid
        if userid is not None:
            params['userId'] = userid
        if recurse is not None:
            params['recurse'] = recurse
        if scanpolicyname is not None:
            params['scanPolicyName'] = scanpolicyname
        if method is not None:
            params['method'] = method
        if postdata is not None:
            params['postData'] = postdata
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'ascan/action/scanAsUser/', params)))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_iter(self):
        keys = ['first', 'middle', 'last']
        values = list(range(len(keys)))
        items = list(zip(keys, values))
        om = OrderedMap(items)

        itr = iter(om)
        self.assertEqual(sum([1 for _ in itr]), len(keys))
        self.assertRaises(StopIteration, six.next, itr)

        self.assertEqual(list(iter(om)), keys)
        self.assertEqual(list(six.iteritems(om)), items)
        self.assertEqual(list(six.itervalues(om)), values)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def first(self):
        try:
            return six.next(iter(self))
        except StopIteration:
            return None
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def private_folder(girder_client):
    me = girder_client.get('user/me')
    try:
        folder = six.next(
            girder_client.listFolder(
                me['_id'], parentFolderType='user', name='Private'))
    except StopIteration:
        raise Exception("User doesn't have a Private folder.")

    yield folder
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def read(self, buf_len):
        """
        Implementation note: due to a constraint of the requests library, the
        buf_len that is used the first time this method is called will cause
        all future requests to ``read`` to have the same ``buf_len`` even if
        a different ``buf_len`` is passed in on subsequent requests.
        """
        if self._iter is None:  # lazy load response body iterator
            self._iter = self._client.downloadFileAsIterator(self._file_id, buf_len)

        try:
            return six.next(self._iter)
        except StopIteration:
            return b''
项目:pykbart    作者:chill17    | 项目源码 | 文件源码
def __init__(self, file_handle, delimiter='\t'):
        self.reader = csv.reader(file_handle, delimiter=delimiter, encoding='utf-8')
        self.fields = list(six.next(self.reader))
项目:pykbart    作者:chill17    | 项目源码 | 文件源码
def __next__(self):
        return KbartRecord(six.next(self.reader), fields=self.fields)
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def _fill_result_cache_to_idx(self, conn, idx):
        """
        Fill the result cache to the given index.

        :param conn: Cassandra connection wrapper used to execute the query.
        :type: cqlengine.ConnectionInterface subclass
        :param idx: Index value to fill the result cache to
        :type: int
        """
        self._execute_query(conn)
        if self._result_idx is None:
            self._result_idx = -1

        qty = idx - self._result_idx
        if qty < 1:
            return
        else:
            for idx in range(qty):
                self._result_idx += 1
                while True:
                    try:
                        result = self._construct_result(
                            self._result_cache[self._result_idx]
                        )
                        self._result_cache[self._result_idx] = result
                        break
                    except IndexError:
                        self._result_cache.append(next(self._result_generator))
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def first(self, conn):
        try:
            return six.next(self.iter(conn))
        except StopIteration:
            return None
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def test_command_completion(self):
        """ tests general command completion """
        self.init1()

        doc = Document(u'')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion("command"))
        self.assertEqual(six.next(gen), Completion("create"))

        doc = Document(u'c')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion("command", -1))
        self.assertEqual(six.next(gen), Completion("create", -1))

        doc = Document(u'cr')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion("create", -2))

        doc = Document(u'command ')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion("can"))

        doc = Document(u'create ')
        gen = self.completer.get_completions(doc, None)
        with self.assertRaises(StopIteration):
            six.next(gen)
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def test_param_completion(self):
        """ tests param completion """
        self.init2()
        doc = Document(u'create -')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion(
            "-funtime", -1, display_meta="There is no work life balance, it's just your life"))

        doc = Document(u'command can -')
        gen = self.completer.get_completions(doc, None)
        with self.assertRaises(StopIteration):
            six.next(gen)
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def test_param_double(self):
        """ tests not generating doubles for parameters """
        self.init3()
        doc = Document(u'create -f --')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion(
            "--helloworld", -2))

        doc = Document(u'create -f -')
        gen = self.completer.get_completions(doc, None)
        with self.assertRaises(StopIteration):
            six.next(gen)
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def test_substring_completion(self):
        self.init4()
        doc = Document(u'create')
        gen = self.completer.get_completions(doc, None)
        self.assertEqual(six.next(gen), Completion(
            "createmore", -6))
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def __next__(self):
        return six.next(self.the_iter)
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def next(self):
        return self.__next__()
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def __iter__(self):
        """
        Returns a new minibatch of data with each call.
        Yields:
            dictionary: The next minibatch
                samples[key]: numpy array with shape (batch_size, seq_len, feature_dim)
        """

        while self.current_iter < self.total_iterations:
            for batch_idx in range(self.batch_size):
                if self.shuffle:
                    strt_idx = self.start + (self.current_iter * self.stride)
                    seq_start = strt_idx + (batch_idx * self.nbatches * self.seq_len)
                else:
                    strt_idx = self.start + (self.current_iter * self.batch_size * self.stride)
                    seq_start = strt_idx + (batch_idx * self.stride)

                idcs = np.arange(seq_start, seq_start + self.seq_len) % self.ndata
                for key in self.data_arrays.keys():
                    self.samples[key][batch_idx] = self.data_arrays[key][idcs]

            self.current_iter += 1

            if self.reverse_target:
                self.samples[self.tgt_key][:] = self.samples[self.tgt_key][:, ::-1]

            if self.get_prev_target:
                self.samples['prev_tgt'] = np.roll(self.samples[self.tgt_key], shift=1, axis=1)

            if self.include_iteration is True:
                self.samples['iteration'] = self.index
            yield self.samples
项目:cmd2    作者:python-cmd2    | 项目源码 | 文件源码
def parsed(self, raw):
        """ This function is where the actual parsing of each line occurs.

        :param raw: str - the line of text as it was entered
        :return: ParsedString - custom subclass of str with extra attributes
        """
        if isinstance(raw, ParsedString):
            p = raw
        else:
            # preparse is an overridable hook; default makes no changes
            s = self.preparse(raw)
            s = self.input_source_parser.transformString(s.lstrip())
            s = self.commentGrammars.transformString(s)
            for (shortcut, expansion) in self.shortcuts:
                if s.lower().startswith(shortcut):
                    s = s.replace(shortcut, expansion + ' ', 1)
                    break
            try:
                result = self.main_parser.parseString(s)
            except pyparsing.ParseException:
                # If we have a parsing failure, treat it is an empty command and move to next prompt
                result = self.main_parser.parseString('')
            result['raw'] = raw
            result['command'] = result.multilineCommand or result.command
            result = self.postparse(result)
            p = ParsedString(result.args)
            p.parsed = result
            p.parser = self.parsed
        return p
项目:cmd2    作者:python-cmd2    | 项目源码 | 文件源码
def _transform_transcript_expected(self, s):
        """parse the string with slashed regexes into a valid regex"""
        regex = ''
        start = 0

        while True:
            (regex, first_slash_pos, start) = self._escaped_find(regex, s, start, False)
            if first_slash_pos == -1:
                # no more slashes, add the rest of the string and bail
                regex += re.escape(s[start:])
                break
            else:
                # there is a slash, add everything we have found so far
                # add stuff before the first slash as plain text
                regex += re.escape(s[start:first_slash_pos])
                start = first_slash_pos+1
                # and go find the next one
                (regex, second_slash_pos, start) = self._escaped_find(regex, s, start, True)
                if second_slash_pos > 0:
                    # add everything between the slashes (but not the slashes)
                    # as a regular expression
                    regex += s[start:second_slash_pos]
                    # and change where we start looking for slashed on the
                    # turn through the loop
                    start = second_slash_pos + 1
                else:
                    # No closing slash, we have to add the first slash,
                    # and the rest of the text
                    regex += re.escape(s[start-1:])
                    break
        return regex
项目:django-spectator    作者:philgyford    | 项目源码 | 文件源码
def create_slug(self, model_instance, add):
        # get fields to populate from and slug field to set
        if not isinstance(self._populate_from, (list, tuple)):
            self._populate_from = (self._populate_from, )
        slug_field = model_instance._meta.get_field(self.attname)

        if add or self.overwrite:
            # slugify the original field content and set next step to 2
            slug_for_field = lambda lookup_value: self.slugify_func(self.get_slug_fields(model_instance, lookup_value))
            slug = self.separator.join(map(slug_for_field, self._populate_from))
            start = 2
        else:
            # get slug from the current model instance
            slug = getattr(model_instance, self.attname)
            # model_instance is being modified, and overwrite is False,
            # so instead of doing anything, just return the current slug
            return slug

        # strip slug depending on max_length attribute of the slug field
        # and clean-up
        self.slug_len = slug_field.max_length
        if self.slug_len:
            slug = slug[:self.slug_len]
        slug = self._slug_strip(slug)
        original_slug = slug

        if self.allow_duplicates:
            setattr(model_instance, self.attname, slug)
            return slug

        return super(AutoSlugField, self).find_unique(
            model_instance, slug_field, self.slug_generator(original_slug, start))
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def _generate_ack_id(self, sid, namespace, callback):
        """Generate a unique identifier for an ACK packet."""
        namespace = namespace or '/'
        if sid not in self.callbacks:
            self.callbacks[sid] = {}
        if namespace not in self.callbacks[sid]:
            self.callbacks[sid][namespace] = {0: itertools.count(1)}
        id = six.next(self.callbacks[sid][namespace][0])
        self.callbacks[sid][namespace][id] = callback
        return id
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def monitor(self, id, message, apikey=''):
        """
        This component is optional and therefore the API will only work if it is installed
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'pnh/action/monitor/', {'id': id, 'message': message, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def oracle(self, id, apikey=''):
        """
        This component is optional and therefore the API will only work if it is installed
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'pnh/action/oracle/', {'id': id, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def start_monitoring(self, url, apikey=''):
        """
        This component is optional and therefore the API will only work if it is installed
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'pnh/action/startMonitoring/', {'url': url, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def stop_monitoring(self, id, apikey=''):
        """
        This component is optional and therefore the API will only work if it is installed
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'pnh/action/stopMonitoring/', {'id': id, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def list_engines(self):
        """
        Lists the script engines available
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'script/view/listEngines/')))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def enable(self, scriptname, apikey=''):
        """
        Enables the script with the given name
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'script/action/enable/', {'scriptName': scriptname, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def disable(self, scriptname, apikey=''):
        """
        Disables the script with the given name
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'script/action/disable/', {'scriptName': scriptname, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def load(self, scriptname, scripttype, scriptengine, filename, scriptdescription=None, charset=None, apikey=''):
        """
        Loads a script into ZAP from the given local file, with the given name, type and engine, optionally with a description, and a charset name to read the script (the charset name is required if the script is not in UTF-8, for example, in ISO-8859-1).
        """
        params = {'scriptName': scriptname, 'scriptType': scripttype, 'scriptEngine': scriptengine, 'fileName': filename, 'apikey': apikey}
        if scriptdescription is not None:
            params['scriptDescription'] = scriptdescription
        if charset is not None:
            params['charset'] = charset
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'script/action/load/', params)))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def remove(self, scriptname, apikey=''):
        """
        Removes the script with the given name
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'script/action/remove/', {'scriptName': scriptname, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def run_stand_alone_script(self, scriptname, apikey=''):
        """
        Runs the stand alone script with the give name
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'script/action/runStandAloneScript/', {'scriptName': scriptname, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def import_url(self, url, hostoverride=None, apikey=''):
        """
        Import an Open API definition from a URL, hostOverride allows the host to be replaced
        This component is optional and therefore the API will only work if it is installed
        """
        params = {'url': url, 'apikey': apikey}
        if hostoverride is not None:
            params['hostOverride'] = hostoverride
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'openapi/action/importUrl/', params)))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def option_tokens_names(self):
        """
        Lists the names of all anti-CSRF tokens
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'acsrf/view/optionTokensNames/')))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def add_option_token(self, string, apikey=''):
        """
        Adds an anti-CSRF token with the given name, enabled by default
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'acsrf/action/addOptionToken/', {'String': string, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def remove_option_token(self, string, apikey=''):
        """
        Removes the anti-CSRF token with the given name
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'acsrf/action/removeOptionToken/', {'String': string, 'apikey': apikey})))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def status(self, scanid=None):
        params = {}
        if scanid is not None:
            params['scanId'] = scanid
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'ascan/view/status/', params)))
项目:zap-api-python    作者:zaproxy    | 项目源码 | 文件源码
def messages_ids(self, scanid):
        """
        Gets the IDs of the messages sent during the scan with the given ID. A message can be obtained with 'message' core view.
        """
        return six.next(six.itervalues(self.zap._request(self.zap.base + 'ascan/view/messagesIds/', {'scanId': scanid})))