本文整理汇总了Python中assertions.assert_all函数的典型用法代码示例。如果您正苦于以下问题:Python assert_all函数的具体用法?Python assert_all怎么用?Python assert_all使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了assert_all函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: collection_map_ttl_test
def collection_map_ttl_test(self):
"""
Test that ttl has a granularity of elements using a map collection.
"""
self.prepare(default_time_to_live=6)
self.session1.execute("ALTER TABLE ttl_table ADD mymap map<int, int>;""")
start = time.time()
self.session1.execute("""
INSERT INTO ttl_table (key, col1, mymap) VALUES (%d, %d, %s);
""" % (1, 1, '{1:1,2:2,3:3,4:4,5:5}'))
self.session1.execute("""
UPDATE ttl_table USING TTL 2 SET mymap[1] = 42, mymap[5] = 42 WHERE key=1;
""")
assert_all(
self.session1,
"SELECT * FROM ttl_table;",
[[1, 1, None, None, OrderedDict([(1, 42), (2, 2), (3, 3), (4, 4), (5, 42)])]]
)
self.smart_sleep(start, 4)
assert_all(
self.session1,
"SELECT * FROM ttl_table;",
[[1, 1, None, None, OrderedDict([(2, 2), (3, 3), (4, 4)])]]
)
self.smart_sleep(start, 8)
assert_row_count(self.session1, 'ttl_table', 0)
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:28,代码来源:ttl_test.py
示例2: check_permissions
def check_permissions(self, node, upgraded):
# use an exclusive connection to ensure we only talk to the specified node
klaus = self.patient_exclusive_cql_connection(node, user='klaus', password='12345', timeout=20)
# klaus is a superuser, so should be able to list all permissions
# the output of LIST PERMISSIONS changes slightly with #7653 adding
# a new role column to results, so we need to tailor our check
# based on whether the node has been upgraded or not
if not upgraded:
assert_all(klaus,
'LIST ALL PERMISSIONS',
[['michael', '<table ks.cf1>', 'MODIFY'],
['michael', '<table ks.cf2>', 'SELECT']])
else:
assert_all(klaus,
'LIST ALL PERMISSIONS',
[['michael', 'michael', '<table ks.cf1>', 'MODIFY'],
['michael', 'michael', '<table ks.cf2>', 'SELECT']])
klaus.shutdown()
michael = self.patient_exclusive_cql_connection(node, user='michael', password='54321')
michael.execute('INSERT INTO ks.cf1 (id, val) VALUES (0,0)')
michael.execute('SELECT * FROM ks.cf2')
assert_invalid(michael,
'SELECT * FROM ks.cf1',
'User michael has no SELECT permission on <table ks.cf1> or any of its parents',
Unauthorized)
michael.shutdown()
开发者ID:agarwalmohit,项目名称:cassandra-dtest,代码行数:28,代码来源:upgrade_internal_auth_test.py
示例3: ttl_is_replicated_test
def ttl_is_replicated_test(self):
"""
Test that the ttl setting is replicated properly on all nodes
"""
self.prepare(default_time_to_live=5)
session1 = self.patient_exclusive_cql_connection(self.node1)
session2 = self.patient_exclusive_cql_connection(self.node2)
session1.execute("USE ks;")
session2.execute("USE ks;")
query = SimpleStatement(
"INSERT INTO ttl_table (key, col1) VALUES (1, 1);",
consistency_level=ConsistencyLevel.ALL
)
session1.execute(query)
assert_all(
session1,
"SELECT * FROM ttl_table;",
[[1, 1, None, None]],
cl=ConsistencyLevel.ALL
)
ttl_session1 = session1.execute('SELECT ttl(col1) FROM ttl_table;')
ttl_session2 = session2.execute('SELECT ttl(col1) FROM ttl_table;')
# since the two queries are not executed simultaneously, the remaining
# TTLs can differ by one second
self.assertLessEqual(abs(ttl_session1[0][0] - ttl_session2[0][0]), 1)
time.sleep(7)
assert_none(session1, "SELECT * FROM ttl_table;", cl=ConsistencyLevel.ALL)
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:31,代码来源:ttl_test.py
示例4: ttl_is_replicated_test
def ttl_is_replicated_test(self):
"""
Test that the ttl setting is replicated properly on all nodes
"""
self.prepare(default_time_to_live=5)
cursor1 = self.patient_exclusive_cql_connection(self.node1)
cursor2 = self.patient_exclusive_cql_connection(self.node2)
cursor1.execute("USE ks;")
cursor2.execute("USE ks;")
query = SimpleStatement(
"INSERT INTO ttl_table (key, col1) VALUES (1, 1);",
consistency_level=ConsistencyLevel.ALL
)
cursor1.execute(query)
assert_all(
cursor1,
"SELECT * FROM ttl_table;",
[[1, 1, None, None]],
cl=ConsistencyLevel.ALL
)
ttl_cursor1 = cursor1.execute('SELECT ttl(col1) FROM ttl_table;')
ttl_cursor2 = cursor2.execute('SELECT ttl(col1) FROM ttl_table;')
assert_almost_equal(ttl_cursor1[0][0], ttl_cursor2[0][0], error=0.05)
time.sleep(7)
assert_none(cursor1, "SELECT * FROM ttl_table;", cl=ConsistencyLevel.ALL)
开发者ID:slivne,项目名称:cassandra-dtest,代码行数:28,代码来源:ttl_test.py
示例5: upgrade_with_range_tombstones_test
def upgrade_with_range_tombstones_test(self):
"""
Checks sstable including range tombstone can be read after upgrade.
@jira_ticket CASSANDRA-10360
"""
ROWS = 100
session = self._setup_cluster()
session.execute('CREATE TABLE t (k int, t1 int, t2 int, PRIMARY KEY (k, t1, t2))')
for n in range(ROWS):
session.execute("INSERT INTO t(k, t1, t2) VALUES (0, 0, %d)" % n)
session.execute("DELETE FROM t WHERE k=0 AND t1=0")
for n in range(0, ROWS, 2):
session.execute("INSERT INTO t(k, t1, t2) VALUES (0, 0, %d)" % n)
session = self._do_upgrade()
assert_all(session, "SELECT * FROM t WHERE k = 0", [[0, 0, n] for n in range(0, ROWS, 2)])
self.cluster.compact()
开发者ID:c-kodman,项目名称:cassandra-dtest,代码行数:25,代码来源:storage_engine_upgrade_test.py
示例6: drop_column_queries_test
def drop_column_queries_test(self):
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
session.execute("CREATE INDEX ON cf(c2)")
# insert some data.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (0, 1, 2)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (1, 2, 3)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (2, 3, 4)")
# drop and readd c1.
session.execute("ALTER TABLE cf DROP c1")
session.execute("ALTER TABLE cf ADD c1 int")
# add another row.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")
# test that old (pre-drop) c1 values aren't returned and new ones are.
assert_all(session, "SELECT c1 FROM cf", [[None], [None], [None], [4]], ignore_order=True)
assert_all(session, "SELECT * FROM cf", [[0, None, 2], [1, None, 3], [2, None, 4], [3, 4, 5]], ignore_order=True)
assert_one(session, "SELECT c1 FROM cf WHERE key = 0", [None])
assert_one(session, "SELECT c1 FROM cf WHERE key = 3", [4])
assert_one(session, "SELECT * FROM cf WHERE c2 = 2", [0, None, 2])
assert_one(session, "SELECT * FROM cf WHERE c2 = 5", [3, 4, 5])
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:31,代码来源:schema_test.py
示例7: collection_set_ttl_test
def collection_set_ttl_test(self):
"""
Test that ttl has a granularity of elements using a set collection.
"""
self.prepare(default_time_to_live=10)
self.session1.execute("ALTER TABLE ttl_table ADD myset set<int>;""")
start = time.time()
self.session1.execute("""
INSERT INTO ttl_table (key, col1, myset) VALUES (%d, %d, %s);
""" % (1, 1, '{1,2,3,4,5}'))
self.session1.execute("""
UPDATE ttl_table USING TTL 3 SET myset = myset + {42} WHERE key=1;
""")
assert_all(
self.session1,
"SELECT * FROM ttl_table;",
[[1, 1, None, None, sortedset([1, 2, 3, 4, 5, 42])]]
)
self.smart_sleep(start, 5)
assert_all(
self.session1,
"SELECT * FROM ttl_table;",
[[1, 1, None, None, sortedset([1, 2, 3, 4, 5])]]
)
self.smart_sleep(start, 12)
assert_row_count(self.session1, 'ttl_table', 0)
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:28,代码来源:ttl_test.py
示例8: replace_with_reset_resume_state_test
def replace_with_reset_resume_state_test(self):
"""Test replace with resetting bootstrap progress"""
cluster = self.cluster
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
node1.stress(['write', 'n=100K', 'no-warmup', '-schema', 'replication(factor=3)'])
session = self.patient_cql_connection(node1)
stress_table = 'keyspace1.standard1'
query = SimpleStatement('select * from %s LIMIT 1' % stress_table, consistency_level=ConsistencyLevel.THREE)
initial_data = rows_to_list(session.execute(query))
node3.stop(gently=False)
# kill node1 in the middle of streaming to let it fail
t = InterruptBootstrap(node1)
t.start()
# replace node 3 with node 4
debug("Starting node 4 to replace node 3")
node4 = Node('node4', cluster=cluster, auto_bootstrap=True, thrift_interface=('127.0.0.4', 9160),
storage_interface=('127.0.0.4', 7000), jmx_port='7400', remote_debug_port='0',
initial_token=None, binary_interface=('127.0.0.4', 9042))
# keep timeout low so that test won't hang
node4.set_configuration_options(values={'streaming_socket_timeout_in_ms': 1000})
cluster.add(node4, False)
try:
node4.start(jvm_args=["-Dcassandra.replace_address_first_boot=127.0.0.3"], wait_other_notice=False)
except NodeError:
pass # node doesn't start as expected
t.join()
node1.start()
# restart node4 bootstrap with resetting bootstrap state
node4.stop()
mark = node4.mark_log()
node4.start(jvm_args=[
"-Dcassandra.replace_address_first_boot=127.0.0.3",
"-Dcassandra.reset_bootstrap_progress=true"
])
# check if we reset bootstrap state
node4.watch_log_for("Resetting bootstrap progress to start fresh", from_mark=mark)
# wait for node3 ready to query
node4.watch_log_for("Listening for thrift clients...", from_mark=mark)
# check if 2nd bootstrap succeeded
assert_bootstrap_state(self, node4, 'COMPLETED')
# query should work again
debug("Stopping old nodes")
node1.stop(gently=False, wait_other_notice=True)
node2.stop(gently=False, wait_other_notice=True)
debug("Verifying data on new node.")
session = self.patient_exclusive_cql_connection(node4)
assert_all(session, 'SELECT * from {} LIMIT 1'.format(stress_table),
expected=initial_data,
cl=ConsistencyLevel.ONE)
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:60,代码来源:replace_address_test.py
示例9: resumable_replace_test
def resumable_replace_test(self):
"""
Test resumable bootstrap while replacing node. Feature introduced in
2.2 with ticket https://issues.apache.org/jira/browse/CASSANDRA-8838
@jira_ticket https://issues.apache.org/jira/browse/CASSANDRA-8838
"""
cluster = self.cluster
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
node1.stress(['write', 'n=100K', 'no-warmup', '-schema', 'replication(factor=3)'])
session = self.patient_cql_connection(node1)
stress_table = 'keyspace1.standard1'
query = SimpleStatement('select * from %s LIMIT 1' % stress_table, consistency_level=ConsistencyLevel.THREE)
initial_data = rows_to_list(session.execute(query))
node3.stop(gently=False)
# kill node1 in the middle of streaming to let it fail
t = InterruptBootstrap(node1)
t.start()
# replace node 3 with node 4
debug("Starting node 4 to replace node 3")
node4 = Node('node4', cluster=cluster, auto_bootstrap=True, thrift_interface=('127.0.0.4', 9160),
storage_interface=('127.0.0.4', 7000), jmx_port='7400', remote_debug_port='0',
initial_token=None, binary_interface=('127.0.0.4', 9042))
# keep timeout low so that test won't hang
node4.set_configuration_options(values={'streaming_socket_timeout_in_ms': 1000})
cluster.add(node4, False)
try:
node4.start(jvm_args=["-Dcassandra.replace_address_first_boot=127.0.0.3"], wait_other_notice=False)
except NodeError:
pass # node doesn't start as expected
t.join()
# bring back node1 and invoke nodetool bootstrap to resume bootstrapping
node1.start()
node4.nodetool('bootstrap resume')
# check if we skipped already retrieved ranges
node4.watch_log_for("already available. Skipping streaming.")
# wait for node3 ready to query
node4.watch_log_for("Listening for thrift clients...")
# check if 2nd bootstrap succeeded
assert_bootstrap_state(self, node4, 'COMPLETED')
# query should work again
debug("Stopping old nodes")
node1.stop(gently=False, wait_other_notice=True)
node2.stop(gently=False, wait_other_notice=True)
debug("Verifying data on new node.")
session = self.patient_exclusive_cql_connection(node4)
assert_all(session, 'SELECT * from {} LIMIT 1'.format(stress_table),
expected=initial_data,
cl=ConsistencyLevel.ONE)
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:59,代码来源:replace_address_test.py
示例10: unlogged_batch_accepts_regular_mutations_test
def unlogged_batch_accepts_regular_mutations_test(self):
""" Test that unlogged batch accepts regular mutations """
session = self.prepare()
session.execute("""
BEGIN UNLOGGED BATCH
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
INSERT INTO users (id, firstname, lastname) VALUES (2, 'Elizabeth', 'Swann')
APPLY BATCH
""")
assert_all(session, "SELECT * FROM users", [[0, u'Jack', u'Sparrow'], [2, u'Elizabeth', u'Swann']])
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:10,代码来源:batch_test.py
示例11: logged_batch_accepts_regular_mutations_test
def logged_batch_accepts_regular_mutations_test(self):
""" Test that logged batch accepts regular mutations """
session = self.prepare()
session.execute("""
BEGIN BATCH
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
APPLY BATCH
""")
assert_all(session, "SELECT * FROM users", [[1, u'Will', u'Turner'], [0, u'Jack', u'Sparrow']])
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:10,代码来源:batch_test.py
示例12: batch_uses_proper_timestamp_test
def batch_uses_proper_timestamp_test(self):
""" Test that each statement will be executed with provided BATCH timestamp """
session = self.prepare()
session.execute("""
BEGIN BATCH USING TIMESTAMP 1111111111111111
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
APPLY BATCH
""")
query = "SELECT id, writetime(firstname), writetime(lastname) FROM users"
assert_all(session, query, [[1, 1111111111111111, 1111111111111111], [0, 1111111111111111, 1111111111111111]])
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:11,代码来源:batch_test.py
示例13: counter_batch_accepts_counter_mutations_test
def counter_batch_accepts_counter_mutations_test(self):
""" Test that counter batch accepts counter mutations """
session = self.prepare()
session.execute("""
BEGIN COUNTER BATCH
UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://foo.com'
UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://bar.com'
UPDATE clicks SET total = total + 1 WHERE userid = 2 and url = 'http://baz.com'
APPLY BATCH
""")
assert_all(session, "SELECT total FROM clicks", [[1], [1], [1]])
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:11,代码来源:batch_test.py
示例14: logged_batch_doesnt_throw_uae_test
def logged_batch_doesnt_throw_uae_test(self):
""" Test that logged batch DOES NOT throw UAE if there are at least 2 live nodes """
session = self.prepare(nodes=3)
self.cluster.nodelist()[-1].stop(wait_other_notice=True)
query = SimpleStatement("""
BEGIN BATCH
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
APPLY BATCH
""", consistency_level=ConsistencyLevel.ANY)
session.execute(query)
assert_all(session, "SELECT * FROM users", [[1, u'Will', u'Turner'], [0, u'Jack', u'Sparrow']])
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:12,代码来源:batch_test.py
示例15: update_single_column_ttl_test
def update_single_column_ttl_test(self):
""" Test that specifying a TTL on a single column works """
self.prepare()
self.session1.execute("""
INSERT INTO ttl_table (key, col1, col2, col3) VALUES (%d, %d, %d, %d);
""" % (1, 1, 1, 1))
start = time.time()
self.session1.execute("UPDATE ttl_table USING TTL 3 set col1=42 where key=%s;" % (1,))
assert_all(self.session1, "SELECT * FROM ttl_table;", [[1, 42, 1, 1]])
self.smart_sleep(start, 5)
assert_all(self.session1, "SELECT * FROM ttl_table;", [[1, None, 1, 1]])
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:13,代码来源:ttl_test.py
示例16: remove_column_ttl_test
def remove_column_ttl_test(self):
"""
Test that removing a column ttl works.
"""
self.prepare()
start = time.time()
self.session1.execute("""
INSERT INTO ttl_table (key, col1, col2, col3) VALUES (%d, %d, %d, %d) USING TTL 2;
""" % (1, 1, 1, 1))
self.session1.execute("UPDATE ttl_table set col1=42 where key=%s;" % (1,))
self.smart_sleep(start, 4)
assert_all(self.session1, "SELECT * FROM ttl_table;", [[1, 42, None, None]])
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:14,代码来源:ttl_test.py
示例17: table_test
def table_test(self):
"""
Smoke test that basic table operations work:
- create 2 tables, one with and one without COMPACT STORAGE
- ALTER the table without COMPACT STORAGE, adding a column
For each of those tables:
- insert 10 values
- SELECT * and assert the values are there
- TRUNCATE the table
- SELECT * and assert there are no values
- DROP the table
- SELECT * and assert the statement raises an InvalidRequest
# TODO run SELECTs to make sure each statement works
"""
session = self.prepare()
ks_meta = UpdatingKeyspaceMetadataWrapper(session.cluster, ks_name='ks')
session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v1 int)")
self.assertIn('test1', ks_meta.tables)
session.execute("CREATE TABLE test2 (k int, c1 int, v1 int, PRIMARY KEY (k, c1)) WITH COMPACT STORAGE")
self.assertIn('test2', ks_meta.tables)
t1_meta = UpdatingTableMetadataWrapper(session.cluster, ks_name='ks', table_name='test1')
session.execute("ALTER TABLE test1 ADD v2 int")
self.assertIn('v2', t1_meta.columns)
for i in range(0, 10):
session.execute("INSERT INTO test1 (k, v1, v2) VALUES ({i}, {i}, {i})".format(i=i))
session.execute("INSERT INTO test2 (k, c1, v1) VALUES ({i}, {i}, {i})".format(i=i))
assert_all(session, "SELECT * FROM test1", [[i, i, i] for i in range(0, 10)], ignore_order=True)
assert_all(session, "SELECT * FROM test2", [[i, i, i] for i in range(0, 10)], ignore_order=True)
session.execute("TRUNCATE test1")
session.execute("TRUNCATE test2")
assert_none(session, "SELECT * FROM test1")
assert_none(session, "SELECT * FROM test2")
session.execute("DROP TABLE test1")
self.assertNotIn('test1', ks_meta.tables)
session.execute("DROP TABLE test2")
self.assertNotIn('test2', ks_meta.tables)
开发者ID:alipourm,项目名称:cassandra-dtest,代码行数:50,代码来源:cql_tests.py
示例18: upgrade_with_statics_test
def upgrade_with_statics_test(self):
"""
Validates we can read legacy sstables with static columns.
"""
PARTITIONS = 1
ROWS = 10
session = self._setup_cluster()
session.execute('CREATE TABLE t (k int, s1 int static, s2 int static, t int, v1 int, v2 int, PRIMARY KEY (k, t))')
for n in range(PARTITIONS):
for r in range(ROWS):
session.execute("INSERT INTO t(k, s1, s2, t, v1, v2) VALUES (%d, %d, %d, %d, %d, %d)" % (n, r, r + 1, r, r, r + 1))
session = self._do_upgrade()
for n in range(PARTITIONS):
assert_all(session,
"SELECT * FROM t WHERE k = %d" % (n),
[[n, v, ROWS - 1, ROWS, v, v + 1] for v in range(ROWS)])
assert_all(session,
"SELECT * FROM t WHERE k = %d ORDER BY t DESC" % (n),
[[n, v, ROWS - 1, ROWS, v, v + 1] for v in range(ROWS - 1, -1, -1)])
self.cluster.compact()
for n in range(PARTITIONS):
assert_all(session,
"SELECT * FROM t WHERE k = %d" % (n),
[[n, v, ROWS - 1, ROWS, v, v + 1] for v in range(ROWS)])
assert_all(session,
"SELECT * FROM t WHERE k = %d ORDER BY t DESC" % (n),
[[n, v, ROWS - 1, ROWS, v, v + 1] for v in range(ROWS - 1, -1, -1)])
开发者ID:c-kodman,项目名称:cassandra-dtest,代码行数:34,代码来源:storage_engine_upgrade_test.py
示例19: update_multiple_columns_ttl_test
def update_multiple_columns_ttl_test(self):
""" Test that specifying a TTL on multiple columns works """
self.prepare()
self.cursor1.execute("""
INSERT INTO ttl_table (key, col1, col2, col3) VALUES (%d, %d, %d, %d);
""" % (1, 1, 1, 1))
start = time.time()
self.cursor1.execute("""
UPDATE ttl_table USING TTL 2 set col1=42, col2=42, col3=42 where key=%s;
""" % (1,))
assert_all(self.cursor1, "SELECT * FROM ttl_table;", [[1, 42, 42, 42]])
self.smart_sleep(start, 4)
assert_all(self.cursor1, "SELECT * FROM ttl_table;", [[1, None, None, None]])
开发者ID:exg77,项目名称:cassandra-dtest,代码行数:15,代码来源:ttl_test.py
示例20: ttl_is_respected_on_repair_test
def ttl_is_respected_on_repair_test(self):
""" Test that ttl is respected on repair """
self.prepare()
self.session1.execute("""
ALTER KEYSPACE ks WITH REPLICATION =
{'class' : 'SimpleStrategy', 'replication_factor' : 1};
""")
self.session1.execute("""
INSERT INTO ttl_table (key, col1) VALUES (1, 1) USING TTL 5;
""")
self.session1.execute("""
INSERT INTO ttl_table (key, col1) VALUES (2, 2) USING TTL 1000;
""")
assert_all(
self.session1,
"SELECT * FROM ttl_table;",
[[1, 1, None, None], [2, 2, None, None]]
)
time.sleep(7)
self.node1.stop()
session2 = self.patient_exclusive_cql_connection(self.node2)
session2.execute("USE ks;")
assert_unavailable(session2.execute, "SELECT * FROM ttl_table;")
self.node1.start(wait_for_binary_proto=True)
self.session1 = self.patient_exclusive_cql_connection(self.node1)
self.session1.execute("USE ks;")
self.session1.execute("""
ALTER KEYSPACE ks WITH REPLICATION =
{'class' : 'SimpleStrategy', 'replication_factor' : 2};
""")
self.node1.repair(['ks'])
ttl_start = time.time()
ttl_session1 = self.session1.execute('SELECT ttl(col1) FROM ttl_table;')
self.node1.stop()
assert_row_count(session2, 'ttl_table', 1)
assert_all(
session2,
"SELECT * FROM ttl_table;",
[[2, 2, None, None]]
)
# Check that the TTL on both server are the same
ttl_session2 = session2.execute('SELECT ttl(col1) FROM ttl_table;')
ttl_session1 = ttl_session1[0][0] - (time.time() - ttl_start)
assert_almost_equal(ttl_session1, ttl_session2[0][0], error=0.005)
开发者ID:thobbs,项目名称:cassandra-dtest,代码行数:48,代码来源:ttl_test.py
注:本文中的assertions.assert_all函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论