Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

scala - How to write a Spark DataFrame to a Kafka topic in Avro format?

I have a DataFrame in Spark that looks like this:

eventDF

   Sno|UserID|TypeExp
    1|JAS123|MOVIE
    2|ASP123|GAMES
    3|JAS123|CLOTHING
    4|DPS123|MOVIE
    5|DPS123|CLOTHING
    6|ASP123|MEDICAL
    7|JAS123|OTH
    8|POQ133|MEDICAL
    .......
    10000|DPS123|OTH

I need to write it to a Kafka topic in Avro format. Currently I am able to write it to Kafka as JSON using the following code:

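import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{column, struct, to_json}

// Pack every column into a struct and serialize it as a JSON string.
val kafkaUserDF: DataFrame = eventDF.select(to_json(struct(eventDF.columns.map(column):_*)).alias("value"))
kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
  .option("kafka.bootstrap.servers", "Host:port")
  .option("topic", "eventdf")
  .save()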

Now I want to write this to the Kafka topic in Avro format instead.


1 Reply


Spark >= 2.4:

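You can use the to_avro function from the spark-avro library. It is an external module, so it has to be added to the application's dependencies (for example with --packages).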

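// On Spark 3.x, to_avro lives in org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{column, struct}

// Pack every column into a struct and serialize it as Avro binary.
eventDF.select(
  to_avro(struct(eventDF.columns.map(column):_*)).alias("value")
)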
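
The snippet above only builds the Avro-encoded column. As a rough sketch of the full write, assuming Spark 3.x with the spark-avro and spark-sql-kafka modules on the classpath, the pieces could be combined as follows. The object name AvroKafkaWriter and its method names are made up for illustration, and the bootstrap servers and topic are the placeholders from the question.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct}
// Spark 3.x location; on Spark 2.4 import org.apache.spark.sql.avro._ instead.
import org.apache.spark.sql.avro.functions.to_avro

// Hypothetical helper object, not part of any library.
object AvroKafkaWriter {
  // Packs all columns into one struct and serializes it as Avro binary.
  // The result has a single binary column named "value", as the Kafka sink expects.
  def toAvroValue(df: DataFrame): DataFrame =
    df.select(to_avro(struct(df.columns.map(col): _*)).alias("value"))

  // Batch-writes the Avro-encoded rows to Kafka.
  def writeToKafka(df: DataFrame, bootstrapServers: String, topic: String): Unit =
    toAvroValue(df).write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .save()
}
```

With the DataFrame from the question this would be called as AvroKafkaWriter.writeToKafka(eventDF, "Host:port", "eventdf"). Note that to_avro produces plain Avro binary without the Confluent Schema Registry header, so consumers need to know the schema by other means.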

Spark < 2.4:

You have to do the serialization yourself:

  • Create a function that writes a serialized Avro record to a ByteArrayOutputStream and returns the result. A naive implementation (it supports only flat objects) could look like this (adapted from Kafka Avro Scala Example by Sushil Kumar Singh):

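    import java.io.ByteArrayOutputStream

    import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
    import org.apache.spark.sql.Row

    def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
      // Copy each top-level field of the row into an Avro record (flat rows only).
      val gr: GenericRecord = new GenericData.Record(schema)
      row.schema.fieldNames.foreach(name => gr.put(name, row.getAs[AnyRef](name)))

      // Serialize the record with Avro's binary encoding into an in-memory buffer.
      val writer = new GenericDatumWriter[GenericRecord](schema)
      val out = new ByteArrayOutputStream()
      val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(gr, encoder)
      encoder.flush()
      out.close()

      out.toByteArray()
    }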
    
  • Convert it to a UDF:

    import org.apache.spark.sql.functions.udf
    
    // Supply the Avro schema that matches eventDF's columns.
    val schema: org.apache.avro.Schema = ???
    val encodeUDF = udf(encode(schema) _)
    
  • Use it as a drop-in replacement for to_json:

    eventDF.select(
      encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
    )
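
The steps above leave the Avro schema itself undefined. As a minimal sketch, assuming the three columns from the question map to an int and two strings, the schema could be built with Avro's SchemaBuilder and the binary encoding checked with a round trip outside Spark. The object name EventAvro, the record name Event and the namespace example are made up for illustration; only the field names come from the question.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helper object, not part of any library.
object EventAvro {
  // Assumed column types: Sno is an int, UserID and TypeExp are strings.
  val schema: Schema = SchemaBuilder
    .record("Event").namespace("example")
    .fields()
    .requiredInt("Sno")
    .requiredString("UserID")
    .requiredString("TypeExp")
    .endRecord()

  // Serializes one event with Avro's binary encoding.
  def encode(sno: Int, userId: String, typeExp: String): Array[Byte] = {
    val record = new GenericData.Record(schema)
    record.put("Sno", Int.box(sno))
    record.put("UserID", userId)
    record.put("TypeExp", typeExp)

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Reads the bytes back with the same schema.
  def decode(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}
```

Calling EventAvro.decode(EventAvro.encode(1, "JAS123", "MOVIE")) should return a record with the same three values. String fields come back as org.apache.avro.util.Utf8, so call toString before comparing them with Scala strings. EventAvro.schema could also serve as the schema passed to encode in the UDF, provided the Avro version on the cluster allows the Schema object to be serialized with the closure.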
    
