scala - Error when Spark 2.2.0 standalone mode writes a DataFrame to local single-node Kafka

The data source is from the Databricks notebook demo: Five Spark SQL Helper Utility Functions to Extract and Explore Complex Data Types.

But when I run this code on my own laptop, I always get errors.

First, load the JSON data as a DataFrame:
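For context, a DataFrame with this shape can be built from a few JSON records, roughly like this (a simplified sketch, not the exact notebook code; the sample values are copied from the rows shown below):

import spark.implicits._  // already in scope in spark-shell; needed for .toDS

// Parse a couple of JSON records into a DataFrame; since the JSON values
// are quoted, both columns are inferred as strings, matching res2's schema.
val sample = Seq(
  """{"battery_level": "7", "c02_level": "886"}""",
  """{"battery_level": "5", "c02_level": "1378"}"""
)
val res2 = spark.read.json(sample.toDS)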

res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string]

scala> res2.show
+-------------+---------+
|battery_level|c02_level|
+-------------+---------+
|            7|      886|
|            5|     1378|
|            8|      917|
|            8|     1504|
|            8|      831|
|            9|     1304|
|            8|     1574|
|            9|     1208|
+-------------+---------+

Second, write the data to Kafka:

res2.write 
  .format("kafka") 
  .option("kafka.bootstrap.servers", "localhost:9092") 
  .option("topic", "test") 
  .save()

All of this follows the notebook demo above and the official documentation.

But it fails with this error:

scala> res2.write 
         .format("kafka") 
         .option("kafka.bootstrap.servers", "localhost:9092") 
         .option("topic", "iot-devices") 
         .save()
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71)
  at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:165)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
  ... 52 elided

I assumed it might be a Kafka problem, so I tested reading a DataFrame from Kafka to verify connectivity:

scala> val kaDF = spark.read
         .format("kafka") 
         .option("kafka.bootstrap.servers", "localhost:9092") 
         .option("subscribe", "iot-devices") 
         .load()
kaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> kaDF.show
+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|    [73 73 73 73 73]|iot-devices|        0|     0|2017-09-27 11:11:...|            0|
|null|[64 69 63 6B 20 3...|iot-devices|        0|     1|2017-09-27 11:29:...|            0|
|null|       [78 69 78 69]|iot-devices|        0|     2|2017-09-27 11:29:...|            0|
|null|[31 20 32 20 33 2...|iot-devices|        0|     3|2017-09-27 11:30:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+

So the result shows that reading data from topic "iot-devices" via Kafka bootstrap server localhost:9092 does work.
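As a side note, key and value come back as binary; the usual pattern from the Kafka integration guide is to cast them to strings when inspecting the payload, e.g.:

// Cast the binary key/value columns to strings for readable output
kaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(false)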

I have searched a lot online, but still can't solve it.

Can anybody with Spark SQL experience tell me what is wrong with my command?

Thanks!


1 Reply


The error message clearly shows the source of the problem:

org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;

The Dataset to be written has to have at least a value column (and optionally key and topic), while res2 has only battery_level and c02_level.

You can, for example:

import org.apache.spark.sql.functions._

res2.select(to_json(struct($"battery_level", $"c02_level")).alias("value"))
  .write
  ...
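Filling in the sink options from the question, a complete minimal batch write would look roughly like this (a sketch, assuming the same local broker and topic):

// Pack both columns into a JSON string and expose it as the required 'value' column
res2.select(to_json(struct($"battery_level", $"c02_level")).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "iot-devices")
  .save()

After this, reading the topic back (as in the question) should show the JSON payload in the value column.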
