Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

python - How can I loop over every item in a row of a PySpark RDD and turn each item into a key? Use the map function?

First, I have some input like this:

A:<phone1,phone2>,<location1>,<email1>
B:<phone1>,<location2>,<email1,email2>

I'd like to use PySpark's rdd.map() function to loop over every item in each row and turn the items into key-value pairs like this:

phone1: A:<phone1,phone2>,<location1>,<email1>
phone1: B:<phone1>,<location2>,<email1,email2>
phone2: A:<phone1,phone2>,<location1>,<email1>
location1: A:<phone1,phone2>,<location1>,<email1>
location2: B:<phone1>,<location2>,<email1,email2>
email1: A:<phone1,phone2>,<location1>,<email1>
email1: B:<phone1>,<location2>,<email1,email2>
email2: B:<phone1>,<location2>,<email1,email2>

In my previous attempts, I tried to put a loop inside the lambda function passed to map, but a lambda cannot contain a loop statement.

Is there any other way?

  asked by GDJi, translated from Stack Overflow

1 Reply

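The session below is from the Scala spark-shell rather than PySpark. It splits each line into a label and a payload, explodes the payload items into one row each, and prefixes every original line with its item.

    scala> val rdd = sc.parallelize(Seq("A:<phone1,phone2>,<location1>,<email1>", "B:<phone1>,<location2>,<email1,email2>"))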

    scala> rdd.foreach(println)
    A:<phone1,phone2>,<location1>,<email1>
    B:<phone1>,<location2>,<email1,email2>

    scala> case class dataclass(c0:String, c1:String)

    scala> val df = rdd.map(x => x.split(":")).map(y => dataclass(y(0), y(1))).toDF

    scala> df.show(false)
    +---+------------------------------------+
    |c0 |c1                                  |
    +---+------------------------------------+
    |A  |<phone1,phone2>,<location1>,<email1>|
    |B  |<phone1>,<location2>,<email1,email2>|
    +---+------------------------------------+


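    scala> // Strip the angle brackets, explode the comma-separated items into one row each,
    scala> // then prefix every original line with its item. The trailing dots keep the
    scala> // chain in a single statement when it is typed line by line in the REPL.
    scala> val df1 = df.withColumn("tempCol", regexp_replace(col("c1"), "[<>]", "")).
                       withColumn("tempCol", explode(split(col("tempCol"), ","))).
                       withColumn("out", concat(col("tempCol"), lit(":"), col("c0"), lit(":"), col("c1"))).
                       drop("c0", "c1", "tempCol")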

    scala> df1.show(false)
    +------------------------------------------------+
    |out                                             |
    +------------------------------------------------+
    |phone1:A:<phone1,phone2>,<location1>,<email1>   |
    |phone2:A:<phone1,phone2>,<location1>,<email1>   |
    |location1:A:<phone1,phone2>,<location1>,<email1>|
    |email1:A:<phone1,phone2>,<location1>,<email1>   |
    |phone1:B:<phone1>,<location2>,<email1,email2>   |
    |location2:B:<phone1>,<location2>,<email1,email2>|
    |email1:B:<phone1>,<location2>,<email1,email2>   |
    |email2:B:<phone1>,<location2>,<email1,email2>   |
    +------------------------------------------------+

    scala> val rdd2 = df1.rdd.map(_.getString(0))  // RDD[String] rather than RDD[Any]
    scala> rdd2.foreach(println)
    phone1:A:<phone1,phone2>,<location1>,<email1>
    phone2:A:<phone1,phone2>,<location1>,<email1>
    location1:A:<phone1,phone2>,<location1>,<email1>
    email1:A:<phone1,phone2>,<location1>,<email1>
    phone1:B:<phone1>,<location2>,<email1,email2>
    location2:B:<phone1>,<location2>,<email1,email2>
    email1:B:<phone1>,<location2>,<email1,email2>
    email2:B:<phone1>,<location2>,<email1,email2>
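
Since the question asks about PySpark, here is a minimal Python sketch of the same idea. It is not part of the original answer. The point is that `map` emits exactly one output per input row, while `flatMap` lets a function return a list per row and flattens the lists, so the loop can live in an ordinary `def` instead of a lambda. The names `to_key_value_pairs` and `pairs_from_rdd` are hypothetical helpers made up for illustration, and the parsing assumes every item sits inside a `<...>` group after the first `:`.

```python
import re


def to_key_value_pairs(line):
    """Return one (item, original_line) pair per item found inside <...> groups."""
    # Everything after the first ':' holds the <...> groups (assumed input format).
    _, _, payload = line.partition(":")
    pairs = []
    for group in re.findall(r"<([^<>]*)>", payload):
        for item in group.split(","):
            item = item.strip()
            if item:
                pairs.append((item, line))
    return pairs


def pairs_from_rdd(rdd):
    """Hypothetical helper: flatMap flattens the per-row lists into one RDD of pairs."""
    return rdd.flatMap(to_key_value_pairs)


lines = [
    "A:<phone1,phone2>,<location1>,<email1>",
    "B:<phone1>,<location2>,<email1,email2>",
]

# Plain-Python check of the per-row logic; no Spark session is needed for this part.
local_pairs = [pair for line in lines for pair in to_key_value_pairs(line)]
```

In a pyspark shell, where `sc` is the SparkContext the shell provides, `pairs_from_rdd(sc.parallelize(lines)).collect()` should return the same pairs as `local_pairs`. Keeping the row logic in a plain function also makes it easy to test without a cluster.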
