Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
674 views
in Technique[技术] by (71.8m points)

scala - How to use COGROUP for large datasets

I have two rdd's namely val tab_a: RDD[(String, String)] and val tab_b: RDD[(String, String)] I'm using cogroup for those datasets like:

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  {
 //somecode
  }
}

I'm using tab_c cogrouped values for map function and it works fine for small datasets but in case of huge datasets it throws Out Of Memory exception.

I have tried converting the final value to RDD but no luck same error

val newcos = spark.sparkContext.parallelize(tab_c)

1.How to use Cogroup for large datasets ?

2.Can we persist the cogrouped value ?

Code

 val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

  var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)

val updated = cos.map { x =>
  {

    val key = x._1
    val value = x._2

    srcs = value._1.mkString(",")
    destt = value._2.mkString(",")

    if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
      srcmis :+= srcs
      destmis :+= destt

    }

    if (srcs == "") {

      extraindest :+= destt.mkString("")
    }

    if (destt == "") {

      extrainsrc :+= srcs.mkString("")
    }

  }

}

Code Updated:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
 // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
      {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..

ERROR:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Thank you

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

When you use collect() you are basically telling spark to move all the resulting data back to the master node, which can easily produce a bottleneck. You are no longer using Spark at that point, just a plain array in a single machine.

To trigger computation just use something that requires the data at every node, that's why executors live on top of a distributed file system. For instance saveAsTextFile().

Here are some basic examples.

Remember, the entire objective here (that is, if you have big data) is to move the code to your data and compute there, not to bring all the data to the computation.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...