Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
404 views
in Technique[技术] by (71.8m points)

android - Parallel request with Retrofit, Coroutines and Suspend functions

I'm using Retrofit in order to make some network requests. I'm also using the Coroutines in combination with 'suspend' functions.

My question is: Is there a way to improve the following code. The idea is to launch multiple requests in parallels and wait for them all to finish before continuing the function.

lifecycleScope.launch {
    try {
        itemIds.forEach { itemId ->
            withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
        }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

(Note that "MyService.getItem()" is a 'suspend' function.)

I guess that there is something nicer than a foreach in this case.

Anyone with an idea?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I've prepared three approaches to solving this, from the simplest to the most correct one. To simplify the presentation of the approaches, I have extracted this common code:

lifecycleScope.launch {
    val itemById = try {
        fetchItems(itemIds)
    } catch (exception: Exception) {
        exception.printStackTrace()
    }
    Log.i(TAG, "Fetched these items: $itemById")
}

Before I go on, a general note: your getItem() function is suspendable, you have no need to submit it to the IO dispatcher. All your coroutines can run on the main thread.

Now let's see how we can implement fetchItems(itemIds).

1. Simple forEach

Here we take advantage of the fact that all the coroutine code can run on the main thread:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = MyService.getItem(itemId) }
        }
    }
    return itemById
}

coroutineScope will wait for all the coroutines you launch inside it. Even though they all run concurrently to each other, the launched coroutines still dispatch to the single (main) thread, so there is no concurrency issue with updating the map from each of them.

2. Thread-Safe Variant

The fact that it leverages the properties of a single-threaded context can be seen as a limitation of the first approach: it doesn't generalize to threadpool-based contexts. We can avoid this limitation by relying on the async-await mechanism:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
            .map { it.await() }
            .toMap()
}

Here we rely on two non-obvious properties of Collection.map():

  1. It performs all the transformation eagerly, so the first transformation to a collection of Deferred<Pair<Long, Item>> is completely done before entering the second stage, where we await on all of them.
  2. It is an inline function, which allows us to write suspendable code in it even though the function itself is not a suspend fun and gets a non-suspendable lambda (Deferred<T>) -> T.

This means that all the fetching is done concurrently, but the map gets assembled in a single coroutine.

3. Flow-Based Approach with Improved Concurrency Control

The above solved the concurrency for us, but it lacks any backpressure. If your input list is very large, you'll want to put a limit on how many simultaneous network requests you're making.

You can do this with a Flow-based idiom:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
        .asFlow()
        .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
            flow { emit(itemId to MyService.getItem(itemId)) }
        }
        .toMap()

Here the magic is in the .flatMapMerge operation. You give it a function (T) -> Flow<R> and it will execute it sequentially on all the input, but then it will concurrently collect all the flows it got. Note that I couldn't simplify flow { emit(getItem()) } } to just flowOf(getItem()) because getItem() must be called lazily, while collecting the flow.

Flow.toMap() is not currently provided in the standard library, so here it is:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...