Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - Parallel execution of directed acyclic graph of tasks

I have a list of tasks [Task-A, Task-B, Task-C, Task-D, ...].
A task can optionally depend on other tasks.

For example:
A depends on 3 tasks: B, C and D
B depends on 2 tasks: C and E

It's basically a directed acyclic graph, and a task should be executed only after all the tasks it depends on have been executed.

At any point in time, multiple tasks may be ready for execution. In that case, they can run in parallel.

Any ideas on how to implement such an execution with as much parallelism as possible?

import java.util.List;

class Task {
    private String name;
    private List<Task> dependentTasks;  // the tasks that must finish before this one can run

    public void run() {
        // business logic
    }
}

1 Reply

The other answer works fine but is too complicated.

A simpler approach is to run Kahn's algorithm in parallel.
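For reference, here is a minimal serial sketch of Kahn's algorithm on the question's example (A depends on B, C and D; B depends on C and E). The names `KahnWaves` and `waves` are hypothetical. Instead of emitting one task at a time, it groups tasks into "waves": every task in a wave has all of its dependencies in earlier waves, so the tasks of one wave could run side by side. If some tasks are never reached, the graph has a cycle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class KahnWaves {
    /**
     * Kahn's algorithm grouped by level. deps maps a task to the tasks it
     * depends on (assumed to contain no duplicate entries).
     */
    static List<List<String>> waves(Map<String, List<String>> deps) {
        Set<String> all = new TreeSet<>(deps.keySet());
        deps.values().forEach(all::addAll);               // include tasks that only appear as dependencies
        Map<String, Integer> remaining = new HashMap<>();  // unfinished dependencies per task
        Map<String, List<String>> dependents = new HashMap<>();
        for (String t : all) {
            remaining.put(t, deps.getOrDefault(t, List.of()).size());
            dependents.put(t, new ArrayList<>());
        }
        deps.forEach((t, ds) -> ds.forEach(d -> dependents.get(d).add(t)));

        List<List<String>> result = new ArrayList<>();
        List<String> ready = new ArrayList<>();
        for (String t : all) {
            if (remaining.get(t) == 0) ready.add(t);
        }
        int processed = 0;
        while (!ready.isEmpty()) {
            result.add(ready);
            processed += ready.size();
            List<String> next = new ArrayList<>();
            for (String t : ready) {
                for (String u : dependents.get(t)) {
                    // u becomes ready when its last unfinished dependency is removed
                    if (remaining.merge(u, -1, Integer::sum) == 0) next.add(u);
                }
            }
            Collections.sort(next);
            ready = next;
        }
        if (processed != all.size()) {
            throw new IllegalArgumentException("dependency graph contains a cycle");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = Map.of(
                "A", List.of("B", "C", "D"),
                "B", List.of("C", "E"));
        System.out.println(waves(deps)); // [[C, D, E], [B], [A]]
    }
}
```

Running wave by wave is simple but waits for a whole wave to finish; the parallel version below each ready task starts as soon as its own dependencies are done.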

The key is to submit a task for execution as soon as all of its dependencies have finished, so that every task that is ready runs in parallel with the others.
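A stripped-down sketch of this idea, assuming tasks are identified by strings and described by a map from each task to the tasks it depends on. `ParallelKahn`, `run` and the `Consumer<String>` work callback are hypothetical names. Each task keeps an `AtomicInteger` count of unfinished dependencies; only the thread that decrements it to zero submits the task, so every task is submitted exactly once. A `CountDownLatch` waits for all tasks before the pool is shut down. It assumes the graph is acyclic (a cycle would make it wait forever), and for brevity a task that throws still releases its dependents.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ParallelKahn {
    private final Map<String, AtomicInteger> remaining = new HashMap<>(); // unfinished dependencies
    private final Map<String, List<String>> dependents = new HashMap<>(); // reverse edges
    private final List<String> completed = Collections.synchronizedList(new ArrayList<>());
    private final Consumer<String> work;
    private final ExecutorService pool;
    private final CountDownLatch done;

    private ParallelKahn(Map<String, List<String>> deps, Consumer<String> work, int threads) {
        Set<String> all = new HashSet<>(deps.keySet());
        deps.values().forEach(all::addAll);
        for (String t : all) {
            remaining.put(t, new AtomicInteger(deps.getOrDefault(t, List.of()).size()));
            dependents.put(t, new ArrayList<>());
        }
        // The maps are fully built before any task is submitted and only read afterwards.
        deps.forEach((t, ds) -> ds.forEach(d -> dependents.get(d).add(t)));
        this.work = work;
        this.pool = Executors.newFixedThreadPool(threads);
        this.done = new CountDownLatch(all.size());
    }

    private void submit(String task) {
        pool.execute(() -> {
            try {
                work.accept(task);
            } finally {
                completed.add(task);
                for (String next : dependents.get(task)) {
                    // decrementAndGet() reaches 0 exactly once, so `next` is submitted exactly once
                    if (remaining.get(next).decrementAndGet() == 0) submit(next);
                }
                done.countDown();
            }
        });
    }

    /** Runs every task after its dependencies; returns task ids in completion order. */
    public static List<String> run(Map<String, List<String>> deps, Consumer<String> work, int threads)
            throws InterruptedException {
        ParallelKahn r = new ParallelKahn(deps, work, threads);
        // Collect the roots before submitting anything, since counters change once tasks run.
        List<String> roots = new ArrayList<>();
        r.remaining.forEach((t, n) -> { if (n.get() == 0) roots.add(t); });
        roots.forEach(r::submit);
        r.done.await();
        r.pool.shutdown();
        return new ArrayList<>(r.completed);
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> deps = Map.of(
                "A", List.of("B", "C", "D"),
                "B", List.of("C", "E"));
        List<String> order = run(deps, id -> System.out.println("running " + id), 4);
        System.out.println(order); // A is always last; B always comes after C and E
    }
}
```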

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


class DependencyManager {
    // task -> tasks it depends on
    private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
    // task -> tasks that depend on it
    private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
    // task -> how many of its dependencies have finished so far
    private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
    private final AtomicInteger _numTasksExecuted = new AtomicInteger(0);
    private final ExecutorService _executorService = Executors.newFixedThreadPool(16);

    private static Runnable getRunnable(DependencyManager dependencyManager, String taskId) {
        return () -> {
            try {
                Thread.sleep(2000);  // A task takes 2 seconds to finish.
                dependencyManager.taskCompleted(taskId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // Restore the interrupt flag.
            }
        };
    }

    /**
     * Registers a task. Calling it directly is only needed when a vertex is disconnected from the rest of the graph.
     * @param taskId The task id
     */
    public void addVertex(String taskId) {
        _dependencies.putIfAbsent(taskId, new ArrayList<>());
        _reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
        _tasks.putIfAbsent(taskId, getRunnable(this, taskId));
        _numDependenciesExecuted.putIfAbsent(taskId, 0);
    }

    // All edges must be added before executeTopoSort() is called: the ArrayLists are not thread-safe.
    private void addEdge(String dependentTaskId, String dependeeTaskId) {
        _dependencies.get(dependentTaskId).add(dependeeTaskId);
        _reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
    }

    public void addDependency(String dependentTaskId, String dependeeTaskId) {
        addVertex(dependentTaskId);
        addVertex(dependeeTaskId);
        addEdge(dependentTaskId, dependeeTaskId);
    }

    private void taskCompleted(String taskId) {
        System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
        int numTasksExecuted = _numTasksExecuted.incrementAndGet();
        _reverseDependencies.get(taskId).forEach(nextTaskId -> {
            // Use the value returned by the atomic update instead of re-reading it with get():
            // otherwise, when several dependencies finish at the same time, more than one
            // thread can see the final count and submit the same task more than once.
            int numDependenciesExecuted =
                    _numDependenciesExecuted.computeIfPresent(nextTaskId, (__, currValue) -> currValue + 1);
            int numDependencies = _dependencies.get(nextTaskId).size();
            if (numDependenciesExecuted == numDependencies) {
                // All dependencies have been executed, so we can submit this task to the thread pool.
                _executorService.submit(_tasks.get(nextTaskId));
            }
        });
        // Compare the value returned by incrementAndGet() so that only one thread triggers completion.
        if (numTasksExecuted == _tasks.size()) {
            topoSortCompleted();
        }
    }

    private void topoSortCompleted() {
        System.out.println("Topo sort complete!!");
        _executorService.shutdownNow();
    }

    public void executeTopoSort() {
        System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
        // Tasks without dependencies can start immediately.
        _dependencies.forEach((taskId, dependencies) -> {
            if (dependencies.isEmpty()) {
                _executorService.submit(_tasks.get(taskId));
            }
        });
    }
}

public class TestParallelTopoSort {

    public static void main(String[] args) {
        DependencyManager dependencyManager = new DependencyManager();
        dependencyManager.addDependency("8", "5");
        dependencyManager.addDependency("7", "5");
        dependencyManager.addDependency("7", "6");
        dependencyManager.addDependency("6", "3");
        dependencyManager.addDependency("6", "4");
        dependencyManager.addDependency("5", "1");
        dependencyManager.addDependency("5", "2");
        dependencyManager.addDependency("5", "3");
        dependencyManager.addDependency("4", "1");
        dependencyManager.executeTopoSort();
        // 8 tasks of 2 seconds each: the parallel version finishes in about 8 seconds
        // (longest chain 1 -> 4 -> 6 -> 7), while a serial version would take 16 seconds.
    }
}

The directed acyclic graph built in this example has the following edges, where X -> Y means that Y depends on X:

1 -> 4, 1 -> 5, 2 -> 5, 3 -> 5, 3 -> 6, 4 -> 6, 5 -> 7, 5 -> 8, 6 -> 7
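The 8-second vs 16-second figures follow from the longest dependency chain in this graph. The small sketch below (the `CriticalPath` class and its `finish` helper are hypothetical) computes each task's earliest finish time, assuming every task takes 2 seconds and threads are never the bottleneck. At most three tasks (1, 2 and 3) are ready at once here, well under the pool size of 16.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CriticalPath {
    /** Earliest finish time of a task when every task takes `duration` seconds and threads are unlimited. */
    static int finish(String task, Map<String, List<String>> deps, int duration, Map<String, Integer> memo) {
        Integer cached = memo.get(task);
        if (cached != null) return cached;
        int start = 0; // a task can start once its slowest dependency has finished
        for (String d : deps.getOrDefault(task, List.of())) {
            start = Math.max(start, finish(d, deps, duration, memo));
        }
        memo.put(task, start + duration);
        return start + duration;
    }

    public static void main(String[] args) {
        // Same edges as TestParallelTopoSort: task -> tasks it depends on.
        Map<String, List<String>> deps = Map.of(
                "8", List.of("5"),
                "7", List.of("5", "6"),
                "6", List.of("3", "4"),
                "5", List.of("1", "2", "3"),
                "4", List.of("1"));
        List<String> tasks = List.of("1", "2", "3", "4", "5", "6", "7", "8");
        Map<String, Integer> memo = new HashMap<>();
        int parallel = 0;
        for (String t : tasks) {
            parallel = Math.max(parallel, finish(t, deps, 2, memo));
        }
        int serial = tasks.size() * 2;
        System.out.println(parallel + "s parallel vs " + serial + "s serial"); // 8s parallel vs 16s serial
    }
}
```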

