Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
327 views
in Technique[技术] by (71.8m points)

python - Filter based on another RDD in Spark

I would like to keep only the employees which does have a departement ID referenced in the second table.

Employee table
LastName    DepartmentID
Rafferty    31
Jones   33
Heisenberg  33
Robinson    34
Smith   34

Department table
DepartmentID
31  
33  

I have tried the following code which does not work:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

Any ideas? I am using Spark 1.1.0 with Python. However, I would accept a Scala or Python answer.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

In this case, what you would like to achieve is to filter at each partition with the data contained in the department table: This would be the basic solution:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}

If your department data is large, a broadcast variable will improve performance by delivering the data once to all the nodes instead of having to serialize it with each task

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}

Although using join would work, it's a very expensive solution as it will require a distributed shuffle of the data (byKey) to achieve the join. Given that the requirement is a simple filter, sending the data to each partition (as shown above) will provide much better performance.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...