Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
532 views
in Technique[技术] by (71.8m points)

apache spark - How to add a SparkListener from pySpark in Python?

I want to create a Jupyter/IPython extension to monitor Apache Spark Jobs.

Spark provides a REST API.

However instead of polling the server, I want the event updates to be sent through callbacks.

I am trying to register a SparkListener with the SparkContext.addSparkListener(). This feature is not available in the PySpark SparkContext object in Python. So how can I register a python listener to Scala/Java version of the context from Python. Is it possible to do this through py4j? I want python functions to be called when the events fire in the listener.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

It is possible although it is a bit involved. We can use Py4j callback mechanism to pass message from a SparkListener. First lets create a Scala package with all required classes. Directory structure:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala

build.sbt:

name := "listener"

organization := "net.zero323"

scalaVersion := "2.11.7"

val sparkVersion = "2.1.0"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "net.sf.py4j" % "py4j" % "0.10.4"  // Just for the record
)

Listener.scala defines a Python interface we are going to implement later

package net.zero323.spark.examples.listener

/* You can add arbitrary methods here, 
 * as long as these match corresponding Python interface 
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types, 
   * for example here String => Unit */
  def notify(x: Any): Any
}

Manager.scala will be used to forward messages to Python listener:

package net.zero323.spark.examples.listener

object Manager {
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }

}

Finally a simple SparkListener:

package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers 
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" ->  recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}

Lets' package our extension:

sbt package

and start PySpark session adding a generated jar to the class path and registering listener:

 $SPARK_HOME/bin/pyspark 
   --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar 
   --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener

Next we have to define a Python object which implements Listener interface:

class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager =  PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]

start callback server:

sc._gateway.start_callback_server()

create and register listener:

listener = PythonListener()

register it:

listener.register()

and test:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}

On exit you should shutdown the callback server:

sc._gateway.shutdown_callback_server()

Note:

This should be used with caution when working with Spark streaming, which internally uses callback server.

Edit:

If this is to much hassle you could just define org.apache.spark.scheduler.SparkListenerInterface:

class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

extend it:

class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())

and use it directly:

>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)

While simpler, this method is not selective (more traffic between JVM and Python) requires handling Java objects inside Python session.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...