Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Apache Flink - Can't understand the result of an event-time group window

I am using Flink 1.12.0 and want to try out an event-time group window on a small in-memory data collection. Here is the full code:

package org.example.sqlexploration

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{AnyWithOperations, FieldExpression}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.example.sqlexploration.Implicits.String2Timestamp

case class MyStock(id: String, event_time: Timestamp, price: Int)

object Implicits {

  implicit class String2Timestamp(strDate: String) {
    def ts = {
      val milli = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(strDate).getTime
      new Timestamp(milli)

    }
  }

}


object Main {

  def main(args: Array[String]): Unit = {
    val elements = Seq(
      MyStock("id1", "2020-01-04 11:36:10".ts, 1),
      MyStock("id1", "2020-01-04 11:36:15".ts, 2),
      MyStock("id1", "2020-01-04 11:36:13".ts, 4),
      MyStock("id1", "2020-01-04 11:36:18".ts, 8),
      MyStock("id1", "2020-01-04 11:36:12".ts, 16)

    )

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val ds: DataStream[MyStock] = env.fromCollection(elements).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[MyStock] {
      var max_seen = Long.MinValue

      override def checkAndGetNextWatermark(stock: MyStock, l: Long): Watermark = {
        val ts = stock.event_time.getTime
        if (max_seen < ts) {
          max_seen = ts
        }
        new Watermark(max_seen - 2000) // watermark lags the largest timestamp seen by 2 seconds (tolerates 2 s of lateness)

      }

      override def extractTimestamp(stock: MyStock, l: Long): Long = stock.event_time.getTime
    })

    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceView", ds, $"id", $"price", $"event_time".rowtime() as "rt")
    val sql =
      """
      select id,
      sum(price) as total_price,
      tumble_start(rt, interval '2' second) as proc_start,
      tumble_end(rt, interval '2' second) as proc_end
      from sourceView
      group by id, tumble(rt, interval '2' second)

      """.stripMargin(' ')

    tenv.sqlQuery(sql).toAppendStream[Row].print()
    env.execute()
  }


}

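In my application, I set the parallelism to 1 and use an AssignerWithPunctuatedWatermarks implementation that tolerates 2 seconds of lateness: after each event, it emits a watermark 2 seconds behind the largest event time seen so far. The tumbling event-time window is 2 seconds long.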
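As a rough illustration of this setup, here is a plain-Scala sketch (no Flink dependency) of the two pieces of arithmetic involved: which 2-second tumbling window each event falls into, and the watermark the punctuated assigner emits after each event. It assumes times are written as seconds past 11:36:00 instead of epoch milliseconds. `windowStart` follows the formula of Flink's `TimeWindow.getWindowStartWithOffset` with a zero offset; the object and method names are hypothetical.

```scala
// Plain-Scala sketch, no Flink dependency.
// Assumption: timestamps are seconds past 11:36:00, not epoch milliseconds.
object WindowAndWatermarkSketch {
  val windowSize = 2L // tumble(rt, interval '2' second)
  val lateness   = 2L // max_seen - 2000 ms in the question's assigner

  // Start of the tumbling window a timestamp falls into (zero offset),
  // same arithmetic as TimeWindow.getWindowStartWithOffset(ts, 0, size).
  def windowStart(ts: Long): Long = ts - (ts + windowSize) % windowSize

  // Watermark emitted after each event: largest timestamp so far minus lateness.
  def watermarks(events: Seq[Long]): Seq[Long] =
    events.scanLeft(Long.MinValue)((maxSeen, ts) => math.max(maxSeen, ts)).tail.map(_ - lateness)

  def main(args: Array[String]): Unit = {
    val events = Seq(10L, 15L, 13L, 18L, 12L) // arrival order from the question
    events.zip(watermarks(events)).foreach { case (ts, wm) =>
      val start = windowStart(ts)
      println(s"event=$ts window=[$start, ${start + windowSize}) watermark after event=$wm")
    }
  }
}
```

Running it shows that the 11:36:13 event lands in window [12, 14) while the watermark is 13, and that the 11:36:12 event lands in the same window only after the watermark has moved to 16.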

The output is:

id1,1,2020-01-04T03:36:10,2020-01-04T03:36:12
id1,4,2020-01-04T03:36:12,2020-01-04T03:36:14
id1,2,2020-01-04T03:36:14,2020-01-04T03:36:16
id1,8,2020-01-04T03:36:18,2020-01-04T03:36:20

I don't understand why id1,4,2020-01-04T03:36:12,2020-01-04T03:36:14 appears in the result.

The event that led to the creation of this window is MyStock("id1", "2020-01-04 11:36:13".ts, 4). I expected it to be late, because after the preceding 11:36:15 event the watermark has already reached 2020-01-04 11:36:13 (11:36:15 minus 2 seconds). Isn't an event excluded when its event time equals the watermark?
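One way to check this expectation is a small simulation. The sketch below assumes, as Flink's event-time window operators do, that lateness is judged per window rather than per element: an element is dropped only if its window's last timestamp (end minus 1 ms) is already <= the current watermark, and a window fires once the watermark reaches that last timestamp. The names, the single-key simplification, and the use of milliseconds past 11:36:00 are assumptions for illustration; this is a model of the behaviour, not Flink's code.

```scala
import scala.collection.mutable

// Plain-Scala model (not Flink code) of the question's query at parallelism 1.
// Assumptions: times are ms past 11:36:00; one key ("id1"); no allowed lateness.
object LatenessSimulation {
  val size     = 2000L // tumble(rt, interval '2' second)
  val lateness = 2000L // max_seen - 2000 in the punctuated assigner

  final case class Result(start: Long, end: Long, total: Int)

  def run(events: Seq[(Long, Int)]): (Seq[Result], Seq[Long]) = {
    val sums      = mutable.Map.empty[Long, Int] // open windows keyed by start
    val out       = mutable.ArrayBuffer.empty[Result]
    val dropped   = mutable.ArrayBuffer.empty[Long]
    var maxSeen   = Long.MinValue
    var watermark = Long.MinValue

    // Emit and discard every open window whose last timestamp (end - 1) <= watermark.
    def fire(): Unit =
      sums.keys.toList.sorted.filter(start => start + size - 1 <= watermark).foreach { start =>
        out += Result(start, start + size, sums(start))
        sums -= start
      }

    for ((ts, price) <- events) {
      val start = ts - (ts + size) % size
      if (start + size - 1 <= watermark) {
        dropped += ts // the element's window is already complete: late
      } else {
        sums(start) = sums.getOrElse(start, 0) + price
      }
      // The assigner emits its watermark after the element itself.
      maxSeen = math.max(maxSeen, ts)
      watermark = math.max(watermark, maxSeen - lateness)
      fire()
    }
    watermark = Long.MaxValue // end of the bounded input flushes all windows
    fire()
    (out.toList, dropped.toList)
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(10000L -> 1, 15000L -> 2, 13000L -> 4, 18000L -> 8, 12000L -> 16)
    val (results, dropped) = run(events)
    results.foreach(println)
    println(s"dropped as late: $dropped")
  }
}
```

Under these assumptions, `run` returns the same four windows as the output above (totals 1, 4, 2 and 8) and reports the 11:36:12 event as dropped. The 11:36:13 event belongs to [12, 14), whose last timestamp 13.999 s is still ahead of the 13 s watermark, so the window is open and the event is counted; by the time the 11:36:12 event arrives, the watermark is 16 s and that window has already fired.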


...