Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

Netty synchronous client with asynchronous callers

I am creating a server which consumes commands from numerous sources such as JMS, SNMP, HTTP etc. These are all asynchronous and are working fine. The server maintains a single connection to a single item of legacy hardware which has a request/reply architecture with a custom TCP protocol. Ideally I would like a single method for issuing commands, either a blocking one like this

public Response issueCommandToLegacyHardware(Command command)

or an asynchronous one like this

public Future<Response> issueCommandToLegacyHardware(Command command)

I am relatively new to Netty and asynchronous programming, basically learning it as I go along. My current thought is that my LegacyHardwareClient class will have a public synchronized issueCommandToLegacyHardware(Command command) method, which will write to the client channel connected to the legacy hardware and then take() from a SynchronousQueue<Response>, which will block. The ChannelInboundHandler in the pipeline will offer() a Response to the SynchronousQueue<Response>, which will allow the take() to unblock and receive the data.

Is this too convoluted? Are there any examples around of synchronous Netty client implementations that I can look at? Are there any best practices for Netty? I could obviously just use standard Java sockets, but the power of Netty for parsing custom protocols, along with the ease of maintainability, is far too great to give up.

UPDATE: Just regarding the implementation, I used an ArrayBlockingQueue<>() and I used put() and remove() rather than offer() and remove(), because I wanted to ensure that subsequent requests to the legacy hardware were only sent once any active request had been replied to. The behaviour of the legacy hardware is not known with certainty otherwise.

The reason offer() and remove() did not work for me with the SynchronousQueue was that offer() would not pass anything along unless there was an actively blocking take() request on the other side. The converse is also true: remove() would not return anything unless there was a blocking put() call inserting data. I couldn't use put()/remove(), since the remove() statement would never be reached: no request had been written to the channel to trigger the event from which remove() would be called. I couldn't use offer()/take(), since offer() would return false because the take() call hadn't been executed yet. Using an ArrayBlockingQueue<>() with a capacity of 1 ensured that only one command could be executed at a time. Any other command would block until there was room to insert, which with a capacity of 1 meant the queue had to be empty. The queue was emptied once a response had been received from the legacy hardware. This ensured nicely synchronous behaviour toward the legacy hardware while providing an asynchronous API to the users of the legacy hardware, of which there are many.
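
The queue behaviour described in this update can be reproduced in isolation. The following is a minimal sketch using only java.util.concurrent; the class name QueueSemanticsDemo and the string payloads are made up for illustration, and the ArrayBlockingQueue part uses offer() rather than put() so that the demo never blocks (put() would wait for the slot to free up instead of returning false).

```java
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original post.
class QueueSemanticsDemo {

    // A SynchronousQueue has no capacity: offer() succeeds only while
    // another thread is already waiting to receive the element.
    static boolean offerWithoutConsumer() {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("response");
    }

    // With no producer blocked in put(), there is nothing to hand over,
    // so remove() throws NoSuchElementException.
    static boolean removeWithoutProducerThrows() {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        try {
            queue.remove();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    // An ArrayBlockingQueue with capacity 1 actually stores one element,
    // so it can serve as a single "command in flight" slot.
    static boolean[] singleSlot() {
        BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);
        boolean first = slot.offer("command-1");  // slot is free
        boolean second = slot.offer("command-2"); // slot is occupied
        slot.remove();                            // reply arrived, slot freed
        boolean third = slot.offer("command-2");  // slot is free again
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        System.out.println("SynchronousQueue offer: " + offerWithoutConsumer());
        System.out.println("SynchronousQueue remove throws: " + removeWithoutProducerThrows());
        System.out.println("ArrayBlockingQueue(1) offers: " + Arrays.toString(singleSlot()));
    }
}
```

The point of the sketch is only the difference in capacity: the SynchronousQueue is a pure hand-off between two threads that are both present, whereas the capacity-1 queue remembers that a command is outstanding until the reply removes it.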


1 Reply


Instead of designing your application in a blocking manner using SynchronousQueue<Response>, design it in a non-blocking manner using SynchronousQueue<Promise<Response>>.

Your public Future<Response> issueCommandToLegacyHardware(Command command) should then use offer() to add a DefaultPromise<>() to the queue, and the Netty pipeline can then use remove() to fetch the promise for that request and complete it with the response. Note that I used remove() instead of take(): the queue should be empty only under exceptional circumstances, and in that case remove() throws an exception rather than blocking.

A quick implementation of this might be:

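import java.util.concurrent.SynchronousQueue;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;

public class MyLastHandler extends SimpleChannelInboundHandler<Response> {
    private final SynchronousQueue<Promise<Response>> queue;

    public MyLastHandler(SynchronousQueue<Promise<Response>> queue) {
        super();
        this.queue = queue;
    }

    // The following is called messageReceived(ChannelHandlerContext, Response) in 5.0.
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Response msg) {
        // Complete the promise of the oldest outstanding request.
        // remove() throws NoSuchElementException if no request is pending.
        this.queue.remove().setSuccess(msg); // Or setFailure(Throwable)
    }
}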

The above handler should be placed last in the chain.

The implementation of public Future<Response> issueCommandToLegacyHardware(Command command) can look like this:

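Channel channel = ....;
SynchronousQueue<Promise<Response>> queue = ....;

public Future<Response> issueCommandToLegacyHardware(Command command) {
    return issueCommandToLegacyHardware(command, channel.eventLoop().newPromise());
}

public Future<Response> issueCommandToLegacyHardware(Command command, Promise<Response> promise) {
    // Register the promise before writing, so the handler finds it when the reply arrives.
    // Note: offer() returns false if the queue cannot accept the element. As the question's
    // update describes, a SynchronousQueue only accepts one while a consumer is waiting.
    queue.offer(promise);
    // In Netty 4, write() alone only queues the message; writeAndFlush() also sends it.
    channel.writeAndFlush(command);
    return promise;
}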

Overloading issueCommandToLegacyHardware in this way is the same design pattern that Channel.write uses, which makes it really flexible.

This design pattern can be used as follows in client code:

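issueCommandToLegacyHardware(
    Command.TAKE_OVER_THE_WORLD_WITH_FIRE,
    channel.eventLoop().newPromise()
).addListener(
    (Future<Response> f) -> {
        if (f.isSuccess()) {
            // getNow() does not block; the future is already complete here.
            System.out.println("We have taken over the world: " + f.getNow());
        } else {
            System.err.println("Command failed: " + f.cause());
        }
    }
);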

The advantage of this design pattern is that no unneeded blocking is used anywhere, just plain async logic.
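
To experiment with the promise-queue idea without a Netty dependency, here is a minimal sketch built only on the JDK. It is not the code from this answer: CompletableFuture stands in for Netty's Promise, a ConcurrentLinkedQueue stands in for the promise queue, and the hypothetical onResponse method plays the role of channelRead0. The class name, method names and string payloads are all invented for illustration, and the sketch assumes the hardware answers requests in the order they were sent.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the Netty-based client.
class PendingReplyDemo {
    // Promises of requests that were sent but not yet answered, oldest first.
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // Caller side: register a promise, then "send" the command.
    // synchronized keeps the enqueue order equal to the send order
    // when several caller threads issue commands concurrently.
    synchronized CompletableFuture<String> issueCommand(String command) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        pending.add(promise);
        // A real client would write the command to the channel here.
        return promise;
    }

    // Handler side: complete the oldest outstanding promise with the reply.
    // remove() throws NoSuchElementException if a reply arrives unrequested.
    void onResponse(String response) {
        pending.remove().complete(response);
    }

    public static void main(String[] args) {
        PendingReplyDemo client = new PendingReplyDemo();
        CompletableFuture<String> first = client.issueCommand("STATUS");
        CompletableFuture<String> second = client.issueCommand("RESET");
        first.thenAccept(r -> System.out.println("first reply: " + r));
        second.thenAccept(r -> System.out.println("second reply: " + r));
        client.onResponse("status-ok"); // simulated reply to STATUS
        client.onResponse("reset-ok");  // simulated reply to RESET
    }
}
```

Unlike the capacity-1 queue from the question's update, this sketch lets several commands be outstanding at once, so it only fits hardware that tolerates pipelined requests. A caller that needs the blocking signature from the question can call get() on the returned future, as long as it does so from a thread other than the one delivering the replies.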

Appendix I: Javadoc:

Promise, Future, DefaultPromise (all in io.netty.util.concurrent)

