Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

scala - Function returns an empty List in Spark

Below is the code I use to get the list of file names in a zipped file:

def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = {
    val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open))
    val filesInZip =  new ArrayBuffer[String]()
    var ze : Option[ZipEntry] = None
    zipInputStream.foreach(stream =>{
      do{
        ze = Option(stream.getNextEntry);
        ze.foreach{ze =>
          if(ze.getName.endsWith("java") && !ze.isDirectory()){
            var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java"))
            filesInZip += fileName
          }
        }
        stream.closeEntry()
      } while(ze.isDefined)
      println(filesInZip.toList.length) // print 889 (correct)
    })
    println(filesInZip.toList.length) // print 0 (WHY..?)
    (filesInZip.toList)
  }

I execute the above code in the following manner:

scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip")
zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25

scala> getListOfFilesInRepo(zipFileRDD)
889
0
res12: List[String] = List()

Why does the function return an empty list (length 0) instead of the 889 names counted inside foreach?


1 Reply


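This happens because filesInZip is not shared between the driver and the executors. Spark serializes the closure passed to foreach and ships it to each task, so foreach mutates a task-local copy of filesInZip. When the task finishes, that copy is discarded and garbage collected, and the filesInZip on the driver is never touched. That is why the println inside foreach reports 889 while the one after it reports 0. If you want to keep the results, use a transformation (most likely a flatMap) and collect the returned values on the driver.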

def listFiles(stream: PortableDataStream): TraversableOnce[String] = ???

zipFileRDD.values.flatMap(listFiles)  // RDD[String]; call .collect() to bring the names to the driver
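
As a minimal sketch of what the body of listFiles could look like, the helper below reads a zip stream and returns the matching names instead of appending to an outer buffer. The name javaFileNames is hypothetical and not part of the original answer. Unlike the question's code, it matches on the full ".java" suffix, which is an assumption about the intent. The Spark wiring is shown only in comments so that the block runs without Spark on the classpath.

```scala
import java.io.InputStream
import java.util.zip.ZipInputStream

// Hypothetical helper (not from the original answer): returns the base names
// of all ".java" entries in a zip stream instead of mutating outer state.
def javaFileNames(in: InputStream): List[String] = {
  val zip = new ZipInputStream(in)
  try {
    Iterator
      .continually(zip.getNextEntry) // yields null once the archive is exhausted
      .takeWhile(_ != null)
      .filter(e => !e.isDirectory && e.getName.endsWith(".java"))
      .map { e =>
        val name = e.getName
        // strip the directory prefix and the ".java" suffix
        name.substring(name.lastIndexOf('/') + 1, name.length - ".java".length)
      }
      .toList // force evaluation before the stream is closed
  } finally zip.close()
}

// Assumed Spark wiring, left as comments so the block runs without Spark:
//   def listFiles(stream: PortableDataStream): TraversableOnce[String] =
//     javaFileNames(stream.open())
//   val names: List[String] = zipFileRDD.values.flatMap(listFiles).collect().toList
```

Because the function returns its result, nothing depends on state captured in a closure; the names travel back to the driver through collect().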

You can learn more in the "Understanding closures" section of the Spark programming guide.
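
The copy effect described above can be imitated without a cluster. The sketch below pushes a buffer through plain Java serialization, which is roughly what happens to variables captured by a closure before a task runs. Spark's real closure handling is more involved; the names roundTrip, driverBuffer and taskBuffer are illustrative only.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Serialize a value and read it back, yielding an independent copy.
def roundTrip[T <: AnyRef](value: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

val driverBuffer = ArrayBuffer.empty[String] // plays the role of filesInZip on the driver
val taskBuffer = roundTrip(driverBuffer)     // the copy a task would receive

taskBuffer += "Foo"                          // mutation happens on the copy only

println(taskBuffer.length)   // length of the task-side copy
println(driverBuffer.length) // length of the driver-side original
```

The appended element lands in the copy, while the original buffer stays empty, mirroring the two println results in the question.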

