Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

scala - Kafka producer: send avro as array[byte] without schema

I am trying to set up a simple Kafka stack locally, and I am at the point where I need to create a toy producer. This example: https://lombardo-chcg.github.io/tools/2017/09/29/kafka-avro-producer-in-scala.html (see below for the piece of code I'm interested in) is almost exactly what I want, except for the following:

Here the producer sends a GenericData.Record object, so the whole schema is sent and it doesn't leverage the Schema Registry. I want to send an Array[Byte] in which the first few bytes are the ID of the schema and the remaining bytes are the data, without the schema (at least, I think that is the optimal way to do it).

The piece of code I am talking about:

import java.util.Properties

import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.slf4j.LoggerFactory

case class User(name: String, favoriteNumber: Int, favoriteColor: String)

class AvroProducer {
  val logger = LoggerFactory.getLogger(getClass)

  val kafkaBootstrapServer = sys.env("KAFKA_BOOTSTRAP_SERVER")
  val schemaRegistryUrl = sys.env("SCHEMA_REGISTRY_URL")

  val props = new Properties()
  props.put("bootstrap.servers", kafkaBootstrapServer)
  props.put("schema.registry.url", schemaRegistryUrl)
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("acks", "1")

  val producer = new KafkaProducer[String, GenericData.Record](props)
  val schemaParser = new Parser

  val key = "key1"
  val valueSchemaJson =
  s"""
    {
      "namespace": "com.avro.junkie",
      "type": "record",
      "name": "User2",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber",  "type": "int"},
        {"name": "favoriteColor", "type": "string"}
      ]
    }
  """
  val valueSchemaAvro = schemaParser.parse(valueSchemaJson)
  val avroRecord = new GenericData.Record(valueSchemaAvro)

  val mary = new User("Mary", 840, "Green")
  avroRecord.put("name", mary.name)
  avroRecord.put("favoriteNumber", mary.favoriteNumber)
  avroRecord.put("favoriteColor", mary.favoriteColor)

  def start = {
    try {
      val record = new ProducerRecord("users", key, avroRecord)
      val ack = producer.send(record).get()
      // grabbing the ack and logging for visibility
      logger.info(s"${ack.toString} written to partition ${ack.partition.toString}")
    }
    catch {
      case e: Throwable => logger.error(e.getMessage, e)
    }
  }
}

Problem(s):

  • I don't know how to retrieve the ID of the schema from the Schema Registry
  • I don't know how to send only the data (without the schema) plus the ID as an Array[Byte]

I know how to write the whole Avro record to an Array[Byte]:

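    import java.io.ByteArrayOutputStream
    import org.apache.avro.generic.{GenericData, GenericDatumWriter}
    import org.apache.avro.io.EncoderFactory

    // GenericDatumWriter is the writer that matches GenericData.Record
    val writer = new GenericDatumWriter[GenericData.Record](valueSchemaAvro)
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(avroRecord, encoder) // but here I am also writing the schema, right?
    encoder.flush()
    out.close()
    out.toByteArray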

thanks so much



1 Reply


The first code block does use the Schema Registry: KafkaAvroSerializer registers (or looks up) the schema, obtains its ID, and writes that ID into the byte array in place of the schema for you.
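For illustration, the framing that the Confluent serializer produces is a magic byte (0), then the schema ID as a 4-byte big-endian integer, then the Avro binary data. Below is a minimal sketch of building that frame by hand, using only the JDK. The object and method names (ConfluentFraming, frame, schemaIdOf) are hypothetical, and the schema ID is assumed to have been obtained from the registry beforehand (for example through its REST API); this is not the serializer's actual implementation.

```scala
import java.nio.ByteBuffer

// Hypothetical helper, for illustration only.
object ConfluentFraming {
  // The Confluent wire format starts with a single magic byte, currently 0.
  val MagicByte: Byte = 0x0

  // Prepends the magic byte and the 4-byte schema ID to an Avro binary payload.
  // ByteBuffer is big-endian by default, which is what the wire format expects.
  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
    val buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length)
    buffer.put(MagicByte)
    buffer.putInt(schemaId)
    buffer.put(avroPayload)
    buffer.array()
  }

  // Reads the schema ID back out of a framed message.
  def schemaIdOf(message: Array[Byte]): Int = {
    require(message.length >= 5 && message(0) == MagicByte,
      "not a Confluent-framed message")
    ByteBuffer.wrap(message, 1, 4).getInt
  }
}
```

Here avroPayload would be the out.toByteArray result from the question's second snippet. In most setups it is simpler to let KafkaAvroSerializer do this framing, as the first code block already does.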

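If you want to bypass the Schema Registry, configure the producer's value serializer as ByteArraySerializer and send the result of out.toByteArray from the second code block to the producer. The binary encoder used there writes only the record data, not the schema.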
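A rough sketch of the producer configuration for that bypass route is shown below. The object name RawBytesProducerConfig and its build method are hypothetical; the serializer class names are the standard ones shipped with kafka-clients. Only the properties are built here, so the snippet itself needs nothing beyond the JDK.

```scala
import java.util.Properties

// Hypothetical helper, for illustration only.
object RawBytesProducerConfig {
  // Builds producer properties that send values as raw bytes.
  // No schema.registry.url is set, because nothing talks to the registry.
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("acks", "1")
    props
  }
}

// Usage, assuming kafka-clients is on the classpath and `bytes` is the
// out.toByteArray result from the question:
//   val producer = new KafkaProducer[String, Array[Byte]](RawBytesProducerConfig.build(kafkaBootstrapServer))
//   producer.send(new ProducerRecord("users", "key1", bytes)).get()
```

Note that messages sent this way carry no schema ID, so a consumer relying on KafkaAvroDeserializer would not be able to decode them; the consumer has to know the writer schema by some other means.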

