Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
888 views
in Technique[技术] by (71.8m points)

scala - Mockito's mock throw ClassNotFoundException in Spark application

I found that mock object in Mockito would throw ClassNotFoundException when used in Spark. Here is a minimal example:

import org.apache.spark.{SparkConf, SparkContext}
import org.mockito.{Matchers, Mockito}
import org.scalatest.FlatSpec
import org.scalatest.mockito.MockitoSugar

trait MyTrait {
  def myMethod(a: Int): Int
}

class MyTraitTest extends FlatSpec with MockitoSugar {
  "Mock" should "work in Spark" in {
    val m = mock[MyTrait](Mockito.withSettings().serializable())
    Mockito.when(m.myMethod(Matchers.any())).thenReturn(1)

    val conf = new SparkConf().setAppName("testApp").setMaster("local")
    val sc = new SparkContext(conf)

    assert(sc.makeRDD(Seq(1, 2, 3)).map(m.myMethod).first() == 1)
  }
}

which would throw the following exception:

[info] MyTraitTest:
[info] Mock
[info] - should work in Spark *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ClassNotFoundException: MyTrait$$EnhancerByMockitoWithCGLIB$$6d9e95a8
[info]  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
[info]  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
[info]  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[info]  at java.lang.Class.forName0(Native Method)
[info]  at java.lang.Class.forName(Class.java:348)
[info]  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
[info]  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
[info]  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
[info]  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
[info]  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
[info]  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
[info]  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
[info]  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
[info]  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
[info]  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
[info]  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
[info]  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
[info]  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
[info]  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
[info]  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
[info]  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
[info]  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
[info]  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
[info]  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
[info]  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
[info]  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
[info]  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
[info]  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
[info]  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
[info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:99)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]  at java.lang.Thread.run(Thread.java:745)

The stacktrace hints this is related to dynamic class loading, but I don't know how to fix it.

Update: Apparently, change

val m = mock[MyTrait](Mockito.withSettings().serializable())

to

val m = mock[MyTrait](Mockito.withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS))

makes exception disappear. However I am not following why this fix is necessary. I thought in spark local mode, a single JVM is running that hosts both driver and executor. So it must be a different ClassLoader is used to load the deserialized class on executor?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...