Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


android - How can I send items to a Kotlin Flow (like a BehaviorSubject)?

I want to know how to send/emit items to a Kotlin Flow. My use case is:

In the consumer/ViewModel/Presenter I can subscribe with the collect function:

fun observe() {
  coroutineScope.launch {
    // Collect every value the repository emits
    repository.observe().collect {
      println(it)
    }
  }
}

The issue is on the Repository side. With RxJava we could use a BehaviorSubject, expose it as an Observable/Flowable, and emit new items like this:

behaviourSubject.onNext(true)

But whenever I build a new flow:

flow {

}

I can only collect. How can I send values to a flow?


1 Reply


If you want to get the latest value on subscription/collection, you should use a ConflatedBroadcastChannel:

private val channel = ConflatedBroadcastChannel<Boolean>()

This replicates Rx's BehaviorSubject. To expose the channel as a Flow:

// Repository
fun observe(): Flow<Boolean> {
  return channel.asFlow()
}

Now, to send an event/value to that exposed Flow, simply send it to this channel.

// Repository
// send is a suspend function, so the caller must be a suspend function or a coroutine.
suspend fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}

Console:

false

If you wish to only receive values after you start collecting you should use a BroadcastChannel instead.

To make it clear:

Behaves like Rx's PublishSubject

private val channel = BroadcastChannel<Boolean>(1)

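suspend fun broadcastChannelTest() = coroutineScope {
  // 1. Send event (there is no subscriber yet, so this value is dropped)
  channel.send(true)

  // 2. Start collecting. collect suspends until the channel is closed, so it
  //    runs in its own coroutine; UNDISPATCHED subscribes before step 3 runs.
  launch(start = CoroutineStart.UNDISPATCHED) {
    channel
      .asFlow()
      .collect {
        println(it)
      }
  }

  // 3. Send another event
  channel.send(false)

  // Close the channel so the collector finishes and this function can return.
  channel.close()
}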

false

Only false gets printed as the first event was sent before collect { }.
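
BroadcastChannel has since been deprecated in kotlinx.coroutines in favor of SharedFlow. As a rough sketch of the same PublishSubject-like behaviour, assuming kotlinx.coroutines 1.4 or later, the example below swaps in MutableSharedFlow. The function name sharedFlowDemo is hypothetical and only serves the illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what a late collector observes.
suspend fun sharedFlowDemo(): List<Boolean> = coroutineScope {
    // replay = 0 (the default): nothing is kept for collectors that arrive later.
    val events = MutableSharedFlow<Boolean>()

    // 1. Emit before anyone collects: with no subscribers the value is dropped.
    events.emit(true)

    // 2. Start collecting. UNDISPATCHED runs the collector up to its first
    //    suspension, so it is subscribed before step 3. take(1) ends the
    //    collection, which would otherwise never complete.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        events.take(1).toList()
    }

    // 3. Emit again: this value reaches the collector.
    events.emit(false)

    received.await()
}

fun main(): Unit = runBlocking {
    println(sharedFlowDemo()) // [false]
}
```

A shared flow never completes on its own, which is why the sketch bounds the collection with take(1); in a ViewModel the collecting coroutine would normally be cancelled with its scope instead.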


Behaves like Rx's BehaviorSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

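suspend fun conflatedBroadcastChannelTest() = coroutineScope {
  // 1. Send event (the channel keeps it as the latest value)
  confChannel.send(true)

  // 2. Start collecting. collect suspends until the channel is closed, so it
  //    runs in its own coroutine; UNDISPATCHED subscribes before step 3 runs.
  launch(start = CoroutineStart.UNDISPATCHED) {
    confChannel
      .asFlow()
      .collect {
        println(it)
      }
  }

  // 3. Send another event
  confChannel.send(false)

  // Close the channel so the collector finishes and this function can return.
  confChannel.close()
}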

true

false

Both events are printed: a new collector always receives the latest value first (if one is present), followed by later values.

Also worth mentioning is the Kotlin team's work on DataFlow (name pending), which seems better suited to this use case. It was later released as StateFlow, which is a hot stream that always holds a current value rather than a cold one.
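
The BehaviorSubject-style repository can also be sketched with MutableStateFlow, the kotlinx.coroutines API that grew out of the DataFlow proposal. This is a minimal illustration assuming kotlinx.coroutines 1.4 or later; the Repository class and the initial value of true are assumptions made for the example, not part of the original question.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical repository; the class and function names are illustrative.
class Repository {
    // Unlike BehaviorSubject, a StateFlow always needs an initial value.
    private val state = MutableStateFlow(true)

    // Expose a read-only view so only the repository can write.
    fun observe(): Flow<Boolean> = state.asStateFlow()

    // Assigning to value is not a suspend call, so no coroutine is needed here.
    fun someLogicalOp() {
        state.value = false
    }
}

fun main(): Unit = runBlocking {
    val repository = Repository()

    // The collector first receives the current value, then later updates.
    // take(2) ends the collection, which would otherwise never complete.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        repository.observe().take(2).toList()
    }

    repository.someLogicalOp()
    println(received.await()) // [true, false]
}
```

One difference from BehaviorSubject to keep in mind: StateFlow does not re-emit a value equal to the current one, so assigning the same value twice produces a single emission.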

