Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
405 views
in Technique[技术] by (71.8m points)

python - Dealing with non-uniform JSON columns in spark dataframe

I would like to know what is the best practice for reading a newline delimited JSON file into a dataframe. Critically, one of the (required) fields in each record maps to an object that is not guaranteed to have the same sub-fields (ie the schema is non-uniform across all the records).

For example, an input file might look like:

{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}
{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}
{"id": 3, "type": "baz", "data": {"key3": "moo"}}

In this case the fields id, type, and data will be present in all records, but the struct mapped to by data will have a heterogeneous schema.

I have two approaches for dealing wi the non-uniformity of the data column:

  1. let spark infer the schema:
df = spark.read.options(samplingRatio=1.0).json('s3://bucket/path/to/newline_separated_json.txt')

The obvious issue with this approach is the need to sample every record to determine the super-set of fields/schemas that will be the final schema. This may be prohibitively expensive given a dataset in the low 100s of millions of records? Or...

  1. Tell spark to coerce the data field into a JSON string, and then just have a schema comprised of three top-level string fields, id, type, data. And here I'm not really sure the best way to proceed. For example I am assuming just declaring the data field to be a string as in the following, will not work because it is not explicitly doing the equivalent of json.dumps?
schema = StructType([
    StructField("id", StringType(), true),
    StructField("type", StringType(), true),
    StructField("data", StringType(), true)
])
df = spark.read.json('s3://bucket/path/to/newline_separated_json.txt', schema=schema)

If I want to avoid the cost of scanning the full dataset incurred by option 1, what is the best way to ingest this file and keep the data field as a JSON string?

Thank you

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I think your attempt and the overall idea is in the right direction. Here are two more approaches based on the build-in options aka get_json_object/from_json via dataframe API and using map transformation along with python's json.dumps() and json.loads() via the RDD API.

Option 1: get_json_object() / from_json()

First let's try with get_json_object() which doesn't require a schema:

import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

df.select(f.get_json_object("value", "$.id").alias("id"), 
          f.get_json_object("value", "$.type").alias("type"), 
           f.get_json_object("value", "$.data").alias("data"))

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

On the contrary from_json() requires a schema definition:

from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

df.select(f.from_json("value", schema).getItem("id").alias("id"), 
         f.from_json("value", schema).getItem("type").alias("type"), 
         f.from_json("value", schema).getItem("data").alias("data"))

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

Option 2: map/RDD API + json.dumps()

from pyspark.sql.types import StringType, StructType, StructField
import json

df = spark.createDataFrame([
  '{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}',
  '{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}',
  '{"id": 3, "type": "baz", "data": {"key3": "moo"}}'
], StringType())

def from_json(data):
  row = json.loads(data[0])
  return (row['id'], row['type'], json.dumps(row['data']))

json_rdd = df.rdd.map(from_json)

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

spark.createDataFrame(json_rdd, schema).show(10, False)

# +---+----+--------------------------------+
# |id |type|data                            |
# +---+----+--------------------------------+
# |1  |foo |{"key2": "meh", "key0": "foo"}  |
# |2  |bar |{"key2": "poo", "key3": "pants"}|
# |3  |baz |{"key3": "moo"}                 |
# +---+----+--------------------------------+

Function from_json will transform the string row into a tuple of (id, type, data). json.loads() will parse the json string and return a dictionary through which we generate and return the final tuple.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...