Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

kotlin - Why not use GlobalScope.launch?

I have read that using GlobalScope is highly discouraged.

I have a simple use case. For every Kafka message I receive (let's say a list of IDs), I have to split it, invoke a REST service concurrently for each of those IDs, wait for all the calls to finish, and then proceed with other synchronous tasks. Nothing else in the application requires coroutines. In this case, can I just get away with using GlobalScope?

Note: This is not an Android application. It's a Kafka stream processor running on the server side. It's an ephemeral, stateless, containerized (Docker) application running in Kubernetes (buzzword-compliant, if you will).

1 Reply

You should scope your concurrency appropriately using structured concurrency. Your coroutines can leak if you don't do this. In your case, scoping them to the processing of a single message seems appropriate.

Here's an example:

/* I don't know Kafka, but let's pretend this function gets 
 * called when you receive a new message
 */
suspend fun onMessage(msg: Message) {
    val ids: List<Int> = msg.getIds()    

    val jobs = ids.map { id ->
        GlobalScope.launch { restService.post(id) }
    }

    jobs.joinAll()
}

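This version has two problems. First, failures are invisible to the caller: Job.join() (and therefore joinAll()) does not rethrow the exception of a failed job, so if one of the calls to restService.post(id) throws, onMessage still returns normally and the exception only reaches the uncaught exception handler. Second, the launched jobs are not children of the caller. If the coroutine calling onMessage is cancelled while it waits, joinAll() throws a CancellationException and all the jobs that haven't completed yet will leak. They will continue to execute (potentially indefinitely), and if they fail, you won't know about it.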
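
The leak can be observed directly. The sketch below is not from the original answer: it keeps the same GlobalScope.launch plus joinAll() shape, replaces restService.post(id) with a hypothetical `post` lambda that takes 200 ms, and cancels the caller after 50 ms. It assumes kotlinx.coroutines 1.5 or later (for the DelicateCoroutinesApi opt-in).

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Same shape as the leaky onMessage above. `post` is a hypothetical stand-in
// for restService.post(id), passed in so the demo is self-contained.
@OptIn(DelicateCoroutinesApi::class)
suspend fun onMessageLeaky(ids: List<Int>, post: suspend (Int) -> Unit) {
    val jobs = ids.map { id -> GlobalScope.launch { post(id) } }
    jobs.joinAll()
}

// Returns (posts finished when the caller was cancelled, posts finished 500 ms later).
fun leakDemo(): Pair<Int, Int> = runBlocking {
    val finished = AtomicInteger(0)
    val caller = launch {
        onMessageLeaky(listOf(1, 2, 3)) {
            delay(200)                  // pretend the REST call takes 200 ms
            finished.incrementAndGet()
        }
    }
    delay(50)               // give the caller time to start its three posts
    caller.cancelAndJoin()  // the message handler is cancelled and has returned
    val atCancel = finished.get()
    delay(500)              // nobody is waiting any more, yet the posts keep running
    atCancel to finished.get()
}

fun main() {
    val (atCancel, later) = leakDemo()
    println("finished at cancel: $atCancel, finished later: $later")
}
```

Under these assumptions the second number should reach 3: all three posts run to completion even though the coroutine that was waiting for them has already been cancelled.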

To solve this, you need to scope your coroutines. Here's the same example without the leak:

suspend fun onMessage(msg: Message) = coroutineScope {
    val ids: List<Int> = msg.getIds()    

    ids.forEach { id ->
        // launch is called on "this", which is the coroutineScope.
        launch { restService.post(id) }
    }
}

In this case, if one of the calls to restService.post(id) fails, then all other non-completed coroutines inside the coroutine scope will get cancelled. When you leave the scope, you can be sure that you haven't leaked any coroutines.

Also, because coroutineScope waits until all of its child coroutines are done, you can drop the jobs.joinAll() call.
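
To see that behaviour in isolation, here is a small sketch, assuming a hypothetical fakePost that fails quickly for id 2 and is slow for every other id. The `cancelled` list exists only to record which siblings were cancelled; neither it nor fakePost is part of the example above.

```kotlin
import kotlinx.coroutines.*
import java.io.IOException
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for restService.post(id): id 2 fails after 50 ms,
// every other id would take a full second.
suspend fun fakePost(id: Int) {
    if (id == 2) {
        delay(50)
        throw IOException("post failed for id $id")
    }
    delay(1_000)
}

suspend fun onMessageScoped(ids: List<Int>, cancelled: MutableList<Int>) = coroutineScope {
    ids.forEach { id ->
        launch {
            try {
                fakePost(id)
            } catch (e: CancellationException) {
                cancelled += id   // remember that this sibling was cancelled
                throw e           // cancellation must always be rethrown
            }
        }
    }
}

// Returns (message of the exception seen by the caller, ids that were cancelled).
fun scopedDemo(): Pair<String?, List<Int>> = runBlocking {
    val cancelled = CopyOnWriteArrayList<Int>()
    val failure = try {
        onMessageScoped(listOf(1, 2, 3), cancelled)
        null
    } catch (e: IOException) {
        e.message   // coroutineScope rethrows the failure of its child
    }
    failure to cancelled.sorted()
}

fun main() {
    println(scopedDemo())
}
```

The caller sees the IOException from id 2, and ids 1 and 3 are cancelled instead of running for their full second, so nothing is left behind when onMessageScoped returns.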

Side note: A convention when writing a function that starts coroutines is to let the caller decide the coroutine scope by making CoroutineScope the receiver of the function. Doing this with the onMessage function could look like this:

fun CoroutineScope.onMessage(msg: Message): List<Job> {
    val ids: List<Int> = msg.getIds()    

    return ids.map { id ->
        // launch is called on "this", which is the CoroutineScope receiver chosen by the caller.
        launch { restService.post(id) }
    }
}
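
A caller might then pick the scope in one of two ways. The sketch below is only an illustration: the Message class, the post function, handleAndWait, and appScope are hypothetical stand-ins, not part of any Kafka or REST library.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-ins for the Message type and the REST call used above.
class Message(private val ids: List<Int>) {
    fun getIds(): List<Int> = ids
}

suspend fun post(id: Int): Unit = delay(10)

fun CoroutineScope.onMessage(msg: Message): List<Job> =
    msg.getIds().map { id -> launch { post(id) } }

// The caller scopes the work to a single message: coroutineScope returns
// only after every job started inside it has completed.
suspend fun handleAndWait(msg: Message): List<Job> = coroutineScope { onMessage(msg) }

fun main(): Unit = runBlocking {
    // Option 1: per-message scope, wait for all posts before moving on.
    val jobs = handleAndWait(Message(listOf(1, 2, 3)))
    println("all posts done: ${jobs.all { it.isCompleted }}")

    // Option 2: a scope owned by the application and cancelled on shutdown.
    val appScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    appScope.onMessage(Message(listOf(4, 5))).joinAll()
    appScope.cancel()
}
```

For the use case in the question, where the handler must wait before continuing with synchronous work, the per-message scope is the closer fit.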
