• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python assertions.assert_invalid函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中assertions.assert_invalid函数的典型用法代码示例。如果您正苦于以下问题:Python assert_invalid函数的具体用法?Python assert_invalid怎么用?Python assert_invalid使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了assert_invalid函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: type_test

    def type_test(self):
        """
        Smoke test basic TYPE operations:

        - CREATE a type
        - CREATE a table using that type
        - ALTER the type and CREATE another table
        - DROP the tables and type
        - CREATE another table using the DROPped type and assert it fails with an InvalidRequest
        # TODO run SELECTs to make sure each statement works
        # TODO is this even necessary given the existence of the auth_tests?
        """
        session = self.prepare()

        session.execute("CREATE TYPE address_t (street text, city text, zip_code int)")
        session.execute("CREATE TABLE test4 (id int PRIMARY KEY, address frozen<address_t>)")

        session.execute("ALTER TYPE address_t ADD phones set<text>")
        session.execute("CREATE TABLE test5 (id int PRIMARY KEY, address frozen<address_t>)")

        session.execute("DROP TABLE test4")
        session.execute("DROP TABLE test5")
        session.execute("DROP TYPE address_t")
        assert_invalid(session,
                       "CREATE TABLE test6 (id int PRIMARY KEY, address frozen<address_t>)",
                       expected=InvalidRequest)
开发者ID:itskarlsson,项目名称:cassandra-dtest,代码行数:26,代码来源:cql_tests.py


示例2: index_test

    def index_test(self):
        """
        Smoke test CQL statements related to indexes:

        - CREATE a table
        - CREATE an index on that table
        - INSERT 10 values into the table
        - SELECT from the table over the indexed value and assert the expected values come back
        - drop the index
        - assert SELECTing over the indexed value raises an InvalidRequest
        # TODO run SELECTs to make sure each statement works
        """
        session = self.prepare()

        session.execute("CREATE TABLE test3 (k int PRIMARY KEY, v1 int, v2 int)")
        session.execute("CREATE INDEX testidx ON test3 (v1)")

        for i in range(0, 10):
            session.execute("INSERT INTO test3 (k, v1, v2) VALUES ({i}, {i}, {i})".format(i=i))

        res = session.execute("SELECT * FROM test3 WHERE v1 = 0")
        self.assertEqual(rows_to_list(res), [[0, 0, 0]])

        session.execute("DROP INDEX testidx")

        assert_invalid(session, "SELECT * FROM test3 where v1 = 0", expected=InvalidRequest)
开发者ID:itskarlsson,项目名称:cassandra-dtest,代码行数:26,代码来源:cql_tests.py


示例3: allow_filtering_test

    def allow_filtering_test(self):
        """Test that allow filtering works as usual for a materialized view"""

        session = self.prepare()

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
        session.execute(("CREATE MATERIALIZED VIEW t_by_v2 AS SELECT * FROM t "
                         "WHERE v2 IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v2, id)"))

        for i in xrange(1000):
            session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))

        for i in xrange(1000):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {v}".format(v=i), [i, i, 'a', 3.0])

        rows = session.execute("SELECT * FROM t_by_v2 WHERE v2 = 'a'")
        self.assertEqual(len(rows), 1000, "Expected 1000 rows but got {}".format(len(rows)))

        assert_invalid(session, "SELECT * FROM t_by_v WHERE v = 1 AND v2 = 'a'")
        assert_invalid(session, "SELECT * FROM t_by_v2 WHERE v2 = 'a' AND v = 1")

        for i in xrange(1000):
            assert_one(
                session,
                "SELECT * FROM t_by_v WHERE v = {} AND v3 = 3.0 ALLOW FILTERING".format(i),
                [i, i, 'a', 3.0]
            )
            assert_one(
                session,
                "SELECT * FROM t_by_v2 WHERE v2 = 'a' AND v = {} ALLOW FILTERING".format(i),
                ['a', i, i, 3.0]
            )
开发者ID:emolsson,项目名称:cassandra-dtest,代码行数:34,代码来源:materialized_views_test.py


