serialization - Issue while storing data from Spark-Streaming to Cassandra

A Spark Streaming context reads a stream from RabbitMQ with a batch interval of 30 seconds. I want to modify the values of a few columns of the corresponding rows already stored in Cassandra and then write the data back to Cassandra. For that I need to check whether a row for the particular primary key exists in Cassandra; if yes, fetch it and do the necessary operation. The problem is that I create the StreamingContext on the driver, while the actions are performed on the workers. The workers cannot get the StreamingContext object because it is not serialized and shipped to them, and I get this error: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext. I also know that we cannot access the StreamingContext inside foreachRDD. How do I achieve the same functionality here without getting the serialization error?

I have looked at a few examples here, but they didn't help.

Here is the relevant snippet of the code:

    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
    receiverStream.start()
    val lines = receiverStream.map(EventData.fromString(_))
    lines.foreachRDD { x =>
      if (x.toLocalIterator.nonEmpty) {
        x.foreachPartition { it =>
          for (tuple <- it) {
            val cookieid  = tuple.cookieid
            val sessionid = tuple.sessionid
            val logdate   = tuple.logdate
            // fails here: ssc lives on the driver and is not serializable,
            // so it cannot be used inside foreachPartition on the workers
            val EventRows = ssc.cassandraTable("SparkTest", CassandraTable)
              .select("*")
              .where("cookieid = '" + cookieid + "' and logdate = '" + logdate + "' and sessionid = '" + sessionid + "'")

            // some logic to check whether the row exists or not for this cookieid
          }
        }
      }
    }

1 Reply


The SparkContext cannot be serialized and passed across multiple workers, which may sit on different nodes. If you need to do something like this, you could use foreachPartition or mapPartitions; otherwise, do it within the function that gets passed around, using a CassandraConnector:

    CassandraConnector(SparkWriter.conf).withSessionDo { session =>
      // ...
      session.executeAsync(<CQL Statement>)
    }
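
For the original question (check whether a row for a given primary key exists and then write back), a minimal sketch of that pattern might look like the following, assuming the lines DStream of EventData from the question. The table SparkTest.events, the column visits, and the UPDATE statement are illustrative assumptions, not part of the original code; CassandraConnector itself is serializable, so it can be created on the driver and used inside foreachPartition on the workers:

    import com.datastax.spark.connector.cql.CassandraConnector

    // Created on the driver; safe to ship to workers because CassandraConnector is serializable.
    val connector = CassandraConnector(ssc.sparkContext.getConf)

    lines.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        connector.withSessionDo { session =>
          // prepare once per partition, not once per row
          val select = session.prepare(
            "SELECT * FROM SparkTest.events WHERE cookieid = ? AND logdate = ? AND sessionid = ?")
          val update = session.prepare(
            "UPDATE SparkTest.events SET visits = ? WHERE cookieid = ? AND logdate = ? AND sessionid = ?")

          partition.foreach { tuple =>
            val existing = session.execute(
              select.bind(tuple.cookieid, tuple.logdate, tuple.sessionid)).one()
            if (existing != null) {                          // the row exists: modify and write back
              val visits = existing.getLong("visits") + 1    // "visits" is a hypothetical bigint column
              session.executeAsync(
                update.bind(Long.box(visits), tuple.cookieid, tuple.logdate, tuple.sessionid))
            }
          }
        }
      }
    }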

In the SparkConf you need to provide the Cassandra connection details:

  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.ui.enabled", "true")
    .set("spark.executor.memory", "8g")
    //  .set("spark.executor.core", "4")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "/ephemeral/spark-events")
    //to avoid disk space issues - default is /tmp
    .set("spark.local.dir", "/ephemeral/spark-scratch")
    .set("spark.cleaner.ttl", "10000")
    .set("spark.cassandra.connection.host", cassandraip)
    .setMaster("spark://10.255.49.238:7077")
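
Presumably SparkWriter.conf in the earlier snippet refers to this SparkConf. A minimal sketch of wiring it into the streaming context (the 30-second batch interval is taken from the question):

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc  = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(30))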

The Java CSVParser is a library class that is not serializable, so Spark cannot send it to possibly different nodes if you call map or foreach on the RDD. One workaround is to use mapPartitions, in which case one full partition is processed on a single Spark node and the parser is created there, so it never has to be serialized. Example:

    val rdd_initial_parse = rdd.mapPartitions(pLines)

    def pLines(lines: Iterator[String]) = {
      val parser = new CSVParser()   // cannot be serialized; rdd.map(pLines) would fail
      lines.map(x => parseCSVLine(x, parser.parseLine))
    }
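
parseCSVLine is not shown in the answer; a minimal hypothetical version (the name and signature are assumptions) could simply apply the per-partition parser to one line:

    // Hypothetical helper, not part of the original answer: parseLine is the
    // non-serializable parser's String => Array[String] method (as in opencsv's CSVParser).
    def parseCSVLine(line: String, parseLine: String => Array[String]): Array[String] =
      parseLine(line)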
