Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
536 views
in Technique[技术] by (71.8m points)

python - Dataframe pyspark to dict

I have this dataframe path_df:

path_df.show()
+---------------+-------------+----+
|FromComponentID|ToComponentID|Cost|
+---------------+-------------+----+
|            160|          163|27.0|
|            160|          183|27.0|
|            161|          162|22.0|
|            161|          170|31.0|
|            162|          161|22.0|
|            162|          167|24.0|
|            163|          160|27.0|
|            163|          164|27.0|
|            164|          163|27.0|
|            164|          165|35.0|
|            165|          164|35.0|
|            165|          166|33.0|
|            166|          165|33.0|
|            166|          167|31.0|
|            167|          162|24.0|
|            167|          166|31.0|
|            167|          168|27.0|
|            168|          167|27.0|
|            168|          169|23.0|
|            169|          168|23.0|
+---------------+-------------+----+
only showing top 20 rows

From this, I want to make a dictionnary, as follow: {FromComponentID:{ToComponentID:Cost}}

For my current data, it would be:

{160 : {163 : 27,
        183 : 27},
 161 : {162 : 22,
        170 : 31},
 162 : {161 : 22
        167 : 24},
 ...
 167 : {162 : 24,
        166 : 31,
        168 : 27}
 168 : {167 : 27,
        169 : 23},
 169 : {168 : 23}
}

Can I do that using only PySpark and how ? Or maybe it's better to extract my data and process them directly with python.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can do all of this with dataframe transformations and udfs. The only slightly annoying thing is that, because you technically have two different types of dictionaries (one where key=integer and value=dictionary, the other where key=integer value=float), you will have to define two udfs with different datatypes. Here is one possible way to do this:

from pyspark.sql.functions import udf,collect_list,create_map
from pyspark.sql.types import MapType,IntegerType,FloatType

data = [[160,163,27.0],[160,183,27.0],[161,162,22.0],
      [161,170,31.0],[162,161,22.0],[162,167,24.0],
      [163,160,27.0],[163,164,27.0],[164,163,27.0],
      [164,165,35.0],[165,164,35.0],[165,166,33.0],
      [166,165,33.0],[166,167,31.0],[167,162,24.0],
      [167,166,31.0],[167,168,27.0],[168,167,27.0],
      [168,169,23.0],[169,168,23.0]]

cols = ['FromComponentID','ToComponentID','Cost']
df = spark.createDataFrame(data,cols)

combineMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
             MapType(IntegerType(),FloatType()))

combineDeepMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
             MapType(IntegerType(),MapType(IntegerType(),FloatType())))

mapdf = df.groupBy('FromComponentID')
.agg(collect_list(create_map('ToComponentID','Cost')).alias('maps'))
.agg(combineDeepMap(collect_list(create_map('FromComponentID',combineMap('maps')))))

result_dict = mapdf.collect()[0][0]

For a large dataset, this should offer some performance boosts over a solution that requires the data to be collected onto a single node. But since spark still has to serialize the udf, there won't be huge gains over an rdd based solution.


Update:

An rdd solution is a lot more compact but, in my opinion, it is not as clean. This is because pyspark doesn't store large dictionaries as rdds very easily. The solution is to store it as a distributed list of tuples and then convert it to a dictionary when you collect it to a single node. Here is one possible solution:

maprdd = df.rdd.groupBy(lambda x:x[0]).map(lambda x:(x[0],{y[1]:y[2] for y in x[1]}))
result_dict = dict(maprdd.collect()) 

Again, this should offer performance boosts over a pure python implementation on single node, and it might not be that different than the dataframe implementation, but my expectation is that the dataframe version will be more performant.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...