Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


java - Why is this unit test for gRPC with DEADLINE_EXCEEDED hanging forever?

I have a hello service in gRPC-java that uses deadlines. The client calls it through a blocking stub. In the handler I simulate 300 milliseconds of computation just to test the deadlines, and the service works as expected.

import io.grpc.Context;
import io.grpc.stub.StreamObserver;

public class GreetingServiceImpl extends GreetServiceGrpc.GreetServiceImplBase {
    @Override
    public void greetWithDeadline(GreetWithDeadlineRequest request, StreamObserver<GreetWithDeadlineResponse> responseObserver) {
        try {
            Context context = Context.current();

            System.out.println("taking 300 milliseconds to process...");
            Thread.sleep(300);

            // if the deadline has already passed, the context is cancelled and we return from the gRPC call
            if (context.isCancelled()) {
                System.out.println("context.isCancelled()");
                responseObserver.onCompleted(); // I tested with and without this line
                return;
            }

            String result = "Hello " + request.getGreeting().getFirstName() + " " + request.getGreeting().getLastName() + " after 300 milliseconds.";
            GreetWithDeadlineResponse response = GreetWithDeadlineResponse.newBuilder()
                    .setResult(result)
                    .build();
            responseObserver.onNext(response);
            // don't forget to complete the call and send message back to the client!!!
            responseObserver.onCompleted();
        } catch (InterruptedException e) {
            responseObserver.onCompleted(); // I tested with and without this line
            e.printStackTrace();
        } catch (Exception e) {
            responseObserver.onCompleted(); // I tested with and without this line
            e.printStackTrace();
        }
    }
}
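
To make the deadline behaviour concrete outside gRPC, here is a minimal stdlib-only sketch of the same situation: a worker that takes 300 milliseconds and a caller that only waits for a given time. It is an analogy under stated assumptions, not how gRPC implements deadlines; the class DeadlineSketch, the method callWithDeadline and the returned strings are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DeadlineSketch {
    /** Returns the greeting, or "DEADLINE_EXCEEDED" if the caller stops waiting first. */
    static String callWithDeadline(long deadlineMillis) {
        // Stand-in for the server side: one worker thread doing the "computation".
        ExecutorService server = Executors.newSingleThreadExecutor();
        try {
            Future<String> reply = server.submit(() -> {
                Thread.sleep(300); // simulated 300 milliseconds of work
                return "Hello after 300 milliseconds.";
            });
            try {
                return reply.get(deadlineMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                reply.cancel(true); // interrupt the worker once the caller has given up
                return "DEADLINE_EXCEEDED";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        } finally {
            // Release the worker thread; a non-daemon thread left running
            // can keep the JVM (and a test run) from exiting.
            server.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithDeadline(1));    // deadline far below 300 ms
        System.out.println(callWithDeadline(5000)); // deadline well above 300 ms
    }
}
```

The point of the sketch is only that the caller can observe the timeout while the worker is still busy, so both sides need to be cleaned up explicitly.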

Then I decided to create unit tests for this service. The test that I run with a deadline larger than 300 milliseconds passes. The second test, which I run with a very small deadline, does not finish.

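import static org.junit.Assert.assertEquals;

import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.testing.GrpcServerRule;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)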
public class GreetingServiceImplTest {
    @Rule
    public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();

    @Before
    public void addService() {
        grpcServerRule.getServiceRegistry().addService(new GreetingServiceImpl());
    }

    @Test
    public void greeterUnaryCallWithSmallDeadline() {
        List<Pair<String, String>> people = Arrays.asList(
                new Pair<>("Felipe", "Gutierrez")
        );
        List<Status.Code> listOfCodes = new ArrayList<>();
        GreetServiceGrpc.GreetServiceBlockingStub blockingStub = GreetServiceGrpc.newBlockingStub(grpcServerRule.getChannel());

        people.forEach(person -> {
            try {
                // create the protocol buffer message Greeting
                Greeting greeting = Greeting.newBuilder()
                        .setFirstName(person.x)
                        .setLastName(person.y)
                        .build();
                // create a greeting request with the protocol buffer greeting message
                GreetWithDeadlineRequest request = GreetWithDeadlineRequest.newBuilder()
                        .setGreeting(greeting)
                        .build();
                System.out.println("Sending message: " + greeting.toString());
                // call the gRPC and get back a protocol buffer GreetingResponse
                GreetWithDeadlineResponse greetResponse = blockingStub
                        .withDeadline(Deadline.after(1, TimeUnit.MILLISECONDS))
                        .greetWithDeadline(request);
                System.out.println("Hello from server with deadline: " + greetResponse.getResult());
            } catch (StatusRuntimeException e) {
                listOfCodes.add(e.getStatus().getCode());
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        System.out.println(listOfCodes.size());
        assertEquals(1, listOfCodes.size());
        listOfCodes.forEach(code -> {
            // JUnit expects (expected, actual)
            assertEquals(Status.Code.DEADLINE_EXCEEDED, code);
        });
    }
}

The strange thing is that both assertions in my test pass: listOfCodes contains exactly one status code, and that code is Status.Code.DEADLINE_EXCEEDED. Yet the unit test hangs and never terminates. At first I thought I had not put responseObserver.onCompleted() in the right place, so I tried it in every place that seemed reasonable, without success. I am also getting a NullPointerException from the gRPC class InProcessTransport, which seems to indicate that the serverStreamListener is already closed:

INFO: Exception notifying context listener
java.lang.NullPointerException
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.internalCancel(InProcessTransport.java:746)
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.serverClosed(InProcessTransport.java:683)
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.access$1700(InProcessTransport.java:619)
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream.cancel(InProcessTransport.java:563)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1ServerStreamCancellationListener.cancelled(ServerImpl.java:596)
    at io.grpc.Context$ExecutableListener.run(Context.java:1005)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:398)
    at io.grpc.Context$ExecutableListener.deliver(Context.java:997)
    at io.grpc.Context.addListener(Context.java:479)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.createContext(ServerImpl.java:601)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:501)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:476)
    at io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessClientStream.start(InProcessTransport.java:780)
    at io.grpc.internal.ForwardingClientStream.start(ForwardingClientStream.java:87)
    at io.grpc.internal.InternalSubchannel$CallTracingTransport$1.start(InternalSubchannel.java:642)
    at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:290)
    at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:190)
    at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1.start(CensusTracingModule.java:394)
    at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1.start(CensusStatsModule.java:695)
    at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:310)
    at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:282)
    at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:191)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:129)
    at org.github.felipegutierrez.explore.grpc.greet.GreetServiceGrpc$GreetServiceBlockingStub.greetWithDeadline(GreetServiceGrpc.java:422)
    at org.github.felipegutierrez.explore.grpc.greet.server.GreetingServiceImplTest.lambda$greeterUnaryCallWithSmallDeadline$7(GreetingServiceImplTest.java:271)
    at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
    at org.github.felipegutierrez.explore.grpc.greet.server.GreetingServiceImplTest.greeterUnaryCallWithSmallDeadline(GreetingServiceImplTest.java:256)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAc
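
When a run hangs like this after the assertions have passed, one way to narrow it down is to list the threads that are still alive and not daemon threads, since those can keep the JVM from exiting. The following is a minimal stdlib-only sketch of such a check; LiveThreadReport and nonDaemonThreadNames are hypothetical names, not part of gRPC or JUnit, and the sketch does not claim to identify the cause of this particular hang.

```java
import java.util.ArrayList;
import java.util.List;

final class LiveThreadReport {
    /** Names of live non-daemon threads other than the calling thread. */
    static List<String> nonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // A stand-in for a leftover worker thread.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ignored) {
                // interrupted: just finish
            }
        }, "sketch-worker");
        worker.setDaemon(false);
        worker.start();

        System.out.println(nonDaemonThreadNames());

        worker.interrupt();
        worker.join();
    }
}
```

Calling nonDaemonThreadNames() at the end of the hanging test (or taking a thread dump with the JDK's jstack tool) would show which threads are still running at that point.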
