I have a hello service in gRPC-java with deadlines. The implementation is with the blocking channel. I simulated a 300 milliseconds of computation just to test the deadlines and the service is working as expected.
public class GreetingServiceImpl extends GreetServiceGrpc.GreetServiceImplBase {
public void greetWithDeadline(GreetWithDeadlineRequest request, StreamObserver<GreetWithDeadlineResponse> responseObserver) {
try {
Context context = Context.current();
System.out.println("taking 300 milliseconds to process...");
Thread.sleep(300);
// if the time reaches the upper bound dead line we return the gRPC call
if (context.isCancelled()) {
System.out.println("context.isCancelled()");
responseObserver.onCompleted(); // I tested with and without this line
return;
}
String result = "Hello " + request.getGreeting().getFirstName() + " " + request.getGreeting().getLastName() + " after 300 milliseconds.";
GreetWithDeadlineResponse response = GreetWithDeadlineResponse.newBuilder()
.setResult(result)
.build();
responseObserver.onNext(response);
// don't forget to complete the call and send message back to the client!!!
responseObserver.onCompleted();
} catch (InterruptedException e) {
responseObserver.onCompleted(); // I tested with and without this line
e.printStackTrace();
} catch (Exception e) {
responseObserver.onCompleted(); // I tested with and without this line
e.printStackTrace();
}
}
Then I decided to create unit tests for this service. The test that I run with a large deadline than the 300 milliseconds passes. The second test that I run with a very small deadline does not pass.
@RunWith(JUnit4.class)
public class GreetingServiceImplTest {
@Rule
public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Before
public void addService() {
grpcServerRule.getServiceRegistry().addService(new GreetingServiceImpl());
}
@Test
public void greeterUnaryCallWithSmallDeadline() {
List<Pair<String, String>> people = Arrays.asList(
new Pair("Felipe", "Gutierrez")
);
List<Status.Code> listOfCodes = new ArrayList<Status.Code>();
GreetServiceGrpc.GreetServiceBlockingStub blockingStub = GreetServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
people.forEach(person -> {
try {
// create the protocol buffer message Greeting
Greeting greeting = Greeting.newBuilder()
.setFirstName(person.x)
.setLastName(person.y)
.build();
// create a greeting request with the protocol buffer greeting message
GreetWithDeadlineRequest request = GreetWithDeadlineRequest.newBuilder()
.setGreeting(greeting)
.build();
System.out.println("Sending message: " + greeting.toString());
// call the gRPC and get back a protocol buffer GreetingResponse
GreetWithDeadlineResponse greetResponse = blockingStub
.withDeadline(Deadline.after(1, TimeUnit.MILLISECONDS))
.greetWithDeadline(request);
System.out.println("Hello from server with deadline: " + greetResponse.getResult());
} catch (StatusRuntimeException e) {
listOfCodes.add(e.getStatus().getCode());
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println(listOfCodes.size());
assertEquals(1, listOfCodes.size());
listOfCodes.forEach(code -> {
assertEquals(code, Status.Code.DEADLINE_EXCEEDED);
});
}
The strange thing is that these assertions from my test are true: assertEquals(1, listOfCodes.size());
and assertEquals(code, Status.Code.DEADLINE_EXCEEDED);
. But the unit test is hanging and never terminates. First I was thinking that the problem was that I didn't put the responseObserver.onCompleted();
in the right place. So I tried to put it in all places that seem reasonable. But without success.
I am receiving a nullPointerExceptiopn
from the gRPC class InProcessTransport
which seems that the serverStreamListener
is already closed:
INFO: Exception notifying context listener
java.lang.NullPointerException
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.internalCancel(InProcessTransport.java:746)
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.serverClosed(InProcessTransport.java:683)
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.access$1700(InProcessTransport.java:619)
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.cancel(InProcessTransport.java:563)
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1ServerStreamCancellationListener.cancelled(ServerImpl.java:596)
at io.grpc.Context$ExecutableListener.run(Context.java:1005)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:398)
at io.grpc.Context$ExecutableListener.deliver(Context.java:997)
at io.grpc.Context.addListener(Context.java:479)
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.createContext(ServerImpl.java:601)
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:501)
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:476)
at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.start(InProcessTransport.java:780)
at io.grpc.internal.ForwardingClientStream.start(ForwardingClientStream.java:87)
at io.grpc.internal.InternalSubchannel$CallTracingTransport$1.start(InternalSubchannel.java:642)
at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:290)
at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:190)
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1.start(CensusTracingModule.java:394)
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1.start(CensusStatsModule.java:695)
at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:310)
at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:282)
at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:191)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:129)
at org.github.felipegutierrez.explore.grpc.greet.GreetServiceGrpc$GreetServiceBlockingStub.greetWithDeadline(GreetServiceGrpc.java:422)
at org.github.felipegutierrez.explore.grpc.greet.server.GreetingServiceImplTest.lambda$greeterUnaryCallWithSmallDeadline$7(GreetingServiceImplTest.java:271)
at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
at org.github.felipegutierrez.explore.grpc.greet.server.GreetingServiceImplTest.greeterUnaryCallWithSmallDeadline(GreetingServiceImplTest.java:256)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAc
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…