Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
732 views
in Technique[技术] by (71.8m points)

scala - How to perform one operation on each executor once in spark

I have a weka model stored in S3 which is of size around 400MB. Now, I have some set of record on which I want to run the model and perform prediction.

For performing prediction, What I have tried is,

  1. Download and load the model on driver as a static object , broadcast it to all executors. Perform a map operation on prediction RDD. ----> Not working, as in Weka for performing prediction, model object needs to be modified and broadcast require a read-only copy.

  2. Download and load the model on driver as a static object and send it to executor in each map operation. -----> Working (Not efficient, as in each map operation, i am passing 400MB object)

  3. Download the model on driver and load it on each executor and cache it there. (Don't know how to do that)

Does someone have any idea how can I load the model on each executor once and cache it so that for other records I don't load it again?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You have two options:

1. Create a singleton object with a lazy val representing the data:

    object WekaModel {
        lazy val data = {
            // initialize data here. This will only happen once per JVM process
        }
    }       

Then, you can use the lazy val in your map function. The lazy val ensures that each worker JVM initializes their own instance of the data. No serialization or broadcasts will be performed for data.

    elementsRDD.map { element =>
        // use WekaModel.data here
    }

Advantages

  • is more efficient, as it allows you to initialize your data once per JVM instance. This approach is a good choice when needing to initialize a database connection pool for example.

Disadvantages

  • Less control over initialization. For example, it's trickier to initialize your object if you require runtime parameters.
  • You can't really free up or release the object if you need to. Usually, that's acceptable, since the OS will free up the resources when the process exits.

2. Use the mapPartition (or foreachPartition) method on the RDD instead of just map.

This allows you to initialize whatever you need for the entire partition.

    elementsRDD.mapPartition { elements =>
        val model = new WekaModel()

        elements.map { element =>
            // use model and element. there is a single instance of model per partition.
        }
    }

Advantages:

  • Provides more flexibility in the initialization and deinitialization of objects.

Disadvantages

  • Each partition will create and initialize a new instance of your object. Depending on how many partitions you have per JVM instance, it may or may not be an issue.

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...