The following works with Spark 2.0. You can use the map function, available since the 2.0 release, to combine columns into a Map.
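import org.apache.spark.sql.functions.{col, collect_list, map}
// The $"..." syntax needs spark.implicits._ (already imported in spark-shell).
val df1 = df.groupBy(col("school_name")).agg(collect_list(map($"name",$"age")) as "map")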
df1.show(false)
This gives the following output.
+-----------+------------------------------------+
|school_name|map                                 |
+-----------+------------------------------------+
|school B   |[Map(cathy -> 10), Map(shaun -> 5)] |
|school A   |[Map(michael -> 7), Map(emily -> 5)]|
+-----------+------------------------------------+
Now you can use a UDF to merge the individual Maps into a single Map, as below.
import org.apache.spark.sql.functions.udf
val joinMap = udf { values: Seq[Map[String,Int]] => values.flatten.toMap }
val df2 = df1.withColumn("map", joinMap(col("map")))
df2.show(false)
This gives the required output, with a single Map[String,Int] per school.
+-----------+-----------------------------+
|school_name|map                          |
+-----------+-----------------------------+
|school B   |Map(cathy -> 10, shaun -> 5) |
|school A   |Map(michael -> 7, emily -> 5)|
+-----------+-----------------------------+
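To see what the UDF body does independently of Spark, here is a minimal plain-Scala sketch of the same values.flatten.toMap expression. The object name JoinMapSketch and the helper joinMaps are hypothetical names used only for illustration. One thing worth knowing: if the same key appears in more than one Map (for example two students with the same name in one school), toMap keeps only the later entry.

```scala
// Plain Scala, no Spark required: the same expression used inside the joinMap UDF.
// JoinMapSketch and joinMaps are illustrative names, not part of the original answer.
object JoinMapSketch {
  // Flattens the maps into a sequence of (key, value) pairs, then rebuilds one Map.
  // On duplicate keys, the pair that comes later in the sequence wins.
  def joinMaps(values: Seq[Map[String, Int]]): Map[String, Int] =
    values.flatten.toMap

  def main(args: Array[String]): Unit = {
    // Same shape as the collected list for "school B" above.
    val schoolB = Seq(Map("cathy" -> 10), Map("shaun" -> 5))
    println(joinMaps(schoolB))

    // Duplicate key: only the later value for "cathy" survives.
    val withDuplicate = Seq(Map("cathy" -> 10), Map("cathy" -> 11))
    println(joinMaps(withDuplicate))
  }
}
```

If duplicate names within a school are possible in your data, you may want a different key (or a list of structs) instead of a single Map.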
If you want to convert a column value into a JSON string, Spark 2.1.0 introduced the to_json function.
val df3 = df2.withColumn("map",to_json(struct($"map")))
df3.show(false)
The to_json function returns the following output.
+-----------+-------------------------------+
|school_name|map                            |
+-----------+-------------------------------+
|school B   |{"map":{"cathy":10,"shaun":5}} |
|school A   |{"map":{"michael":7,"emily":5}}|
+-----------+-------------------------------+