Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Apache Flink EventTime processing not working

I am trying to perform a stream-stream join in a Flink v1.11 application running on Kinesis Data Analytics (KDA). The join works with ProcessingTime, but with EventTime I don't see any output records from Flink.

Here is my code with EventTime processing, which is not working:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Trade>forMonotonousTimestamps()
                            .withTimestampAssigner(((event, l) -> event.getEventTime()))
            );
    DataStream<Company> input2 = createSourceFromInputStreamName2(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Company>forMonotonousTimestamps()
                            .withTimestampAssigner(((event, l) -> event.getEventTime()))
            );
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingEventTimeWindows.of(Time.seconds(30)))
            .apply(new JoinFunction<Trade, Company, String>() {
                @Override
                public String join(Trade t, Company c) {
                    return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

I got a similar join working with ProcessingTime:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env);
    DataStream<Company> input2 = createSourceFromInputStreamName2(env);
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(10000)))
            .apply(new JoinFunction<Trade, Company, String>() {
                @Override
                public String join(Trade t, Company c) {
                    return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

Sample records from the two streams that I am trying to join:

{'eventTime': 1611773705, 'ticker': 'TBV', 'price': 71.5}
{'eventTime': 1611773705, 'ticker': 'TBV', 'name': 'The Bavaria'}
question from:https://stackoverflow.com/questions/65947247/apache-flink-eventtime-processing-not-working


1 Reply


I don't see anything obviously wrong, but any of the following could cause this job to not produce any output:

  • A problem with watermarking. For example, if one of the streams becomes idle, then the watermarks will cease to advance. Or if there are no events after a window, then the watermark will not advance far enough to close that window. Or if the timestamps aren't actually in ascending order (the forMonotonousTimestamps strategy requires events to arrive in timestamp order), the pipeline could be silently dropping all of the out-of-order events.
  • The StreamingFileSink only finalizes its output during checkpointing, and does not finalize whatever files are pending if and when the job is stopped.
  • A windowed join behaves like an inner join, and requires at least one event from each input stream in order to produce any results for a given window interval. From the example you shared, it looks like this is not the issue.
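To illustrate the watermarking point: with an ascending-timestamps strategy, the watermark effectively follows the maximum timestamp seen so far, and any event that arrives behind it is late and can be silently dropped. Here is a minimal plain-Java sketch of that behavior (no Flink dependency; the class and method names are made up for illustration, and this simplifies Flink's actual periodic watermark emission):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of an ascending-timestamps watermark: the watermark
// tracks the maximum timestamp seen so far, and any later-arriving event
// with a smaller timestamp is "late" and may be silently dropped.
public class WatermarkSketch {

    // Returns the timestamps that would be treated as late events.
    public static List<Long> lateEvents(long[] timestamps) {
        long watermark = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts < watermark) {
                late.add(ts);       // arrived behind the watermark
            } else {
                watermark = ts;     // watermark advances with the max timestamp
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // 150 arrives after 200, so it falls behind the watermark
        System.out.println(lateEvents(new long[]{100, 200, 150, 300}));
    }
}
```

If your events are even slightly out of order, a bounded-out-of-orderness strategy (forBoundedOutOfOrderness) tolerates them instead of dropping them.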

Update:

Given that what you (appear to) want to do is to join each Trade with the latest Company record available at the time of the Trade, a lookup join or a temporal table join seem like they might be good approaches.
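As a rough sketch of what an event-time temporal join could look like in Flink SQL (the table and column names below are hypothetical, and companies would need to be declared as a versioned table with a primary key and an event-time attribute):

```sql
-- Hypothetical tables: trades is the probe side; companies is a versioned
-- table keyed by ticker. Each trade joins against the company row that was
-- current as of the trade's event time.
SELECT
  t.event_time,
  t.ticker,
  c.name,
  t.price
FROM trades AS t
JOIN companies FOR SYSTEM_TIME AS OF t.event_time AS c
  ON t.ticker = c.ticker;
```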

Here are a couple of examples:

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/04/04_lookup_joins.md

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/03/03_kafka_join.md

Some documentation:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#event-time-temporal-join

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/versioned_tables.html