示例4: check_permissions

    def check_permissions(self, node, upgraded):
        # use an exclusive connection to ensure we only talk to the specified node
        klaus = self.patient_exclusive_cql_connection(node, user='klaus', password='12345', timeout=20)
        # klaus is a superuser, so should be able to list all permissions
        # the output of LIST PERMISSIONS changes slightly with #7653 adding
        # a new role column to results, so we need to tailor our check
        # based on whether the node has been upgraded or not
        if not upgraded:
            assert_all(klaus,
                       'LIST ALL PERMISSIONS',
                       [['michael', '<table ks.cf1>', 'MODIFY'],
                        ['michael', '<table ks.cf2>', 'SELECT']])
        else:
            assert_all(klaus,
                       'LIST ALL PERMISSIONS',
                       [['michael', 'michael', '<table ks.cf1>', 'MODIFY'],
                        ['michael', 'michael', '<table ks.cf2>', 'SELECT']])

        klaus.shutdown()

        michael = self.patient_exclusive_cql_connection(node, user='michael', password='54321')
        michael.execute('INSERT INTO ks.cf1 (id, val) VALUES (0,0)')
        michael.execute('SELECT * FROM ks.cf2')
        assert_invalid(michael,
                       'SELECT * FROM ks.cf1',
                       'User michael has no SELECT permission on <table ks.cf1> or any of its parents',
                       Unauthorized)
        michael.shutdown()
开发者ID:agarwalmohit,项目名称:cassandra-dtest,代码行数:28,代码来源:upgrade_internal_auth_test.py


示例5: keyspace_test

    def keyspace_test(self):
        """
        Smoke test that basic keyspace operations work:

        - create a keyspace
        - USE that keyspace
        - ALTER it
        - #TODO: assert the ALTER worked
        - DROP it
        - attempt to USE it again, asserting it raises an InvalidRequest exception
        """
        session = self.prepare(create_keyspace=False)

        session.execute("CREATE KEYSPACE ks WITH replication = "
                        "{ 'class':'SimpleStrategy', 'replication_factor':1} "
                        "AND DURABLE_WRITES = true")

        session.execute("USE ks")

        session.execute("ALTER KEYSPACE ks WITH replication = "
                        "{ 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } "
                        "AND DURABLE_WRITES = false")

        session.execute("DROP KEYSPACE ks")
        assert_invalid(session, "USE ks", expected=InvalidRequest)
开发者ID:itskarlsson,项目名称:cassandra-dtest,代码行数:25,代码来源:cql_tests.py


示例6: udf_with_udt_test

    def udf_with_udt_test(self):
        """
        Test UDFs that operate on non-frozen UDTs.
        @jira_ticket CASSANDRA-7423
        @since 3.6
        """
        session = self.prepare()
        session.execute("create type test (a text, b int);")
        session.execute("create function funk(udt test) called on null input returns int language java as 'return Integer.valueOf(udt.getInt(\"b\"));';")

        if LooseVersion(self.cluster.version()) >= LooseVersion('3.6'):
            frozen_vals = (False, True)
        else:
            frozen_vals = (True,)

        for frozen in frozen_vals:
            debug("Using {} UDTs".format("frozen" if frozen else "non-frozen"))

            table_name = "tab_frozen" if frozen else "tab"
            column_type = "frozen<test>" if frozen else "test"
            session.execute("create table {} (key int primary key, udt {});".format(table_name, column_type))

            session.execute("insert into %s (key, udt) values (1, {a: 'un', b:1});" % (table_name,))
            session.execute("insert into %s (key, udt) values (2, {a: 'deux', b:2});" % (table_name,))
            session.execute("insert into %s (key, udt) values (3, {a: 'trois', b:3});" % (table_name,))

            assert_one(session, "select sum(funk(udt)) from {}".format(table_name), [6])

            assert_invalid(session, "drop type test;")
开发者ID:c-kodman,项目名称:cassandra-dtest,代码行数:29,代码来源:user_functions_test.py


示例7: drop_table_test

    def drop_table_test(self):
        """Test that we cannot drop a table without deleting its MVs first"""

        session = self.prepare(user_table=True)

        result = session.execute(("SELECT * FROM system_schema.materialized_views "
                                  "WHERE keyspace_name='ks' AND table_name='users'"))
        self.assertEqual(
            len(result), 1,
            "Expecting {} materialized view, got {}".format(1, len(result))
        )

        assert_invalid(
            session,
            "DROP TABLE ks.users;",
            "Cannot drop table when materialized views still depend on it"
        )

        result = session.execute(("SELECT * FROM system_schema.materialized_views "
                                  "WHERE keyspace_name='ks' AND table_name='users'"))
        self.assertEqual(
            len(result), 1,
            "Expecting {} materialized view, got {}".format(1, len(result))
        )

        session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")
        session.execute("DROP TABLE ks.users;")

        result = session.execute(("SELECT * FROM system_schema.materialized_views "
                                  "WHERE keyspace_name='ks' AND table_name='users'"))
        self.assertEqual(
            len(result), 0,
            "Expecting {} materialized view, got {}".format(1, len(result))
        )
开发者ID:emolsson,项目名称:cassandra-dtest,代码行数:34,代码来源:materialized_views_test.py


