Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

python - pySpark: Best alternative for using Spark SQL/DF within a UDF?

I'm stuck on a step where I need to perform some action for each column value in my DataFrame, and that action requires querying the DF again. Here is a data sample:

Row(user_id='KxGeqg5ccByhaZfQRI4Nnw', gender='male', business_id='S75Lf-Q3bCCckQ3w7mSN2g', friends='my4q3Sy6Ei45V58N2l8VGw, R3zl9VKw63rPxSfBxbasWw, c-c64rURhBR8V8scSbwo7Q, tn6qogrDbb9hEKfRBGUUpw, pu_AQig2fw40PshvtgONPQ, IDrgtQccPN9c4rBn7yyk4Q, OIIx11vTeLN8EBcZrYXHKQ')

friends here is just a comma-separated list of other user_ids. What I'm trying to do is fetch some value for each of these friends for this specific user. Since these are user_ids, I'd need to query my DF for them, which isn't allowed inside a UDF. I can neither run spark.sql nor filter a DataFrame there, since both depend on SparkSession objects.

What different approach can I try here?

First attempt - creating a DF up front and filtering it inside the UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

tempDF = sparkSession.sql("SELECT review_sentiment,user_id,business_id FROM events")

def getfriendsSentiment(friendsList, b_id):
  listOfSentiments = []
  for friend_id in friendsList.split(','):
    try:
      listOfSentiments.append(tempDF.filter("user_id='" + friend_id + "' AND business_id='" + b_id + "'").rdd.flatMap(lambda x: x).collect()[0])
    except Exception:
      pass
  return ','.join(listOfSentiments)

friendsSentiment = udf(getfriendsSentiment, StringType())
businessReviewUserDfWithFriends = businessReviewUserDfWithFriends.withColumn('friendsSentimentToBusiness', friendsSentiment('friends', 'business_id'))

Error:

py4j.Py4JException: Method __getstate__([]) does not exist

Second attempt - creating a table and querying it inside the UDF:

sparkSession.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")

def getfriendsSentiment(friendsList, b_id):
  listOfSentiments = []
  for friend_id in friendsList.split(','):
    try:
      listOfSentiments.append(spark.sql("SELECT review_sentiment FROM events WHERE user_id='" + friend_id + "' AND business_id='" + b_id + "' GROUP BY review_sentiment ORDER BY COUNT(review_sentiment) DESC LIMIT 1").rdd.flatMap(lambda x: x).collect()[0])
    except Exception:
      pass
  return ','.join(listOfSentiments)

Error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting........

What can I do to get around this?


1 Reply


You are not allowed to use SparkSession/DataFrame objects inside UDFs: they live only on the driver and cannot be serialized to the executors, which is what both errors above are telling you.

The solution I think will work here is to explode every row by friends, then join on (friend.id == user.id && friend.business_id == user.business_id).

A second solution is possible if the events table fits into memory: collect it on the driver at the start and broadcast it to all executors. Then you can read that data inside the UDF. This only works while events stays small enough to fit in memory.

