Python psycopg2.extensions 模块,quote_ident() 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用psycopg2.extensions.quote_ident()

项目:django-mpathy    作者:craigds    | 项目源码 | 文件源码
def post_migrate_mpathnode(model):
    # Note: model *isn't* a subclass of MPathNode, because django migrations are Weird.
    # if not issubclass(model, MPathNode):
    # Hence the following workaround:
    try:
        ltree_field = model._meta.get_field('ltree')
        if not isinstance(ltree_field, LTreeField):
            return
    except FieldDoesNotExist:
        return

    names = {
        "table": quote_ident(model._meta.db_table, connection.connection),
        "check_constraint": quote_ident('%s__check_ltree' % model._meta.db_table, connection.connection),
    }

    cur = connection.cursor()
    # Check that the ltree is always consistent with being a child of _parent
    cur.execute('''
        ALTER TABLE %(table)s ADD CONSTRAINT %(check_constraint)s CHECK (
            (parent_id IS NOT NULL AND ltree ~ (parent_id::text || '.*{1}')::lquery)
            OR (parent_id IS NULL AND ltree ~ '*{1}'::lquery)
        )
    ''' % names)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_identifier(self):
        from psycopg2.extensions import quote_ident
        self.assertEqual(quote_ident('blah-blah', self.conn), '"blah-blah"')
        self.assertEqual(quote_ident('quote"inside', self.conn), '"quote""inside"')
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_unicode_ident(self):
        from psycopg2.extensions import quote_ident
        snowman = "\u2603"
        quoted = '"' + snowman + '"'
        if sys.version_info[0] < 3:
            self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8'))
        else:
            self.assertEqual(quote_ident(snowman, self.conn), quoted)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """Create streaming replication slot."""

        command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if output_plugin is None:
                raise psycopg2.ProgrammingError(
                    "output plugin name is required to create "
                    "logical replication slot")

            command += "LOGICAL %s" % quote_ident(output_plugin, self)

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin name when creating "
                    "physical replication slot")

            command += "PHYSICAL"

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        self.execute(command)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""

        command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
        self.execute(command)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_identifier(self):
        from psycopg2.extensions import quote_ident
        self.assertEqual(quote_ident('blah-blah', self.conn), '"blah-blah"')
        self.assertEqual(quote_ident('quote"inside', self.conn), '"quote""inside"')
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_unicode_ident(self):
        from psycopg2.extensions import quote_ident
        snowman = u"\u2603"
        quoted = '"' + snowman + '"'
        if sys.version_info[0] < 3:
            self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8'))
        else:
            self.assertEqual(quote_ident(snowman, self.conn), quoted)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """Create streaming replication slot."""

        command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if output_plugin is None:
                raise psycopg2.ProgrammingError(
                    "output plugin name is required to create "
                    "logical replication slot")

            command += "LOGICAL %s" % quote_ident(output_plugin, self)

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin name when creating "
                    "physical replication slot")

            command += "PHYSICAL"

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        self.execute(command)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""

        command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
        self.execute(command)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_identifier(self):
        from psycopg2.extensions import quote_ident
        self.assertEqual(quote_ident('blah-blah', self.conn), '"blah-blah"')
        self.assertEqual(quote_ident('quote"inside', self.conn), '"quote""inside"')
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_unicode_ident(self):
        from psycopg2.extensions import quote_ident
        snowman = "\u2603"
        quoted = '"' + snowman + '"'
        if sys.version_info[0] < 3:
            self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8'))
        else:
            self.assertEqual(quote_ident(snowman, self.conn), quoted)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """Create streaming replication slot."""

        command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if output_plugin is None:
                raise psycopg2.ProgrammingError(
                    "output plugin name is required to create "
                    "logical replication slot")

            command += "LOGICAL %s" % quote_ident(output_plugin, self)

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin name when creating "
                    "physical replication slot")

            command += "PHYSICAL"

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        self.execute(command)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""

        command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
        self.execute(command)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_identifier(self):
        from psycopg2.extensions import quote_ident
        self.assertEqual(quote_ident('blah-blah', self.conn), '"blah-blah"')
        self.assertEqual(quote_ident('quote"inside', self.conn), '"quote""inside"')
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_unicode_ident(self):
        from psycopg2.extensions import quote_ident
        snowman = u"\u2603"
        quoted = '"' + snowman + '"'
        if sys.version_info[0] < 3:
            self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8'))
        else:
            self.assertEqual(quote_ident(snowman, self.conn), quoted)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """Create streaming replication slot."""

        command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if output_plugin is None:
                raise psycopg2.ProgrammingError(
                    "output plugin name is required to create "
                    "logical replication slot")

            command += "LOGICAL %s" % quote_ident(output_plugin, self)

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin name when creating "
                    "physical replication slot")

            command += "PHYSICAL"

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        self.execute(command)
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""

        command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
        self.execute(command)
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def test_identifier(self):
        from psycopg2.extensions import quote_ident
        self.assertEqual(quote_ident('blah-blah', self.conn), '"blah-blah"')
        self.assertEqual(quote_ident('quote"inside', self.conn), '"quote""inside"')
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def test_unicode_ident(self):
        from psycopg2.extensions import quote_ident
        snowman = u"\u2603"
        quoted = '"' + snowman + '"'
        if sys.version_info[0] < 3:
            self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8'))
        else:
            self.assertEqual(quote_ident(snowman, self.conn), quoted)
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """Create streaming replication slot."""

        command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if output_plugin is None:
                raise psycopg2.ProgrammingError(
                    "output plugin name is required to create "
                    "logical replication slot")

            command += "LOGICAL %s" % quote_ident(output_plugin, self)

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin name when creating "
                    "physical replication slot")

            command += "PHYSICAL"

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        self.execute(command)
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""

        command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
        self.execute(command)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False):
        """Start replication stream."""

        command = "START_REPLICATION "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            else:
                raise psycopg2.ProgrammingError(
                    "slot name is required for logical replication")

            command += "LOGICAL "

        elif slot_type == REPLICATION_PHYSICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        if type(start_lsn) is str:
            lsn = start_lsn.split('/')
            lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
        else:
            lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                               start_lsn & 0xFFFFFFFF)

        command += lsn

        if timeline != 0:
            if slot_type == REPLICATION_LOGICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify timeline for logical replication")

            command += " TIMELINE %d" % timeline

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin options for physical replication")

            command += " ("
            for k, v in options.items():
                if not command.endswith('('):
                    command += ", "
                command += "%s %s" % (quote_ident(k, self), _A(str(v)))
            command += ")"

        self.start_replication_expert(command, decode=decode)

    # allows replication cursors to be used in select.select() directly
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False):
        """Start replication stream."""

        command = "START_REPLICATION "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            else:
                raise psycopg2.ProgrammingError(
                    "slot name is required for logical replication")

            command += "LOGICAL "

        elif slot_type == REPLICATION_PHYSICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        if type(start_lsn) is str:
            lsn = start_lsn.split('/')
            lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
        else:
            lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                               start_lsn & 0xFFFFFFFF)

        command += lsn

        if timeline != 0:
            if slot_type == REPLICATION_LOGICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify timeline for logical replication")

            command += " TIMELINE %d" % timeline

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin options for physical replication")

            command += " ("
            for k, v in options.iteritems():
                if not command.endswith('('):
                    command += ", "
                command += "%s %s" % (quote_ident(k, self), _A(str(v)))
            command += ")"

        self.start_replication_expert(command, decode=decode)

    # allows replication cursors to be used in select.select() directly
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False):
        """Start replication stream."""

        command = "START_REPLICATION "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            else:
                raise psycopg2.ProgrammingError(
                    "slot name is required for logical replication")

            command += "LOGICAL "

        elif slot_type == REPLICATION_PHYSICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        if type(start_lsn) is str:
            lsn = start_lsn.split('/')
            lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
        else:
            lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                               start_lsn & 0xFFFFFFFF)

        command += lsn

        if timeline != 0:
            if slot_type == REPLICATION_LOGICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify timeline for logical replication")

            command += " TIMELINE %d" % timeline

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin options for physical replication")

            command += " ("
            for k, v in options.items():
                if not command.endswith('('):
                    command += ", "
                command += "%s %s" % (quote_ident(k, self), _A(str(v)))
            command += ")"

        self.start_replication_expert(command, decode=decode)

    # allows replication cursors to be used in select.select() directly
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False):
        """Start replication stream."""

        command = "START_REPLICATION "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            else:
                raise psycopg2.ProgrammingError(
                    "slot name is required for logical replication")

            command += "LOGICAL "

        elif slot_type == REPLICATION_PHYSICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        if type(start_lsn) is str:
            lsn = start_lsn.split('/')
            lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
        else:
            lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                               start_lsn & 0xFFFFFFFF)

        command += lsn

        if timeline != 0:
            if slot_type == REPLICATION_LOGICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify timeline for logical replication")

            command += " TIMELINE %d" % timeline

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin options for physical replication")

            command += " ("
            for k, v in options.iteritems():
                if not command.endswith('('):
                    command += ", "
                command += "%s %s" % (quote_ident(k, self), _A(str(v)))
            command += ")"

        self.start_replication_expert(command, decode=decode)

    # allows replication cursors to be used in select.select() directly
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False):
        """Start replication stream."""

        command = "START_REPLICATION "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            else:
                raise psycopg2.ProgrammingError(
                    "slot name is required for logical replication")

            command += "LOGICAL "

        elif slot_type == REPLICATION_PHYSICAL:
            if slot_name:
                command += "SLOT %s " % quote_ident(slot_name, self)
            # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

        else:
            raise psycopg2.ProgrammingError(
                "unrecognized replication type: %s" % repr(slot_type))

        if type(start_lsn) is str:
            lsn = start_lsn.split('/')
            lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
        else:
            lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
                               start_lsn & 0xFFFFFFFF)

        command += lsn

        if timeline != 0:
            if slot_type == REPLICATION_LOGICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify timeline for logical replication")

            command += " TIMELINE %d" % timeline

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin options for physical replication")

            command += " ("
            for k, v in options.iteritems():
                if not command.endswith('('):
                    command += ", "
                command += "%s %s" % (quote_ident(k, self), _A(str(v)))
            command += ")"

        self.start_replication_expert(command, decode=decode)

    # allows replication cursors to be used in select.select() directly