JSON sources are not well suited to data with an evolving schema (consider Avro or Parquet instead), but a simple workaround is to use the same schema for all sources and make the new fields optional / nullable:
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val schema = StructType(Seq(
  StructField("A", StructType(Seq(
    StructField("B", LongType, true),
    StructField("D", LongType, true)
  )), true),
  StructField("C", LongType, true)
))
You can pass a schema like this to DataFrameReader:
val rddV1 = sc.parallelize(Seq("""{ "A": {"B": 1 } }"""))
val df1 = sqlContext.read.schema(schema).json(rddV1)

val rddV2 = sc.parallelize(Seq("""{ "A": {"B": 1 }, "C": 2 }"""))
val df2 = sqlContext.read.schema(schema).json(rddV2)

val rddV3 = sc.parallelize(Seq("""{ "A": {"B": 1, "D": 3 }, "C": 2 }"""))
val df3 = sqlContext.read.schema(schema).json(rddV3)
and you'll get a consistent structure independent of the variant:
require(df1.schema == df2.schema && df2.schema == df3.schema)
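For contrast, here's a minimal sketch of what happens if you let Spark infer the schema per source instead; the inferred schemas differ between variants, which is exactly why the check above needs the shared schema:

// With inferred schemas each variant gets its own structure,
// so the consistency check above would fail:
val inferred1 = sqlContext.read.json(rddV1).schema  // A.B only
val inferred3 = sqlContext.read.json(rddV3).schema  // A.B, A.D and C
require(inferred1 != inferred3)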
Missing columns are automatically set to null:
df1.printSchema
// root
// |-- A: struct (nullable = true)
// | |-- B: long (nullable = true)
// | |-- D: long (nullable = true)
// |-- C: long (nullable = true)
df1.show
// +--------+----+
// | A| C|
// +--------+----+
// |[1,null]|null|
// +--------+----+
df2.show
// +--------+---+
// | A| C|
// +--------+---+
// |[1,null]| 2|
// +--------+---+
df3.show
// +-----+---+
// | A| C|
// +-----+---+
// |[1,3]| 2|
// +-----+---+
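Since the structure is fixed up front, the same queries run unchanged against every variant. For example, a quick sketch of selecting fields that are absent from the first source (output illustrative):

// Fields missing in the source simply come back as null:
df1.select("A.D", "C").show
// +----+----+
// |   D|   C|
// +----+----+
// |null|null|
// +----+----+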
Note:
This solution is data-source dependent. It may or may not work with other sources, and could even result in malformed records.
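If you do move to a format built for schema evolution, as suggested at the top, Parquet can reconcile differing file schemas at read time. A minimal sketch, assuming Spark 1.5+ where the mergeSchema read option is available (the data paths here are hypothetical):

// Write two variants with their own (inferred, differing) schemas.
// The key=... directories double as a discoverable partition column:
sqlContext.read.json(rddV1).write.parquet("data/key=1")
sqlContext.read.json(rddV3).write.parquet("data/key=3")

// mergeSchema reconciles the per-file schemas into a single superset:
val merged = sqlContext.read.option("mergeSchema", "true").parquet("data")
merged.printSchema  // includes A.B, A.D and C (plus the key column)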