This article collects typical usage examples of the Java class com.google.pubsub.v1.AcknowledgeRequest. If you are wondering how AcknowledgeRequest is used in practice, the selected examples below may help.
The AcknowledgeRequest class belongs to the com.google.pubsub.v1 package. 15 code examples of the class are shown below, sorted by popularity by default.
Example 1: testPollInRegularCase
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Tests that when ackMessages() succeeds and the subsequent call to poll() has no messages, the
 * subscriber does not invoke ackMessages because there should be no acks.
 */
@Test
public void testPollInRegularCase() throws Exception {
  task.start(props);
  ReceivedMessage rm1 = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm1).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  assertEquals(1, result.size());
  stubbedPullResponse = PullResponse.newBuilder().build();
  ListenableFuture<Empty> goodFuture = Futures.immediateFuture(Empty.getDefaultInstance());
  when(subscriber.ackMessages(any(AcknowledgeRequest.class))).thenReturn(goodFuture);
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  result = task.poll();
  assertEquals(0, result.size());
  result = task.poll();
  assertEquals(0, result.size());
  verify(subscriber, times(1)).ackMessages(any(AcknowledgeRequest.class));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 23, Source: CloudPubSubSourceTaskTest.java
Example 2: testPollWithNoMessageKeyAttribute
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Tests when the message(s) retrieved from Cloud Pub/Sub do not have an attribute that matches
 * {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
 */
@Test
public void testPollWithNoMessageKeyAttribute() throws Exception {
  task.start(props);
  ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, result.size());
  SourceRecord expected =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          0,
          Schema.OPTIONAL_STRING_SCHEMA,
          null,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  assertRecordsEqual(expected, result.get(0));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 26, Source: CloudPubSubSourceTaskTest.java
Example 3: testPollWithMessageKeyAttribute
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Tests when the message(s) retrieved from Cloud Pub/Sub do have an attribute that matches {@link
 * #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
 */
@Test
public void testPollWithMessageKeyAttribute() throws Exception {
  task.start(props);
  Map<String, String> attributes = new HashMap<>();
  attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
  ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes);
  PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, result.size());
  SourceRecord expected =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          0,
          Schema.OPTIONAL_STRING_SCHEMA,
          KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  assertRecordsEqual(expected, result.get(0));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 28, Source: CloudPubSubSourceTaskTest.java
Example 4: testPollWithMessageTimestampAttribute
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Tests when the message(s) retrieved from Cloud Pub/Sub do have an attribute that matches {@link
 * #KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE} and {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
 */
@Test
public void testPollWithMessageTimestampAttribute() throws Exception {
  task.start(props);
  Map<String, String> attributes = new HashMap<>();
  attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
  attributes.put(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE, KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE);
  ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes);
  PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, result.size());
  SourceRecord expected =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          0,
          Schema.OPTIONAL_STRING_SCHEMA,
          KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE,
          Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE));
  assertRecordsEqual(expected, result.get(0));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 29, Source: CloudPubSubSourceTaskTest.java
Example 5: testPollWithPartitionSchemeHashValue
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/** Tests that the correct partition is assigned when the partition scheme is "hash_value". */
@Test
public void testPollWithPartitionSchemeHashValue() throws Exception {
  props.put(
      CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG,
      CloudPubSubSourceConnector.PartitionScheme.HASH_VALUE.toString());
  task.start(props);
  ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, result.size());
  SourceRecord expected =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          KAFKA_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS),
          Schema.OPTIONAL_STRING_SCHEMA,
          null,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  assertRecordsEqual(expected, result.get(0));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 26, Source: CloudPubSubSourceTaskTest.java
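The expected partition in the test above is computed as KAFKA_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS). In Java the % operator keeps the sign of the dividend, so an expression of that shape is negative whenever the hash code is negative. The following is a minimal sketch, not the connector's code, of a hash-based partition choice that always stays in range by using Math.floorMod. The class and method names (HashPartitionSketch, partitionFor) are hypothetical.

```java
import java.util.Objects;

/** Hypothetical helper: maps a key to a partition index in [0, partitions). */
final class HashPartitionSketch {

  private HashPartitionSketch() {}

  /**
   * Returns a non-negative partition index for the given key.
   * Math.floorMod is used instead of % because % keeps the sign of the
   * hash code and could therefore produce a negative index.
   */
  static int partitionFor(Object key, int partitions) {
    if (partitions <= 0) {
      throw new IllegalArgumentException("partitions must be positive");
    }
    // Objects.hashCode returns 0 for a null key.
    return Math.floorMod(Objects.hashCode(key), partitions);
  }

  public static void main(String[] args) {
    System.out.println(partitionFor("abc", 3));
    System.out.println(partitionFor(-7, 3));
  }
}
```

Whether a negative hash code can actually occur depends on the value type being hashed; the sketch only shows how to make the index safe regardless.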
Example 6: pull
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Pulls messages synchronously, on demand, using the pull request in argument.
 *
 * <p>This method acknowledges all received messages.
 * @param pullRequest pull request containing the subscription name
 * @param retrySettings retry settings passed to the subscriber stub factory
 * @return the list of {@link PubsubMessage} containing the headers and payload
 */
private List<PubsubMessage> pull(PullRequest pullRequest, RetrySettings retrySettings) {
  Assert.notNull(pullRequest, "The pull request cannot be null.");
  try {
    SubscriberStub subscriber = this.subscriberFactory.createSubscriberStub(retrySettings);
    Assert.notNull(subscriber, "A SubscriberStub is needed to execute the pull request.");
    PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
    // Ack received messages.
    if (pullResponse.getReceivedMessagesCount() > 0) {
      List<String> ackIds = pullResponse.getReceivedMessagesList().stream()
          .map(ReceivedMessage::getAckId)
          .collect(Collectors.toList());
      AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
          .setSubscriptionWithSubscriptionName(
              pullRequest.getSubscriptionAsSubscriptionName())
          .addAllAckIds(ackIds)
          .build();
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
    }
    return pullResponse.getReceivedMessagesList().stream()
        .map(ReceivedMessage::getMessage)
        .collect(Collectors.toList());
  }
  catch (Exception ioe) {
    throw new PubSubException("Error pulling messages from subscription "
        + pullRequest.getSubscription() + ".", ioe);
  }
}
Developer: spring-cloud, Project: spring-cloud-gcp, Lines of code: 42, Source: PubSubTemplate.java
Example 7: ackMessages
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Attempt to ack all ids in {@link #ackIds}.
 */
private void ackMessages() {
  if (ackIds.size() != 0) {
    AcknowledgeRequest.Builder requestBuilder = AcknowledgeRequest.newBuilder()
        .setSubscription(cpsSubscription);
    final Set<String> ackIdsBatch = new HashSet<>();
    synchronized (ackIds) {
      requestBuilder.addAllAckIds(ackIds);
      ackIdsInFlight.addAll(ackIds);
      ackIdsBatch.addAll(ackIds);
      ackIds.clear();
    }
    ListenableFuture<Empty> response = subscriber.ackMessages(requestBuilder.build());
    Futures.addCallback(
        response,
        new FutureCallback<Empty>() {
          @Override
          public void onSuccess(Empty result) {
            ackIdsInFlight.removeAll(ackIdsBatch);
            log.trace("Successfully acked a set of messages.");
          }
          @Override
          public void onFailure(Throwable t) {
            ackIds.addAll(ackIdsBatch);
            ackIdsInFlight.removeAll(ackIdsBatch);
            log.error("An exception occurred acking messages: " + t);
          }
        });
  }
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 34, Source: CloudPubSubSourceTask.java
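The ackMessages method above follows a snapshot-and-clear pattern: pending ack ids are copied into a batch and marked in flight, the pending set is cleared, and on failure the batch is put back so it can be retried. The bookkeeping can be sketched on its own with only the standard library. This is a simplified illustration under stated assumptions, not the connector's implementation: the class AckBatchSketch and all of its method names are hypothetical, and no Pub/Sub request is sent.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the ack-id bookkeeping; no Pub/Sub calls are made. */
final class AckBatchSketch {

  private final Set<String> pending = new HashSet<>();
  private final Set<String> inFlight = new HashSet<>();

  /** Records an ack id that should be sent with the next batch. */
  synchronized void add(String ackId) {
    pending.add(ackId);
  }

  /** Moves all pending ids to the in-flight set and returns them as one batch. */
  synchronized Set<String> drain() {
    Set<String> batch = new HashSet<>(pending);
    inFlight.addAll(batch);
    pending.clear();
    return batch;
  }

  /** To be called when the request for this batch succeeded. */
  synchronized void onSuccess(Set<String> batch) {
    inFlight.removeAll(batch);
  }

  /** To be called when the request failed: the ids become pending again. */
  synchronized void onFailure(Set<String> batch) {
    inFlight.removeAll(batch);
    pending.addAll(batch);
  }

  synchronized int pendingCount() {
    return pending.size();
  }

  synchronized int inFlightCount() {
    return inFlight.size();
  }

  public static void main(String[] args) {
    AckBatchSketch sketch = new AckBatchSketch();
    sketch.add("ack-1");
    sketch.add("ack-2");
    Set<String> batch = sketch.drain();
    sketch.onFailure(batch); // simulate a failed request; ids are queued again
    System.out.println(sketch.pendingCount());
  }
}
```

Keeping a private copy of the batch is what lets the success and failure callbacks act on exactly the ids that were sent, even if new ids arrive in the meantime.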
Example 8: testPollCaseWithNoMessages
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/** Tests when no messages are received from the Cloud Pub/Sub PullResponse. */
@Test
public void testPollCaseWithNoMessages() throws Exception {
  task.start(props);
  PullResponse stubbedPullResponse = PullResponse.newBuilder().build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  assertEquals(0, task.poll().size());
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 10, Source: CloudPubSubSourceTaskTest.java
Example 9: testPollWithMultipleAttributes
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Tests when the message retrieved from Cloud Pub/Sub has several attributes, including
 * one that matches {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
 */
@Test
public void testPollWithMultipleAttributes() throws Exception {
  task.start(props);
  Map<String, String> attributes = new HashMap<>();
  attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
  attributes.put("attribute1", "attribute_value1");
  attributes.put("attribute2", "attribute_value2");
  ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes);
  PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, result.size());
  Schema expectedSchema =
      SchemaBuilder.struct()
          .field(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD, Schema.BYTES_SCHEMA)
          .field("attribute1", Schema.STRING_SCHEMA)
          .field("attribute2", Schema.STRING_SCHEMA)
          .build();
  Struct expectedValue = new Struct(expectedSchema)
      .put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD, KAFKA_VALUE)
      .put("attribute1", "attribute_value1")
      .put("attribute2", "attribute_value2");
  SourceRecord expected =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          0,
          Schema.OPTIONAL_STRING_SCHEMA,
          KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
          expectedSchema,
          expectedValue);
  assertRecordsEqual(expected, result.get(0));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 40, Source: CloudPubSubSourceTaskTest.java
Example 10: acknowledge
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
@Override
public void acknowledge(SubscriptionPath subscription, List<String> ackIds)
    throws IOException {
  AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
      .setSubscription(subscription.getPath())
      .addAllAckIds(ackIds)
      .build();
  subscriberStub().acknowledge(request); // ignore Empty result.
}
Developer: apache, Project: beam, Lines of code: 10, Source: PubsubGrpcClient.java
Example 11: ackMessages
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
public ListenableFuture<Empty> ackMessages(AcknowledgeRequest request) {
  return subscriber.acknowledge(request);
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 4, Source: CloudPubSubGRPCSubscriber.java
Example 12: ackMessages
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
@Override
public ListenableFuture<Empty> ackMessages(AcknowledgeRequest request) {
  currentSubscriberIndex = (currentSubscriberIndex + 1) % subscribers.size();
  return subscribers.get(currentSubscriberIndex).ackMessages(request);
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 6, Source: CloudPubSubRoundRobinSubscriber.java
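The round-robin selection above updates currentSubscriberIndex with a plain read-modify-write on a field, which is not atomic if several threads call the method at the same time (whether that happens in the connector is not shown in this snippet). As a hedged illustration of the same rotation idea, the sketch below uses an AtomicInteger counter instead. The class RoundRobinSketch and its next method are hypothetical names, and unlike the original, which advances the index before selecting, this sketch starts with the first element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread-safe round-robin selector over a fixed list of targets. */
final class RoundRobinSketch<T> {

  private final List<T> targets;
  private final AtomicInteger counter = new AtomicInteger();

  RoundRobinSketch(List<T> targets) {
    if (targets.isEmpty()) {
      throw new IllegalArgumentException("at least one target is required");
    }
    // Defensive copy so later changes to the caller's list do not affect rotation.
    this.targets = new ArrayList<>(targets);
  }

  /** Returns the next target in rotation, starting with the first one. */
  T next() {
    // floorMod keeps the index valid even after the counter overflows to negative.
    int index = Math.floorMod(counter.getAndIncrement(), targets.size());
    return targets.get(index);
  }

  public static void main(String[] args) {
    RoundRobinSketch<String> rr = new RoundRobinSketch<>(Arrays.asList("s1", "s2", "s3"));
    for (int i = 0; i < 4; i++) {
      System.out.println(rr.next());
    }
  }
}
```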
Example 13: testPollWithPartitionSchemeHashKey
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Tests that the correct partition is assigned when the partition scheme is "hash_key". The test
 * has two cases, one where a key does exist and one where it does not.
 */
@Test
public void testPollWithPartitionSchemeHashKey() throws Exception {
  props.put(
      CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG,
      CloudPubSubSourceConnector.PartitionScheme.HASH_KEY.toString());
  task.start(props);
  Map<String, String> attributes = new HashMap<>();
  attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
  ReceivedMessage withoutKey = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  ReceivedMessage withKey = createReceivedMessage(ACK_ID2, CPS_MESSAGE, attributes);
  PullResponse stubbedPullResponse =
      PullResponse.newBuilder()
          .addReceivedMessages(0, withKey)
          .addReceivedMessages(1, withoutKey)
          .build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(2, result.size());
  SourceRecord expectedForMessageWithKey =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS),
          Schema.OPTIONAL_STRING_SCHEMA,
          KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  SourceRecord expectedForMessageWithoutKey =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          0,
          Schema.OPTIONAL_STRING_SCHEMA,
          null,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  assertRecordsEqual(expectedForMessageWithKey, result.get(0));
  assertArrayEquals((byte[]) expectedForMessageWithoutKey.value(), (byte[]) result.get(1).value());
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 48, Source: CloudPubSubSourceTaskTest.java
Example 14: testPollWithPartitionSchemeRoundRobin
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
/**
 * Tests that the correct partition is assigned when the partition scheme is "round_robin". The
 * test makes sure to submit an appropriate number of messages to poll() so that all partitions
 * in the round robin are hit once.
 */
@Test
public void testPollWithPartitionSchemeRoundRobin() throws Exception {
  task.start(props);
  ReceivedMessage rm1 = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  ReceivedMessage rm2 = createReceivedMessage(ACK_ID2, CPS_MESSAGE, new HashMap<String, String>());
  ReceivedMessage rm3 = createReceivedMessage(ACK_ID3, CPS_MESSAGE, new HashMap<String, String>());
  ReceivedMessage rm4 = createReceivedMessage(ACK_ID4, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse stubbedPullResponse =
      PullResponse.newBuilder()
          .addReceivedMessages(0, rm1)
          .addReceivedMessages(1, rm2)
          .addReceivedMessages(2, rm3)
          .addReceivedMessages(3, rm4)
          .build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
  List<SourceRecord> result = task.poll();
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(4, result.size());
  SourceRecord expected1 =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          0,
          Schema.OPTIONAL_STRING_SCHEMA,
          null,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  SourceRecord expected2 =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          1,
          Schema.OPTIONAL_STRING_SCHEMA,
          null,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  SourceRecord expected3 =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          2,
          Schema.OPTIONAL_STRING_SCHEMA,
          null,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  SourceRecord expected4 =
      new SourceRecord(
          null,
          null,
          KAFKA_TOPIC,
          0,
          Schema.OPTIONAL_STRING_SCHEMA,
          null,
          Schema.BYTES_SCHEMA,
          KAFKA_VALUE);
  assertRecordsEqual(expected1, result.get(0));
  assertRecordsEqual(expected2, result.get(1));
  assertRecordsEqual(expected3, result.get(2));
  assertRecordsEqual(expected4, result.get(3));
}
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 69, Source: CloudPubSubSourceTaskTest.java
Example 15: ackMessages
import com.google.pubsub.v1.AcknowledgeRequest; // import the required package/class
public ListenableFuture<Empty> ackMessages(AcknowledgeRequest request);
Developer: GoogleCloudPlatform, Project: pubsub, Lines of code: 2, Source: CloudPubSubSubscriber.java
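Note: The com.google.pubsub.v1.AcknowledgeRequest class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.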