示例8: drop_column_compact_test

    def drop_column_compact_test(self):
        session = self.prepare()

        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int) WITH COMPACT STORAGE")

        assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")
开发者ID:WorksApplications,项目名称:cassandra-dtest,代码行数:7,代码来源:schema_test.py


示例9: table_test

    def table_test(self):
        """
        CREATE TABLE, ALTER TABLE, TRUNCATE TABLE, DROP TABLE statements
        """
        session = self.prepare()

        session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v1 int)")
        session.execute("CREATE TABLE test2 (k int, c1 int, v1 int, PRIMARY KEY (k, c1)) WITH COMPACT STORAGE")

        session.execute("ALTER TABLE test1 ADD v2 int")

        for i in xrange(0, 10):
            session.execute("INSERT INTO test1 (k, v1, v2) VALUES (%d, %d, %d)" % (i, i, i))
            session.execute("INSERT INTO test2 (k, c1, v1) VALUES (%d, %d, %d)" % (i, i, i))

        res = sorted(session.execute("SELECT * FROM test1"))
        assert rows_to_list(res) == [[i, i, i] for i in xrange(0, 10)], res

        res = sorted(session.execute("SELECT * FROM test2"))
        assert rows_to_list(res) == [[i, i, i] for i in xrange(0, 10)], res

        session.execute("TRUNCATE test1")
        session.execute("TRUNCATE test2")

        res = session.execute("SELECT * FROM test1")
        assert rows_to_list(res) == [], res

        res = session.execute("SELECT * FROM test2")
        assert rows_to_list(res) == [], res

        session.execute("DROP TABLE test1")
        session.execute("DROP TABLE test2")

        assert_invalid(session, "SELECT * FROM test1", expected=InvalidRequest)
        assert_invalid(session, "SELECT * FROM test2", expected=InvalidRequest)
开发者ID:steveandwang,项目名称:cassandra-dtest,代码行数:35,代码来源:cql_tests.py


示例10: udf_with_udt_keyspace_isolation_test

    def udf_with_udt_keyspace_isolation_test(self):
        """
        Ensure functions dont allow a UDT from another keyspace
        @jira_ticket CASSANDRA-9409
        @since 2.2
        """
        session = self.prepare()

        session.execute("create type udt (a text, b int);")
        self.create_ks(session, 'user_ks', 1)

        # ensure we cannot use a udt from another keyspace as function argument
        assert_invalid(
            session,
            "CREATE FUNCTION overloaded(v ks.udt) called on null input RETURNS text LANGUAGE java AS 'return \"f1\";'",
            "Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
        )

        # ensure we cannot use a udt from another keyspace as return value
        assert_invalid(
            session,
            ("CREATE FUNCTION test(v text) called on null input RETURNS ks.udt "
             "LANGUAGE java AS 'return null;';"),
            "Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
        )
开发者ID:lvsanche,项目名称:cassandra-dtest,代码行数:25,代码来源:user_functions_test.py


