I am using Flink 1.12.0 and have a small data collection that I use to try out an event-time group window. Following is the full code.
package org.example.sqlexploration
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{AnyWithOperations, FieldExpression}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.example.sqlexploration.Implicits.String2Timestamp
/**
 * A single price observation for a stock, used as the element type of the source stream.
 *
 * The field names are referenced by name when the stream is registered as a table
 * (`$"id"`, `$"price"`, `$"event_time"`), so renaming a field breaks that registration.
 *
 * @param id         stock identifier; the SQL query groups by it
 * @param event_time event timestamp; exposed to SQL as the rowtime attribute `rt`
 * @param price      integer price; the SQL query sums it per window
 */
case class MyStock(
  id: String,
  event_time: Timestamp,
  price: Int
)
/** Syntax helpers for building test fixtures. */
object Implicits {

  /**
   * Adds `.ts` to strings so fixtures can be written as `"2020-01-04 11:36:10".ts`.
   *
   * Declared as a value class (`extends AnyVal`), so no wrapper object is allocated per call.
   */
  implicit class String2Timestamp(private val strDate: String) extends AnyVal {

    /**
     * Parses `strDate` with the pattern `yyyy-MM-dd HH:mm:ss` into a [[java.sql.Timestamp]].
     *
     * The string is interpreted in the JVM default time zone. If something later renders the
     * instant in UTC, the wall-clock value appears shifted by the local offset.
     *
     * @return the parsed instant as a `Timestamp`
     * @throws java.text.ParseException if `strDate` does not match the pattern or names an
     *                                  impossible date/time (for example `2020-02-30 10:00:00`)
     */
    def ts: Timestamp = {
      // SimpleDateFormat is not thread-safe, so create a fresh instance per call.
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // The default lenient mode silently rolls invalid dates over (Feb 30 -> Mar 1);
      // reject them so a typo in a fixture fails loudly instead of shifting the event time.
      format.setLenient(false)
      new Timestamp(format.parse(strDate).getTime)
    }
  }
}
/**
 * Event-time tumbling-window experiment.
 *
 * Sums `price` per `id` over 2-second tumbling windows. The input is a small, deliberately
 * out-of-order collection, and watermarks are emitted per element.
 */
object Main {

  /** How far (in ms) the watermark trails the highest event time seen so far. */
  private val MaxOutOfOrdernessMs = 2000L

  def main(args: Array[String]): Unit = {
    // Arrival order is intentionally not sorted by event_time (13 and 12 arrive "late").
    val elements = Seq(
      MyStock("id1", "2020-01-04 11:36:10".ts, 1),
      MyStock("id1", "2020-01-04 11:36:15".ts, 2),
      MyStock("id1", "2020-01-04 11:36:13".ts, 4),
      MyStock("id1", "2020-01-04 11:36:18".ts, 8),
      MyStock("id1", "2020-01-04 11:36:12".ts, 16)
    )

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // A single task keeps the interleaving of elements and watermarks deterministic.
    env.setParallelism(1)
    // Event time is the default time characteristic since Flink 1.12, so the deprecated
    // setStreamTimeCharacteristic(TimeCharacteristic.EventTime) call is not needed.

    val ds: DataStream[MyStock] = env
      .fromCollection(elements)
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[MyStock] {
        // Highest event time seen so far. The watermark is derived from it, so it never regresses.
        private var maxSeen = Long.MinValue

        override def extractTimestamp(stock: MyStock, previousElementTimestamp: Long): Long =
          stock.event_time.getTime

        // Invoked with the timestamp just returned by extractTimestamp, once per element.
        override def checkAndGetNextWatermark(stock: MyStock, extractedTimestamp: Long): Watermark = {
          maxSeen = math.max(maxSeen, extractedTimestamp)
          // This is a bounded out-of-orderness watermark, not window "allowed lateness":
          // event time is declared to have reached (max seen - 2 s).
          new Watermark(maxSeen - MaxOutOfOrdernessMs)
        }
      })

    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceView", ds, $"id", $"price", $"event_time".rowtime() as "rt")

    // Lateness is decided per WINDOW, not per element. An element is dropped only when the
    // window it falls into is already behind the watermark (window end - 1 ms <= watermark,
    // since no allowed lateness is configured). For example:
    //  - 11:36:13 arrives when the watermark is 11:36:13. Its window [12, 14) ends at
    //    13.999 > 13.000, so the window has not fired yet and the element is kept.
    //  - 11:36:12 arrives after 11:36:18 pushed the watermark to 11:36:16. By then window
    //    [12, 14) has already fired, so that element (price 16) is dropped.
    // NOTE(review): the fixtures are parsed in the JVM default zone, but the printed bounds
    // appear in UTC (the sample output shows 03:36 for 11:36 input, presumably UTC+8 locally).
    val sql =
      """
        |select id,
        |       sum(price) as total_price,
        |       tumble_start(rt, interval '2' second) as window_start,
        |       tumble_end(rt, interval '2' second) as window_end
        |from sourceView
        |group by id, tumble(rt, interval '2' second)
        |""".stripMargin

    tenv.sqlQuery(sql).toAppendStream[Row].print()
    env.execute()
  }
}
In my application, I have set the parallelism to 1 and use an AssignerWithPunctuatedWatermarks implementation that allows 2 seconds of lateness. The tumbling event-time window has a 2-second interval.
The result output is:
id1,1,2020-01-04T03:36:10,2020-01-04T03:36:12
id1,4,2020-01-04T03:36:12,2020-01-04T03:36:14
id1,2,2020-01-04T03:36:14,2020-01-04T03:36:16
id1,8,2020-01-04T03:36:18,2020-01-04T03:36:20
I don't understand why id1,4,2020-01-04T03:36:12,2020-01-04T03:36:14
is contained in the result.
The event that led to the above window being created is `MyStock("id1", "2020-01-04 11:36:13".ts, 4)`.
It is late, because the watermark has already reached 2020-01-04 11:36:13 (the preceding 11:36:15 event minus the 2-second bound).
Isn't an event excluded when its event time equals the watermark?