I'm trying the Akka Stream API, but I don't know why it throws a java.lang.IllegalArgumentException.
val graph = RunnableGraph.fromGraph(
GraphDSL.create(source, sink)
((source, sink) => Seq(source, sink)) {
implicit b => (source, sink) =>
Import akka.stream.scaladsl.GraphDSL.Implicits._
val partition = b.add(Partition[(KinesisRecord)](2, flow => {
1
}))
source ~> partition.in
partition.out(0) ~> sink
partition.out(1) ~> sink
ClosedShape
})
Here is the current code. The error is as follows
[info] - should consume *** FAILED ***
[info] java.lang.IllegalArgumentException: [Map.in] is already connected
[info] at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1567)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater(Graph.scala:1730)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater$(Graph.scala:1729)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1784)
[info] at akka.stream.scaladsl.GraphApply.create(GraphApply.scala:46)
[info] at akka.stream.scaladsl.GraphApply.create$(GraphApply.scala:41)
[info] at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:1529)
I am using kinesisRecord as the target of source.
However, in this code, if I change the outputPorts to 1 and remove
partition.out(1) ~> sink
this line, it works.
I don't know if I'm missing something or if it's just a bug.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…