示例11: test_multi_index_filtering_query

    def test_multi_index_filtering_query(self):
        """
        asserts that having multiple indexes that cover all predicates still requires ALLOW FILTERING to also be present
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node1, = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        session.execute("CREATE KEYSPACE ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': '1'};")
        session.execute("USE ks;")
        session.execute("CREATE TABLE tbl (id uuid primary key, c0 text, c1 text, c2 text);")
        session.execute("CREATE INDEX ix_tbl_c0 ON tbl(c0);")
        session.execute("CREATE INDEX ix_tbl_c1 ON tbl(c1);")
        session.execute("INSERT INTO tbl (id, c0, c1, c2) values (uuid(), 'a', 'b', 'c');")
        session.execute("INSERT INTO tbl (id, c0, c1, c2) values (uuid(), 'a', 'b', 'c');")
        session.execute("INSERT INTO tbl (id, c0, c1, c2) values (uuid(), 'q', 'b', 'c');")
        session.execute("INSERT INTO tbl (id, c0, c1, c2) values (uuid(), 'a', 'e', 'f');")
        session.execute("INSERT INTO tbl (id, c0, c1, c2) values (uuid(), 'a', 'e', 'f');")

        rows = list(session.execute("SELECT * FROM tbl WHERE c0 = 'a';"))
        self.assertEqual(4, len(rows))

        stmt = "SELECT * FROM tbl WHERE c0 = 'a' AND c1 = 'b';"
        assert_invalid(session, stmt, "Cannot execute this query as it might involve data filtering and thus may have "
                                      "unpredictable performance. If you want to execute this query despite the "
                                      "performance unpredictability, use ALLOW FILTERING")

        rows = list(session.execute("SELECT * FROM tbl WHERE c0 = 'a' AND c1 = 'b' ALLOW FILTERING;"))
        self.assertEqual(2, len(rows))
开发者ID:blambov,项目名称:cassandra-dtest,代码行数:29,代码来源:secondary_indexes_test.py


示例12: test_type_as_part_of_pkey

    def test_type_as_part_of_pkey(self):
        """Tests user types as part of a composite pkey"""
        # make sure we can define a table with a user type as part of the pkey
        # and do a basic insert/query of data in that table.
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        self.create_ks(session, 'user_type_pkeys', 2)
        session.default_consistency_level = ConsistencyLevel.LOCAL_QUORUM

        stmt = """
              CREATE TYPE t_person_name (
              first text,
              middle text,
              last text
            )
           """
        session.execute(stmt)

        stmt = """
              CREATE TABLE person_likes (
              id uuid,
              name frozen<t_person_name>,
              like text,
              PRIMARY KEY ((id, name))
              )
           """
        session.execute(stmt)
        # Make sure the schema propagate
        time.sleep(2)

        _id = uuid.uuid4()

        stmt = """
              INSERT INTO person_likes (id, name, like)
              VALUES ({id}, {{first:'Nero', middle:'Claudius Caesar Augustus', last:'Germanicus'}}, 'arson');
           """.format(id=_id)
        session.execute(stmt)

        # attempt to query without the user type portion of the pkey and confirm there is an error
        stmt = """
              SELECT id, name.first from person_likes where id={id};
           """.format(id=_id)

        if self.cluster.version() >= '2.2':
            assert_invalid(session, stmt, 'Partition key parts: name must be restricted as other parts are')
        else:
            assert_invalid(session, stmt, 'Partition key part name must be restricted since preceding part is')

        stmt = """
              SELECT id, name.first, like from person_likes where id={id} and name = {{first:'Nero', middle: 'Claudius Caesar Augustus', last: 'Germanicus'}};
           """.format(id=_id)
        rows = session.execute(stmt)

        row_uuid, first_name, like = rows[0]
        self.assertEqual(first_name, u'Nero')
        self.assertEqual(like, u'arson')
开发者ID:c-kodman,项目名称:cassandra-dtest,代码行数:58,代码来源:user_types_test.py


示例13: only_one_timestamp_is_valid_test

 def only_one_timestamp_is_valid_test(self):
     """ Test that TIMESTAMP must not be used in the statements within the batch. """
     session = self.prepare()
     assert_invalid(session, """
         BEGIN BATCH USING TIMESTAMP 1111111111111111
         INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow') USING TIMESTAMP 2
         INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
         APPLY BATCH
     """, matching="Timestamp must be set either on BATCH or individual statements")
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:9,代码来源:batch_test.py


示例14: cant_create_existing_user_test

    def cant_create_existing_user_test(self):
        self.prepare()

        session = self.get_session(user="cassandra", password="cassandra")
        session.execute("CREATE USER '[email protected]' WITH PASSWORD '12345' NOSUPERUSER")
        assert_invalid(
            session,
            "CREATE USER '[email protected]' WITH PASSWORD '12345' NOSUPERUSER",
            "[email protected] already exists",
        )
开发者ID:JeremiahDJordan,项目名称:cassandra-dtest,代码行数:10,代码来源:auth_test.py


示例15: unlogged_batch_rejects_counter_mutations_test

 def unlogged_batch_rejects_counter_mutations_test(self):
     """ Test that unlogged batch rejects counter mutations """
     cursor = self.prepare()
     assert_invalid(cursor, """
         BEGIN UNLOGGED BATCH
         INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
         INSERT INTO users (id, firstname, lastname) VALUES (2, 'Elizabeth', 'Swann')
         UPDATE clicks SET total = total + 1 WHERE userid = 1 AND url = 'http://foo.com'
         APPLY BATCH
     """, matching="Counter mutations are only allowed in COUNTER batches")
开发者ID:AlfredChenxf,项目名称:cassandra-dtest,代码行数:10,代码来源:batch_test.py


示例16: user_cant_drop_themselves_test

    def user_cant_drop_themselves_test(self):
        self.prepare()

        session = self.get_session(user="cassandra", password="cassandra")
        # handle different error messages between versions pre and post 2.2.0
        assert_invalid(
            session,
            "DROP USER cassandra",
            "(Users aren't allowed to DROP themselves|Cannot DROP primary role for current login)",
        )
开发者ID:JeremiahDJordan,项目名称:cassandra-dtest,代码行数:10,代码来源:auth_test.py


示例17: test_multiple_indexes_on_single_map_column

    def test_multiple_indexes_on_single_map_column(self):
        """
        verifying functionality of multiple unique secondary indexes on a single column
        @jira_ticket CASSANDRA-7771
        @since 3.0
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        self.create_ks(session, "map_double_index", 1)
        session.execute(
            """
                CREATE TABLE map_tbl (
                    id uuid primary key,
                    amap map<text, int>
                )
            """
        )
        session.execute("CREATE INDEX map_keys ON map_tbl(keys(amap))")
        session.execute("CREATE INDEX map_values ON map_tbl(amap)")
        session.execute("CREATE INDEX map_entries ON map_tbl(entries(amap))")

        # multiple indexes on a single column are allowed but identical duplicate indexes are not
        assert_invalid(
            session,
            "CREATE INDEX map_values_2 ON map_tbl(amap)",
            "Index map_values_2 is a duplicate of existing index map_values",
        )

        session.execute("INSERT INTO map_tbl (id, amap) values (uuid(), {'foo': 1, 'bar': 2});")
        session.execute("INSERT INTO map_tbl (id, amap) values (uuid(), {'faz': 1, 'baz': 2});")

        value_search = session.execute("SELECT * FROM map_tbl WHERE amap CONTAINS 1")
        self.assertEqual(2, len(value_search), "incorrect number of rows when querying on map values")

        key_search = session.execute("SELECT * FROM map_tbl WHERE amap CONTAINS KEY 'foo'")
        self.assertEqual(1, len(key_search), "incorrect number of rows when querying on map keys")

        entries_search = session.execute("SELECT * FROM map_tbl WHERE amap['foo'] = 1")
        self.assertEqual(1, len(entries_search), "incorrect number of rows when querying on map entries")

        session.cluster.refresh_schema_metadata()
        table_meta = session.cluster.metadata.keyspaces["map_double_index"].tables["map_tbl"]
        self.assertEqual(3, len(table_meta.indexes))
        self.assertItemsEqual(["map_keys", "map_values", "map_entries"], table_meta.indexes)
        self.assertEqual(3, len(session.cluster.metadata.keyspaces["map_double_index"].indexes))

        self.assertTrue("map_keys" in table_meta.export_as_string())
        self.assertTrue("map_values" in table_meta.export_as_string())
        self.assertTrue("map_entries" in table_meta.export_as_string())

        session.execute("DROP TABLE map_tbl")
        session.cluster.refresh_schema_metadata()
        self.assertEqual(0, len(session.cluster.metadata.keyspaces["map_double_index"].indexes))
