I am writing Apache Beam Python code that reads data from a Pub/Sub subscription and prints it to the console, but it gets stuck and never completes.
import argparse
import logging
import ast
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class FlattenJson(beam.DoFn):
    """Debugging DoFn that prints every element it receives.

    Despite the name, no JSON flattening is performed here, and
    ``process`` returns ``None``, so no elements are emitted downstream.
    """

    def process(self, element, *args, **kwargs):
        """Print ``element`` to stdout.

        Args:
            element: The element handed to this DoFn by the runner.
            *args: Extra positional arguments; accepted but unused.
            **kwargs: Extra keyword arguments; accepted but unused.
        """
        # The original literal lacked the ``f`` prefix, so the text
        # "{element}" was printed verbatim instead of the element's value.
        print(f"Element: {element}")
class DecodeMsgs(beam.DoFn):
    """Pass-through DoFn that prints and logs each element, then re-emits it.

    Despite the name, the element is returned unchanged; no decoding
    happens in this class.
    """

    def process(self, element):
        """Print and log ``element`` and emit it unchanged.

        Args:
            element: The element handed to this DoFn by the runner.

        Returns:
            A one-item list containing ``element`` itself.
        """
        print("############Before Decode:element", element)
        # The original message was a plain string containing "{element}",
        # so the value never reached the log. Lazy %-style arguments
        # interpolate it only when the record is actually emitted.
        logging.info("Before Decode: %s", element)
        return [element]
class PubsubStreamingToBq:
    """Builds and runs a streaming pipeline that reads a Pub/Sub subscription.

    As written, the pipeline only reads messages and passes them through
    ``DecodeMsgs``; nothing is written to BigQuery despite the class name.
    """

    def __init__(self):
        """No state is kept on the instance."""
        pass

    def run(self, subscription, pipeline_args=None):
        """Construct the pipeline and run it.

        Args:
            subscription: Pub/Sub subscription passed to
                ``beam.io.ReadFromPubSub(subscription=...)``.
            pipeline_args: Optional list of extra command-line style
                arguments forwarded to ``PipelineOptions``.
        """
        pipeline_options = PipelineOptions(
            pipeline_args, streaming=True, save_main_session=True
        )
        # The original passed a non-f string plus the value, which printed
        # the literal text "{pipeline_args}" before the arguments.
        print(f"Args: {pipeline_args}")

        # NOTE: leaving the ``with`` block runs the pipeline and waits for
        # it to finish. With ``streaming=True`` and an unbounded Pub/Sub
        # source the job does not finish on its own, so this call is
        # expected to block until the pipeline is cancelled.
        with beam.Pipeline(options=pipeline_options) as p:
            # The resulting PCollection is not consumed further, so it is
            # not bound to a name.
            (
                p
                | "Read Pubsub Msg" >> beam.io.ReadFromPubSub(subscription=subscription)
                | "Decoding" >> beam.ParDo(DecodeMsgs())
            )
if __name__ == "__main__":
    # Script entry point: parse --subscription, forward every other
    # command-line argument to the pipeline, and start the streaming job.
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--subscription', required=True)
    # parse_known_args keeps unrecognised flags so they can be handed to
    # the pipeline as pipeline options.
    known_args, pipeline_args = parser.parse_known_args()

    # The original passed a non-f string plus the value, which printed the
    # literal text "{known_args}" before the namespace.
    print(f"Known Args: {known_args}")

    streaming_job = PubsubStreamingToBq()
    streaming_job.run(known_args.subscription, pipeline_args)
Could anyone tell me what the issue is?
I am using Beam version 2.27.0 and Python version 3.6.
question from:
https://stackoverflow.com/questions/66050802/beam-pubsub-code-is-struck-and-not-publishing-the-messages 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…