Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

java - Handling Exceptions for ThreadPoolExecutor

I have the following code snippet, which scans through a list of tasks that need to be executed and hands each task to an executor.

The JobExecutor in turn creates another executor (for the database work: reading data and writing it to a queue) and completes the task.

Each submitted JobExecutor yields a Future<Boolean>. When one of the tasks fails, I want to catch the exception, gracefully interrupt all the threads, and shut down the executor. What changes do I need to make?

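import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DataMovingClass {
    // Shared counter used to give every worker thread a unique name suffix.
    private static final AtomicInteger uniqueId = new AtomicInteger(0);

    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();

    ThreadPoolExecutor threadPoolExecutor = null;

    private List<Source> sources = new ArrayList<Source>();

    // Note: get() is overridden, so every call returns a fresh id
    // rather than a value cached per thread.
    private static class IDGenerator extends ThreadLocal<Integer> {
        @Override
        public Integer get() {
            return uniqueId.incrementAndGet();
        }
    }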

    public void init() {
        // load sources list
    }

    public boolean execute() {
        boolean success = true;
        threadPoolExecutor = new ThreadPoolExecutor(10, 10,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("DataMigration-" + uniqueNumber.get());
                        return t;
                    }
                }, new ThreadPoolExecutor.CallerRunsPolicy());

        List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();

        // Submit one job per source.
        for (Source source : sources) {
            result.add(threadPoolExecutor.submit(new JobExecutor(source)));
        }

        // Wait for every job; a single false result marks the whole run as failed.
        for (Future<Boolean> jobDone : result) {
            try {
                if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                    success = false;
                }
            } catch (Exception ex) {
                // handle exceptions
            }
        }

        return success;
    }

    public class JobExecutor implements Callable<Boolean> {

        private ThreadPoolExecutor threadPoolExecutor;
        Source jobSource;

        public JobExecutor(Source source) {
            this.jobSource = source;
            threadPoolExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("Job Executor-" + uniqueNumber.get());
                            return t;
                        }
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
        }

        public Boolean call() throws Exception {
            boolean status = true;
            System.out.println("Starting Job = " + jobSource.getName());
            try {

                // do the specified task

            } catch (InterruptedException intrEx) {
                // logger is assumed to be declared elsewhere; it is not shown in the question
                logger.warn("InterruptedException", intrEx);
                Thread.currentThread().interrupt(); // restore the interrupt flag
                status = false;
            } catch (Exception e) {
                logger.fatal("Exception occurred while executing task " + jobSource.getName(), e);
                status = false;
            }
            System.out.println("Ending Job = " + jobSource.getName());
            return status;
        }
    }
}

1 Reply

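When you submit a task to the executor, it returns a Future (for ThreadPoolExecutor, a FutureTask instance).

Future.get() re-throws any exception thrown by the task, wrapped in an ExecutionException; the original exception is available through getCause().

So when you iterate through the List<Future<Boolean>> and call get() on each, catch ExecutionException and shut the executor down. shutdown() is the orderly variant that lets already-submitted tasks finish, while shutdownNow() interrupts the running tasks and discards the queued ones. Note that the call() method in the question catches every exception and returns false, so for that JobExecutor a false result has to be treated as a failure as well.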
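
The approach described above could look roughly like the following. This is only a minimal sketch, not the asker's actual classes: the class name ShutdownOnFailureSketch, the method runAll, the pool size of 4, and the 10-second timeout are all assumptions made for illustration. It treats a thrown exception, a false result, and a timeout as failures, and calls shutdownNow() on the first one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class for illustration; not part of the original question.
class ShutdownOnFailureSketch {

    /** Runs all jobs and returns true only if every job returned true. */
    public static boolean runAll(List<Callable<Boolean>> jobs) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed size
        List<Future<Boolean>> results = new ArrayList<>();
        for (Callable<Boolean> job : jobs) {
            results.add(pool.submit(job));
        }

        boolean success = true;
        try {
            for (Future<Boolean> result : results) {
                // A false return value counts as a failure too.
                if (!result.get(10, TimeUnit.SECONDS)) {
                    success = false;
                    break;
                }
            }
        } catch (ExecutionException ex) {
            // The task threw; the original exception is the cause.
            System.err.println("Job failed: " + ex.getCause());
            success = false;
        } catch (TimeoutException ex) {
            System.err.println("Job timed out");
            success = false;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            success = false;
        } finally {
            if (success) {
                pool.shutdown();    // orderly: nothing is left running anyway
            } else {
                pool.shutdownNow(); // interrupt running jobs, drop queued ones
            }
        }
        return success;
    }

    public static void main(String[] args) {
        List<Callable<Boolean>> jobs = new ArrayList<>();
        jobs.add(() -> true);
        jobs.add(() -> { throw new IllegalStateException("simulated failure"); });
        jobs.add(() -> { Thread.sleep(60_000); return true; }); // gets interrupted
        System.out.println("success = " + runAll(jobs)); // prints "success = false"
    }
}
```

Keep in mind that shutdownNow() only interrupts the worker threads. A job stops early only if it reacts to the interrupt, for example by blocking in an interruptible call or by checking Thread.currentThread().isInterrupted().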