开发者ID:emolsson,项目名称:cassandra-dtest,代码行数:55,代码来源:secondary_indexes_test.py


示例18: counter_batch_rejects_regular_mutations_test

 def counter_batch_rejects_regular_mutations_test(self):
     """ Test that counter batch rejects non-counter mutations """
     cursor = self.prepare()
     assert_invalid(cursor, """
         BEGIN COUNTER BATCH
         UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://foo.com'
         UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://bar.com'
         UPDATE clicks SET total = total + 1 WHERE userid = 2 and url = 'http://baz.com'
         INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
         APPLY BATCH
     """, matching="Only counter mutations are allowed in COUNTER batches")
开发者ID:AlfredChenxf,项目名称:cassandra-dtest,代码行数:11,代码来源:batch_test.py


示例19: unlogged_batch_rejects_counter_mutations_test

    def unlogged_batch_rejects_counter_mutations_test(self):
        """ Test that unlogged batch rejects counter mutations """
        session = self.prepare()
        err = "Counter and non-counter mutations cannot exist in the same batch"

        assert_invalid(session, """
            BEGIN UNLOGGED BATCH
            INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
            INSERT INTO users (id, firstname, lastname) VALUES (2, 'Elizabeth', 'Swann')
            UPDATE clicks SET total = total + 1 WHERE userid = 1 AND url = 'http://foo.com'
            APPLY BATCH
            """, matching=err)
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:12,代码来源:batch_test.py


示例20: logged_batch_rejects_counter_mutations_test

    def logged_batch_rejects_counter_mutations_test(self):
        """ Test that logged batch rejects counter mutations """
        session = self.prepare()
        err = "Cannot include a counter statement in a logged batch"

        assert_invalid(session, """
            BEGIN BATCH
            INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
            INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
            UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://foo.com'
            APPLY BATCH
            """, matching=err)
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:12,代码来源:batch_test.py



注:本文中的assertions.assert_invalid函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python assertions.assert_none函数代码示例发布时间:2022-05-24
下一篇:
Python assertions.assert_almost_equal函数代码示例发布时间:2022-05-24
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap