Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

reactive programming - Implementing a turnstile-like operator with RxJava

I need help implementing a turnstile-like operator in RxJava (RxScala). I spent quite some time thinking about it, but I seem to be stuck.

The type of the function should be the following:

def turnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T]

The operator should behave much like a real turnstile. People arrive (queue), and the turnstile is either ready to accept a single new person (a true element in turnstile; you can imagine it as a token inserted into the turnstile) or closed (a false element in turnstile, which cancels the previous token). For every true element in turnstile, only one person may pass.

Furthermore, inserting several tokens in a row (several true items in turnstile) without a person passing is the same as inserting a single token: the turnstile doesn't count the tokens.

In other words, the turnstile is initially closed. When a true element appears in it, it opens up for a single person. If a person appears, they pass through (to the output) and the turnstile closes again. If a false element appears in the turnstile, the turnstile also closes.

queue       ----A---B-------------C--D--
turnstile   --T--------T--T-T-T-T------T
            ============================
output      ----A------B----------C----D

A marble diagram showing an open turnstile waiting for person A, then person B waiting for the turnstile to open, then several tokens behaving as one: person C passes, but person D has to wait for a new token again.

----A----B--
--T---T-F-T-
============
----A-----B-

A marble diagram showing how a false element in the turnstile closes the turnstile again.
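
The rules and both diagrams above can be pinned down as a tiny state machine in plain Java, with no Rx involved. This is only a sketch for checking expectations, not an implementation of the operator. The class name TurnstileModel, its methods, and the replay helper are made up for illustration. It assumes that people who arrive at a closed turnstile wait in line (as B and D do in the first diagram) and that a token sharing a column with a person is handled first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical reference model of the turnstile rules; not part of RxJava or RxScala.
final class TurnstileModel<T> {
    private final Queue<T> waiting = new ArrayDeque<>(); // people who arrived while closed
    private final List<T> passed = new ArrayList<>();    // corresponds to the output row
    private boolean open = false;                        // the turnstile is initially closed

    // A person arrives: passes at once if the turnstile is open, otherwise waits in line.
    void onPerson(T person) {
        if (open) {
            passed.add(person);
            open = false; // one token admits exactly one person
        } else {
            waiting.add(person);
        }
    }

    // A token arrives: true admits one person, false cancels a pending token.
    void onToken(boolean token) {
        if (!token) {
            open = false;
            return;
        }
        T next = waiting.poll();
        if (next != null) {
            passed.add(next); // the first person in line uses the token immediately
        } else {
            open = true;      // nobody is waiting; repeated trues collapse into one
        }
    }

    List<T> passed() {
        return passed;
    }

    // Replays two marble rows column by column: 'T'/'F' are tokens, '-' is silence,
    // any other character in the queue row is a person. Assumption: in a shared
    // column the token is processed before the person.
    static List<String> replay(String queueRow, String tokenRow) {
        TurnstileModel<String> model = new TurnstileModel<>();
        int length = Math.max(queueRow.length(), tokenRow.length());
        for (int i = 0; i < length; i++) {
            char token = i < tokenRow.length() ? tokenRow.charAt(i) : '-';
            char person = i < queueRow.length() ? queueRow.charAt(i) : '-';
            if (token == 'T') model.onToken(true);
            if (token == 'F') model.onToken(false);
            if (person != '-') model.onPerson(String.valueOf(person));
        }
        return model.passed();
    }

    public static void main(String[] args) {
        // First diagram: prints [A, B, C, D]
        System.out.println(replay("----A---B-------------C--D--", "--T--------T--T-T-T-T------T"));
        // Second diagram: prints [A, B]
        System.out.println(replay("----A----B--", "--T---T-F-T-"));
    }
}
```

A model like this could serve as an oracle when testing an Rx-based candidate: feed both the same events and compare what comes out.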

Any help is appreciated. I think the only way to implement this without writing a custom operator would be using the zip operator somehow, because it is probably the only operator that makes elements from one sequence wait for elements from the other (or are there any others I'm not aware of?). But I need to not zip some of the turnstile elements depending on whether they got paired with a person or not...

I think this is an interesting problem, and I'm quite curious to see a nice solution to it.

1 Reply

So I think I have a cleaner, fully Rx solution. This was actually a pretty fun problem to solve. Provided it works for your needs, I think it ended up being really elegant, although it took quite a while to arrive at it.

Sadly I don't know Scala, so you're going to have to deal with my Java 8 lambdas. :D

The entire implementation:

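import rx.Observable;

public final class Turnstile {
    // Generic in T so it matches the Scala signature in the question; the tests use String.
    public static <T> Observable<T> getTurnstile(final Observable<T> queue, final Observable<Boolean> tokens) {
        return queue.publish(sharedQueue ->
                tokens.switchMap(token -> token ? sharedQueue.limit(1) : Observable.<T>empty()));
    }
}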

So, what's happening here is that publish turns the people queue into a shared Observable that we can subscribe to multiple times inside the lambda. On the token stream we use switchMap, which, every time a new token arrives, unsubscribes from the inner Observable created for the previous token and subscribes to the new one. For a true token the inner Observable is sharedQueue.limit(1): a fresh subscription to the people queue that lets exactly one person through and then completes. Several trues in a row are fine, because each one cancels the previous subscription, so they never add up to more than one open slot. For a false token the inner Observable is Observable.empty(), which cancels any pending slot and emits nothing.

And some (passing) test cases:

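import com.google.common.collect.Lists;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.TestSubject;

@RunWith(JUnit4.class)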
public class TurnstileTest {
    private final TestScheduler scheduler = new TestScheduler();
    private final TestSubscriber<String> output = new TestSubscriber<>();

    private final TestSubject<Boolean> tokens = TestSubject.create(scheduler);
    private final TestSubject<String> queue = TestSubject.create(scheduler);

    @Before
    public void setup() {
        Turnstile.getTurnstile(queue, tokens).subscribe(output);
    }

    @Test
    public void allowsOneWithTokenBefore() {
        tokens.onNext(true, 0);
        queue.onNext("Bill", 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void tokenBeforeIsCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(false, 1);
        queue.onNext("Bill", 2);

        assertNonePassed();
    }

    @Test
    public void tokensBeforeAreCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        tokens.onNext(false, 3);
        queue.onNext("Bill", 4);

        assertNonePassed();
    }

    @Test
    public void eventualPassThroughAfterFalseTokens() {
        tokens.onNext(false, 0);
        queue.onNext("Bill", 1);
        tokens.onNext(false, 2);
        tokens.onNext(false, 3);
        queue.onNext("Jane", 4);
        queue.onNext("Bob", 5);
        tokens.onNext(true, 6);
        tokens.onNext(true, 7);
        tokens.onNext(false, 8);
        tokens.onNext(false, 9);
        queue.onNext("Phil", 10);
        tokens.onNext(false, 11);
        tokens.onNext(false, 12);
        tokens.onNext(true, 13);

        assertPassedThrough("Bill", "Jane", "Bob");
    }

    @Test
    public void allowsOneWithTokenAfter() {
        queue.onNext("Bill", 0);
        tokens.onNext(true, 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void multipleTokenEntriesBeforeOnlyAllowsOneAtATime() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        queue.onNext("Bill", 3);
        tokens.onNext(true, 4);
        tokens.onNext(true, 5);
        queue.onNext("Jane", 6);
        queue.onNext("John", 7);

        assertPassedThrough("Bill", "Jane");
    }

    @Test
    public void noneShallPassWithoutToken() {
        queue.onNext("Jane", 0);
        queue.onNext("John", 1);

        assertNonePassed();
    }

    private void closeSubjects() {
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        scheduler.triggerActions();
        tokens.onCompleted();
        queue.onCompleted();
        scheduler.triggerActions();
    }

    private void assertNonePassed() {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList());
    }

    private void assertPassedThrough(final String... names) {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList(names));
    }
}

Let me know if you find any edge cases that don't work with this, particularly if it has trouble in real time, since the tests obviously run in a controlled environment.

