Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

scala - how to create EdgeRDD from data frame in Spark

I have a DataFrame in Spark. Each row represents a person, and I want to retrieve the possible connections among them. The rule is: for each possible pair, a link exists if both people have the same prop1: String and the absolute difference of their prop2: Int values is less than 5. I am trying to understand the best way to accomplish this with DataFrames.

I am trying to retrieve indexed RDDs:

val idusers = people.select("ID")
                     .rdd
                     .map(r => r(0).asInstanceOf[Int])
                     .zipWithIndex
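val prop1users = people.select("ID", "prop1")   // select both columns that the map reads
                        .rdd
                        .map(r => (r(0).asInstanceOf[Int], r(1).asInstanceOf[String]))
val prop2users = people.select("ID", "prop2")   // prop2 is at index 1 of the selected row
                        .rdd
                        .map(r => (r(0).asInstanceOf[Int], r(1).asInstanceOf[Int]))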

and then started removing duplicates like this:

var links = idusers
            .join(idusers)
            .filter{ case (v1, v2) => v2._1 != v2._2 }

but then I got stuck on checking prop1. Is there a way to accomplish all these steps using only DataFrames?

1 Reply

Suppose you have something like this:

import org.apache.spark.sql.SQLContext

val sqlc : SQLContext = ???

case class Person(id: Long, country: String, age: Int)

val testPeople = Seq(
  Person(1, "Romania"    , 15),
  Person(2, "New Zealand", 30),
  Person(3, "Romania"    , 17),
  Person(4, "Iceland"    , 20),
  Person(5, "Romania"    , 40),
  Person(6, "Romania"    , 44),
  Person(7, "Romania"    , 45),
  Person(8, "Iceland"    , 21),
  Person(9, "Iceland"    , 22)
)

val people = sqlc.createDataFrame(testPeople)

First, create a mirror copy of the DataFrame with its columns renamed, to avoid column-name clashes in the self-join:

val peopleR = people
  .withColumnRenamed("id"     , "idR")
  .withColumnRenamed("country", "countryR")
  .withColumnRenamed("age"    , "ageR")

Now you can join the DataFrame with its mirror. The id < idR condition drops both swapped pairs and self-loops:

import org.apache.spark.sql.functions._

val relations = people.join(peopleR,
      (people("id") < peopleR("idR")) &&
        (people("country") === peopleR("countryR")) &&
        (abs(people("age") - peopleR("ageR")) < 5))
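
As a possible variant, the same join can be written with DataFrame aliases instead of a renamed copy. This is only a sketch: the helper name linkedPairs, the aliases "l" and "r", and the output column names src and dst are made up for illustration, and it assumes the id, country and age columns from the example above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{abs, col}

// Hypothetical helper (not part of Spark): self-join through aliases.
def linkedPairs(people: DataFrame): DataFrame = {
  val l = people.as("l") // left side of the self-join
  val r = people.as("r") // right side of the self-join
  l.join(r,
      col("l.id") < col("r.id") &&                // keep each unordered pair once, no self-loops
      col("l.country") === col("r.country") &&    // same country
      abs(col("l.age") - col("r.age")) < 5)       // age difference strictly below 5
    .select(col("l.id").as("src"), col("r.id").as("dst"))
}
```

The result holds only the two id columns, so each row maps directly to an Edge(src, dst, ()).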

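Finally, you can build the desired EdgeRDD:

import org.apache.spark.graphx._

// .rdd yields an RDD[Row]; on Spark 2.x and later, DataFrame.map would require an Encoder
val edges = EdgeRDD.fromEdges(relations.rdd.map(row => Edge(
      row.getAs[Long]("id"), row.getAs[Long]("idR"), ())))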

relations.show() will now output (row order may vary):

+---+-------+---+---+--------+----+
| id|country|age|idR|countryR|ageR|
+---+-------+---+---+--------+----+
|  1|Romania| 15|  3| Romania|  17|
|  4|Iceland| 20|  8| Iceland|  21|
|  4|Iceland| 20|  9| Iceland|  22|
|  5|Romania| 40|  6| Romania|  44|
|  6|Romania| 44|  7| Romania|  45|
|  8|Iceland| 21|  9| Iceland|  22|
+---+-------+---+---+--------+----+

and edges.toLocalIterator.foreach(println) will output (again, order may vary):

Edge(1,3,())
Edge(4,8,())
Edge(4,9,())
Edge(5,6,())
Edge(6,7,())
Edge(8,9,())
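
As a quick sanity check on the edge list above, the same rule can be applied to plain Scala collections without Spark. This is a minimal sketch: the object name LinkRuleCheck and the value expectedEdges are invented for illustration, and the data is the test data from this answer.

```scala
object LinkRuleCheck {
  case class Person(id: Long, country: String, age: Int)

  val testPeople: Seq[Person] = Seq(
    Person(1, "Romania", 15), Person(2, "New Zealand", 30), Person(3, "Romania", 17),
    Person(4, "Iceland", 20), Person(5, "Romania", 40), Person(6, "Romania", 44),
    Person(7, "Romania", 45), Person(8, "Iceland", 21), Person(9, "Iceland", 22)
  )

  // Each unordered pair once (a.id < b.id), same country, |age difference| < 5.
  val expectedEdges: Seq[(Long, Long)] = for {
    a <- testPeople
    b <- testPeople
    if a.id < b.id && a.country == b.country && math.abs(a.age - b.age) < 5
  } yield (a.id, b.id)
}
```

LinkRuleCheck.expectedEdges contains the six (source, destination) pairs printed above. The pair (5, 7) is absent because the age difference is exactly 5, which does not satisfy the strict < 5 condition.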
