The following 35 code examples, drawn from open-source Python projects, illustrate how to use django.utils.six.moves.zip().
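Before the project examples, a minimal sketch of the import they all share. In Django versions that still bundle six (before Django 3.0), zip is re-exported from django.utils.six.moves and behaves like Python 3's lazy, iterator-returning zip on both Python 2 and Python 3. The sample data below is made up for illustration and does not come from any of the projects.

from django.utils.six.moves import zip

# Pair up column names with a row of values; zip stops at the shorter input.
columns = ('id', 'name')
row = (1, 'alice')
print(dict(zip(columns, row)))  # {'id': 1, 'name': 'alice'}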
def previous_current_next(items):
    """
    From http://www.wordaligned.org/articles/zippy-triples-served-with-python

    Creates an iterator which returns (previous, current, next) triples,
    with ``None`` filling in when there is no previous or next
    available.
    """
    extend = itertools.chain([None], items, [None])
    prev, cur, nex = itertools.tee(extend, 3)
    # Advancing an iterator twice when we know there are two items (the
    # two Nones at the start and at the end) will never fail except if
    # `items` is some funny StopIteration-raising generator. There's no point
    # in swallowing this exception.
    next(cur)
    next(nex)
    next(nex)
    return zip(prev, cur, nex)
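As a quick, hypothetical check of what the helper above yields (assuming the function definition above plus import itertools), each element is paired with its neighbours and None fills in at both edges:

print(list(previous_current_next([1, 2, 3])))
# [(None, 1, 2), (1, 2, 3), (2, 3, None)]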
def get_combinator_sql(self, combinator, all):
    features = self.connection.features
    compilers = [
        query.get_compiler(self.using, self.connection)
        for query in self.query.combined_queries
    ]
    if not features.supports_slicing_ordering_in_compound:
        for query, compiler in zip(self.query.combined_queries, compilers):
            if query.low_mark or query.high_mark:
                raise DatabaseError('LIMIT/OFFSET not allowed in subqueries of compound statements.')
            if compiler.get_order_by():
                raise DatabaseError('ORDER BY not allowed in subqueries of compound statements.')
    parts = (compiler.as_sql() for compiler in compilers)
    combinator_sql = self.connection.ops.set_operators[combinator]
    if all and combinator == 'union':
        combinator_sql += ' ALL'
    braces = '({})' if features.supports_slicing_ordering_in_compound else '{}'
    sql_parts, args_parts = zip(*((braces.format(sql), args) for sql, args in parts))
    result = [' {} '.format(combinator_sql).join(sql_parts)]
    params = []
    for part in args_parts:
        params.extend(part)
    return result, params
def get_combinator_sql(self, combinator, all):
    features = self.connection.features
    compilers = [
        query.get_compiler(self.using, self.connection)
        for query in self.query.combined_queries if not query.is_empty()
    ]
    if not features.supports_slicing_ordering_in_compound:
        for query, compiler in zip(self.query.combined_queries, compilers):
            if query.low_mark or query.high_mark:
                raise DatabaseError('LIMIT/OFFSET not allowed in subqueries of compound statements.')
            if compiler.get_order_by():
                raise DatabaseError('ORDER BY not allowed in subqueries of compound statements.')
    parts = (compiler.as_sql() for compiler in compilers)
    combinator_sql = self.connection.ops.set_operators[combinator]
    if all and combinator == 'union':
        combinator_sql += ' ALL'
    braces = '({})' if features.supports_slicing_ordering_in_compound else '{}'
    sql_parts, args_parts = zip(*((braces.format(sql), args) for sql, args in parts))
    result = [' {} '.format(combinator_sql).join(sql_parts)]
    params = []
    for part in args_parts:
        params.extend(part)
    return result, params
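In both versions above, the zip(*...) call splits each compiler's (sql, params) pair into two parallel sequences. With plain stand-in data (not taken from the original project), that step looks like this:

# Hypothetical (sql, params) pairs standing in for compiler.as_sql() results.
parts = [('SELECT 1', []), ('SELECT 2', [42])]
sql_parts, args_parts = zip(*(('({})'.format(sql), args) for sql, args in parts))
print(sql_parts)                  # ('(SELECT 1)', '(SELECT 2)')
print(args_parts)                 # ([], [42])
print(' UNION '.join(sql_parts))  # (SELECT 1) UNION (SELECT 2)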
def assemble_as_sql(self, fields, value_rows):
    """
    Take a sequence of N fields and a sequence of M rows of values, generate
    placeholder SQL and parameters for each field and value, and return a
    pair containing:
     * a sequence of M rows of N SQL placeholder strings, and
     * a sequence of M rows of corresponding parameter values.

    Each placeholder string may contain any number of '%s' interpolation
    strings, and each parameter row will contain exactly as many params as
    the total number of '%s's in the corresponding placeholder row.
    """
    if not value_rows:
        return [], []

    # list of (sql, [params]) tuples for each object to be saved
    # Shape: [n_objs][n_fields][2]
    rows_of_fields_as_sql = (
        (self.field_as_sql(field, v) for field, v in zip(fields, row))
        for row in value_rows
    )

    # tuple like ([sqls], [[params]s]) for each object to be saved
    # Shape: [n_objs][2][n_fields]
    sql_and_param_pair_rows = (zip(*row) for row in rows_of_fields_as_sql)

    # Extract separate lists for placeholders and params.
    # Each of these has shape [n_objs][n_fields]
    placeholder_rows, param_rows = zip(*sql_and_param_pair_rows)

    # Params for each field are still lists, and need to be flattened.
    param_rows = [[p for ps in row for p in ps] for row in param_rows]

    return placeholder_rows, param_rows
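The double transposition in assemble_as_sql is easier to follow with concrete stand-in values for the (sql, [params]) results of field_as_sql (hypothetical data, for illustration only):

# Two objects, two fields each; every cell is a (placeholder, [params]) pair.
rows = [[('%s', [1]), ('%s', [2])],
        [('%s', [3]), ('%s', [4])]]
pair_rows = (zip(*row) for row in rows)          # per row: (placeholders, params)
placeholder_rows, param_rows = zip(*pair_rows)   # transpose across rows
print(placeholder_rows)  # (('%s', '%s'), ('%s', '%s'))
print(param_rows)        # (([1], [2]), ([3], [4]))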
def from_db(cls, db, field_names, values):
    if cls._deferred:
        new = cls(**dict(zip(field_names, values)))
    else:
        new = cls(*values)
    new._state.adding = False
    new._state.db = db
    return new
def build_aggregate(self, queryset, compiler):
    agg = super(ManyToOneConstructor, self).build_aggregate(queryset, compiler)
    # Any ordering needs to be plucked from the queryset and added into the
    # JSONAgg that we will build, because SQL
    kwargs = {}
    if queryset.ordered:
        kwargs['order_by'] = next(zip(*queryset.query.get_compiler(connection=compiler.connection).get_order_by()))
        queryset.query.clear_ordering(True)
    # many_to_one is a bit of a misnomer, the field we have is the "one" side
    return JSONAgg(agg, **kwargs)
def _salt_cipher_secret(secret):
    """
    Given a secret (assumed to be a string of CSRF_ALLOWED_CHARS), generate a
    token by adding a salt and using it to encrypt the secret.
    """
    salt = _get_new_csrf_string()
    chars = CSRF_ALLOWED_CHARS
    pairs = zip((chars.index(x) for x in secret), (chars.index(x) for x in salt))
    cipher = ''.join(chars[(x + y) % len(chars)] for x, y in pairs)
    return salt + cipher
def _unsalt_cipher_token(token):
    """
    Given a token (assumed to be a string of CSRF_ALLOWED_CHARS, of length
    CSRF_TOKEN_LENGTH, and that its first half is a salt), use it to decrypt
    the second half to produce the original secret.
    """
    salt = token[:CSRF_SECRET_LENGTH]
    token = token[CSRF_SECRET_LENGTH:]
    chars = CSRF_ALLOWED_CHARS
    pairs = zip((chars.index(x) for x in token), (chars.index(x) for x in salt))
    secret = ''.join(chars[x - y] for x, y in pairs)  # Note negative values are ok
    return secret
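As a hedged round-trip check, the two helpers above invert each other; this sketch assumes the surrounding constants and helpers from django.middleware.csrf (CSRF_ALLOWED_CHARS, CSRF_SECRET_LENGTH, _get_new_csrf_string) are in scope:

secret = _get_new_csrf_string()          # random secret of CSRF_SECRET_LENGTH chars
token = _salt_cipher_secret(secret)      # salt followed by cipher, twice as long
assert len(token) == 2 * CSRF_SECRET_LENGTH
assert _unsalt_cipher_token(token) == secret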
def resolve_columns(self, row, fields=()):
    """
    This routine is necessary so that distances and geometries returned
    from extra selection SQL get resolved appropriately into Python
    objects.
    """
    values = []
    aliases = list(self.query.extra_select)

    # Have to set a starting row number offset that is used for
    # determining the correct starting row index -- needed for
    # doing pagination with Oracle.
    rn_offset = 0
    if self.connection.ops.oracle:
        if self.query.high_mark is not None or self.query.low_mark:
            rn_offset = 1
    index_start = rn_offset + len(aliases)

    # Converting any extra selection values (e.g., geometries and
    # distance objects added by GeoQuerySet methods).
    values = [self.query.convert_values(v, self.query.extra_select_fields.get(a, None), self.connection)
              for v, a in zip(row[rn_offset:index_start], aliases)]

    if self.connection.ops.oracle or getattr(self.query, 'geo_values', False):
        # We resolve the rest of the columns if we're on Oracle or if
        # the `geo_values` attribute is defined.
        for value, field in zip_longest(row[index_start:], fields):
            values.append(self.query.convert_values(value, field, self.connection))
    else:
        values.extend(row[index_start:])
    return tuple(values)

#### Routines unique to GeoQuery ####
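The example mixes zip with zip_longest; the difference matters when the row has more columns than there are known fields. A small stand-alone illustration (not from the original project), assuming the six re-exports are available:

from django.utils.six.moves import zip, zip_longest

print(list(zip([1, 2, 3], ['a'])))          # [(1, 'a')] -- stops at the shorter input
print(list(zip_longest([1, 2, 3], ['a'])))  # [(1, 'a'), (2, None), (3, None)]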
def as_sql(self):
    # We don't need quote_name_unless_alias() here, since these are all
    # going to be column names (so we can avoid the extra overhead).
    qn = self.connection.ops.quote_name
    opts = self.query.get_meta()
    result = ['INSERT INTO %s' % qn(opts.db_table)]

    has_fields = bool(self.query.fields)
    fields = self.query.fields if has_fields else [opts.pk]
    result.append('(%s)' % ', '.join(qn(f.column) for f in fields))

    if has_fields:
        value_rows = [
            [self.prepare_value(field, self.pre_save_val(field, obj)) for field in fields]
            for obj in self.query.objs
        ]
    else:
        # An empty object.
        value_rows = [[self.connection.ops.pk_default_value()] for _ in self.query.objs]
        fields = [None]

    # Currently the backends just accept values when generating bulk
    # queries and generate their own placeholders. Doing that isn't
    # necessary and it should be possible to use placeholders and
    # expressions in bulk inserts too.
    can_bulk = (not self.return_id and self.connection.features.has_bulk_insert)

    placeholder_rows, param_rows = self.assemble_as_sql(fields, value_rows)

    if self.return_id and self.connection.features.can_return_id_from_insert:
        params = param_rows[0]
        col = "%s.%s" % (qn(opts.db_table), qn(opts.pk.column))
        result.append("VALUES (%s)" % ", ".join(placeholder_rows[0]))
        r_fmt, r_params = self.connection.ops.return_insert_id()
        # Skip empty r_fmt to allow subclasses to customize behavior for
        # 3rd party backends. Refs #19096.
        if r_fmt:
            result.append(r_fmt % col)
            params += r_params
        return [(" ".join(result), tuple(params))]

    if can_bulk:
        result.append(self.connection.ops.bulk_insert_sql(fields, placeholder_rows))
        return [(" ".join(result), tuple(p for ps in param_rows for p in ps))]
    else:
        return [
            (" ".join(result + ["VALUES (%s)" % ", ".join(p)]), vals)
            for p, vals in zip(placeholder_rows, param_rows)
        ]
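In the non-bulk branch at the end, zip pairs each row of placeholders with its row of parameters so one INSERT statement is emitted per object. With made-up values:

# Hypothetical placeholder/param rows for two objects with two fields each.
placeholder_rows = [['%s', '%s'], ['%s', '%s']]
param_rows = [(1, 'a'), (2, 'b')]
for p, vals in zip(placeholder_rows, param_rows):
    print("INSERT INTO t (id, name) VALUES (%s)" % ", ".join(p), vals)
# INSERT INTO t (id, name) VALUES (%s, %s) (1, 'a')
# INSERT INTO t (id, name) VALUES (%s, %s) (2, 'b')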
def flatten_result(source):
    """
    Turns the given source sequence into a list of reg-exp possibilities and
    their arguments. Returns a list of strings and a list of argument lists.
    Each of the two lists will be of the same length.
    """
    if source is None:
        return [''], [[]]
    if isinstance(source, Group):
        if source[1] is None:
            params = []
        else:
            params = [source[1]]
        return [source[0]], [params]
    result = ['']
    result_args = [[]]
    pos = last = 0
    for pos, elt in enumerate(source):
        if isinstance(elt, six.string_types):
            continue
        piece = ''.join(source[last:pos])
        if isinstance(elt, Group):
            piece += elt[0]
            param = elt[1]
        else:
            param = None
        last = pos + 1
        for i in range(len(result)):
            result[i] += piece
            if param:
                result_args[i].append(param)
        if isinstance(elt, (Choice, NonCapture)):
            if isinstance(elt, NonCapture):
                elt = [elt]
            inner_result, inner_args = [], []
            for item in elt:
                res, args = flatten_result(item)
                inner_result.extend(res)
                inner_args.extend(args)
            new_result = []
            new_args = []
            for item, args in zip(result, result_args):
                for i_item, i_args in zip(inner_result, inner_args):
                    new_result.append(item + i_item)
                    new_args.append(args[:] + i_args)
            result = new_result
            result_args = new_args
    if pos >= last:
        piece = ''.join(source[last:])
        for i in range(len(result)):
            result[i] += piece
    return result, result_args
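The nested zip loops in the Choice-expansion step build a cartesian product of the prefixes gathered so far and the alternatives of the inner group. A stand-in run with hypothetical values:

# Every prefix already in `result` is combined with every inner alternative,
# and the argument lists are concatenated in the same order.
result, result_args = ['a', 'b'], [['x'], ['y']]
inner_result, inner_args = ['1', '2'], [[], ['p']]
new_result, new_args = [], []
for item, args in zip(result, result_args):
    for i_item, i_args in zip(inner_result, inner_args):
        new_result.append(item + i_item)
        new_args.append(args[:] + i_args)
print(new_result)  # ['a1', 'a2', 'b1', 'b2']
print(new_args)    # [['x'], ['x', 'p'], ['y'], ['y', 'p']]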
def parse_nested(cls, instance, field, nested, datas):
    if field.many_to_one or field.one_to_one:
        datas = (datas, )
    ps = []
    # Fun caveat of throwing everything into JSON: it doesn't support datetimes.
    # Everything gets sent back as iso8601 strings.
    # Make a list of all fields that should be datetimes and parse them ahead of time.
    dts = [i for i, f in enumerate(field.related_model._meta.concrete_fields) if isinstance(f, models.DateTimeField)]

    for data in datas or []:
        if data is None:
            ps.append(None)
            continue
        data, nested_data = data[:-len(nested) or None], data[-len(nested):]

        for i in dts:
            data[i] = util.parse_datetime(data[i])

        # from_db expects the final argument to be a tuple of fields in the order of concrete_fields
        parsed = field.related_model.from_db(instance._state.db, None, data)

        for (f, n), d in zip(nested.items(), nested_data):
            cls.parse_nested(parsed, f, n, d)

        if field.remote_field.concrete:
            setattr(parsed, field.remote_field.get_cache_name(), instance)

        ps.append(parsed)

    if (field.many_to_one or field.one_to_one) and ps:
        return setattr(instance, field.get_cache_name(), ps[0])

    if not hasattr(instance, '_prefetched_objects_cache'):
        instance._prefetched_objects_cache = {}

    if hasattr(field, 'get_accessor_name'):
        accessor_name = field.get_accessor_name()
    else:
        accessor_name = field.name

    # get_queryset() sets a bunch of attributes for us and will respect any custom managers
    instance._prefetched_objects_cache[field.name] = getattr(instance, accessor_name).get_queryset()
    instance._prefetched_objects_cache[field.name]._result_cache = ps
    instance._prefetched_objects_cache[field.name]._prefetch_done = True
def as_sql(self):
    # We don't need quote_name_unless_alias() here, since these are all
    # going to be column names (so we can avoid the extra overhead).
    qn = self.connection.ops.quote_name
    opts = self.query.get_meta()
    result = ['INSERT INTO %s' % qn(opts.db_table)]

    has_fields = bool(self.query.fields)
    fields = self.query.fields if has_fields else [opts.pk]
    result.append('(%s)' % ', '.join(qn(f.column) for f in fields))

    if has_fields:
        value_rows = [
            [self.prepare_value(field, self.pre_save_val(field, obj)) for field in fields]
            for obj in self.query.objs
        ]
    else:
        # An empty object.
        value_rows = [[self.connection.ops.pk_default_value()] for _ in self.query.objs]
        fields = [None]

    # Currently the backends just accept values when generating bulk
    # queries and generate their own placeholders. Doing that isn't
    # necessary and it should be possible to use placeholders and
    # expressions in bulk inserts too.
    can_bulk = (not self.return_id and self.connection.features.has_bulk_insert)

    placeholder_rows, param_rows = self.assemble_as_sql(fields, value_rows)

    if self.return_id and self.connection.features.can_return_id_from_insert:
        if self.connection.features.can_return_ids_from_bulk_insert:
            result.append(self.connection.ops.bulk_insert_sql(fields, placeholder_rows))
            params = param_rows
        else:
            result.append("VALUES (%s)" % ", ".join(placeholder_rows[0]))
            params = [param_rows[0]]
        col = "%s.%s" % (qn(opts.db_table), qn(opts.pk.column))
        r_fmt, r_params = self.connection.ops.return_insert_id()
        # Skip empty r_fmt to allow subclasses to customize behavior for
        # 3rd party backends. Refs #19096.
        if r_fmt:
            result.append(r_fmt % col)
            params += [r_params]
        return [(" ".join(result), tuple(chain.from_iterable(params)))]

    if can_bulk:
        result.append(self.connection.ops.bulk_insert_sql(fields, placeholder_rows))
        return [(" ".join(result), tuple(p for ps in param_rows for p in ps))]
    else:
        return [
            (" ".join(result + ["VALUES (%s)" % ", ".join(p)]), vals)
            for p, vals in zip(placeholder_rows, param_rows)
        ]
def copy_plugins_to(old_plugins, to_placeholder,
                    to_language=None, parent_plugin_id=None, no_signals=False):
    """
    Copies a list of plugins to a placeholder to a language.
    """
    # TODO: Refactor this and copy_plugins to cleanly separate plugin tree/node
    # copying and remove the need for the mutating parameter old_parent_cache.
    old_parent_cache = {}
    # For subplugin copy, top-level plugin's parent must be nulled
    # before copying.
    if old_plugins:
        old_parent = old_plugins[0].parent
        for old_plugin in old_plugins:
            if old_plugin.parent == old_parent:
                old_plugin.parent = old_plugin.parent_id = None
    new_plugins = []
    for old in old_plugins:
        new_plugins.append(old.copy_plugin(to_placeholder, to_language or old.language,
                                           old_parent_cache, no_signals))

    if new_plugins and parent_plugin_id:
        from cms.models import CMSPlugin
        parent_plugin = CMSPlugin.objects.get(pk=parent_plugin_id)
        for idx, plugin in enumerate(new_plugins):
            if plugin.parent_id is None:
                plugin.parent_id = parent_plugin_id
                # Always use update fields to avoid side-effects.
                # In this case "plugin" has invalid values for internal fields
                # like numchild.
                # The invalid value is only in memory because the instance
                # was never updated.
                plugin.save(update_fields=['parent'])
                new_plugins[idx] = plugin.move(parent_plugin, pos="last-child")

    plugins_ziplist = list(zip(new_plugins, old_plugins))

    # this magic is needed for advanced plugins like Text Plugins that can have
    # nested plugins and need to update their content based on the new plugins.
    for new_plugin, old_plugin in plugins_ziplist:
        new_instance = new_plugin.get_plugin_instance()[0]
        if new_instance:
            new_instance._no_reorder = True
            new_instance.post_copy(old_plugin, plugins_ziplist)

    # returns information about originals and copies
    return plugins_ziplist
def as_sql(self):
    # We don't need quote_name_unless_alias() here, since these are all
    # going to be column names (so we can avoid the extra overhead).
    qn = self.connection.ops.quote_name
    opts = self.query.get_meta()
    result = ['INSERT INTO %s' % qn(opts.db_table)]

    has_fields = bool(self.query.fields)
    fields = self.query.fields if has_fields else [opts.pk]
    result.append('(%s)' % ', '.join(qn(f.column) for f in fields))

    if has_fields:
        params = values = [
            [
                f.get_db_prep_save(
                    getattr(obj, f.attname) if self.query.raw else f.pre_save(obj, True),
                    connection=self.connection
                ) for f in fields
            ]
            for obj in self.query.objs
        ]
    else:
        values = [[self.connection.ops.pk_default_value()] for obj in self.query.objs]
        params = [[]]
        fields = [None]
    can_bulk = (not any(hasattr(field, "get_placeholder") for field in fields) and
                not self.return_id and self.connection.features.has_bulk_insert)

    if can_bulk:
        placeholders = [["%s"] * len(fields)]
    else:
        placeholders = [
            [self.placeholder(field, v) for field, v in zip(fields, val)]
            for val in values
        ]
        # Oracle Spatial needs to remove some values due to #10888
        params = self.connection.ops.modify_insert_params(placeholders, params)
    if self.return_id and self.connection.features.can_return_id_from_insert:
        params = params[0]
        col = "%s.%s" % (qn(opts.db_table), qn(opts.pk.column))
        result.append("VALUES (%s)" % ", ".join(placeholders[0]))
        r_fmt, r_params = self.connection.ops.return_insert_id()
        # Skip empty r_fmt to allow subclasses to customize behavior for
        # 3rd party backends. Refs #19096.
        if r_fmt:
            result.append(r_fmt % col)
            params += r_params
        return [(" ".join(result), tuple(params))]
    if can_bulk:
        result.append(self.connection.ops.bulk_insert_sql(fields, len(values)))
        return [(" ".join(result), tuple(v for val in values for v in val))]
    else:
        return [
            (" ".join(result + ["VALUES (%s)" % ", ".join(p)]), vals)
            for p, vals in zip(placeholders, params)
        ]