ForkJoinPool in Java
-
Last Updated: July 25, 2024
-
By: javahandson
-
In this article, we will learn what ForkJoinPool is in Java, when to use it, and which features and methods it offers.
ForkJoinPool is a class that was introduced in Java 7. It is very similar to an executor service and in fact implements the ExecutorService interface.
As we know, an executor service manages a thread pool. Tasks are submitted to the executor service and are added internally to a data structure, i.e. a blocking queue. As soon as a thread in the pool becomes available, it picks the next task from this queue and executes it.
The submitted task can be a Runnable or a Callable. If the task is a Callable, a Future object is returned that will hold the result. If you want more information on the executor service, please check this article https://javahandson.com/executorservice-in-java/.
ForkJoinPool does the same job as an executor service. So how is it different?
It differs from an executor service in 2 ways: it forks and joins tasks, and it gives every thread its own queue with work stealing.
Fork Join means Split and Join: an individual task is split into multiple subtasks, and their results are joined afterwards. The fork-join pool is optimized for problems that produce more subtasks as they run. In other words, it is particularly useful for recursive tasks that can be split into smaller tasks, such as divide-and-conquer algorithms.
The steps for fork-join will be like below:
a. Split a task t into smaller tasks.
b. Solve sub-tasks individually to produce individual results.
c. Wait for all tasks to complete.
d. Join all individual results to provide a single overall result.
e. Return result.
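The steps above can be sketched with a small RecursiveTask that calls fork() and join() explicitly. This is only an illustrative sketch, not code from the original article: the class name MaxTask, the THRESHOLD value, and the helper maxOf are hypothetical names chosen for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: finds the largest value in an int array.
class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // assumed cut-off, for illustration only
    private final int[] data;
    private final int start; // inclusive
    private final int end;   // exclusive

    MaxTask(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            // Step b: the range is small enough, so solve it directly.
            int max = Integer.MIN_VALUE;
            for (int i = start; i < end; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        // Step a: split the task into two smaller tasks.
        int mid = start + (end - start) / 2;
        MaxTask left = new MaxTask(data, start, mid);
        MaxTask right = new MaxTask(data, mid, end);
        left.fork();                        // schedule the left half asynchronously
        int rightMax = right.compute();     // work on the right half in this thread
        int leftMax = left.join();          // Step c: wait for the forked half
        return Math.max(leftMax, rightMax); // Steps d and e: join results and return
    }

    // Hypothetical helper that runs the task in its own pool.
    static int maxOf(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new MaxTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = {7, 3, 42, 8, 15, 23, 4, 16, 1, 9};
        System.out.println(maxOf(data)); // prints 42
    }
}
```

Forking one half and computing the other half in the current thread keeps that thread busy instead of leaving it idle while both halves run elsewhere.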
As we know, a blocking queue holds the submitted tasks, and threads pick these tasks as they become available. If Thread1 picks Task1, then Task1 is split into multiple subtasks, and these subtasks are stored in a deque that belongs to that thread.
Note: each thread has its own deque (double-ended queue) that holds its subtasks.
Advantages of queue per thread:
a. A thread keeps picking tasks from its own deque. It doesn't have to go back to the external blocking queue for every task.
b. Scheduling of subtasks is easier because a single thread executes the subtasks it created. A thread typically runs on one core, and its own deque keeps that core busy with subtasks.
c. Little synchronization is needed because a thread normally takes tasks only from its own deque, so threads do not block each other (except during stealing).
Work stealing is a mechanism in which an idle worker thread steals tasks from the deque of another worker thread that is overloaded with subtasks.
Say there are 2 threads, Thread1 and Thread2. Thread1 is working on Task1 and Thread2 is working on Task2. Thread1 splits Task1 into 10 subtasks and Thread2 splits Task2 into 5 subtasks.
Now Thread2 completes all 5 of its subtasks, but Thread1 has completed only 6, so 4 subtasks are still left. Thread2 first checks whether any tasks are available in the main external blocking queue. If none are left there, Thread2 steals tasks from Thread1.
Because each thread uses a deque, i.e. a double-ended queue, the two threads work at opposite ends: in the above scenario Thread1 picks subtasks from the front of its deque, whereas Thread2 steals subtasks from the rear end of Thread1's deque.
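The two ends of the deque can be illustrated with a toy model. The sketch below uses a plain java.util.ArrayDeque and follows the front/rear description above. It is an assumption-laden illustration and does not reproduce ForkJoinPool's internal work queues. The class and method names (WorkStealingSketch, takeOwn, steal) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: shows which end of a deque the owner and the thief use.
class WorkStealingSketch {
    // Owner thread: takes the next subtask from the front of its own deque.
    static String takeOwn(Deque<String> ownDeque) {
        return ownDeque.pollFirst();
    }

    // Idle thread: steals a subtask from the rear of another thread's deque.
    static String steal(Deque<String> victimDeque) {
        return victimDeque.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> thread1Deque = new ArrayDeque<>();
        for (int i = 7; i <= 10; i++) {
            thread1Deque.addLast("subtask-" + i); // the 4 subtasks Thread1 still has
        }
        System.out.println("Thread1 runs:   " + takeOwn(thread1Deque)); // subtask-7
        System.out.println("Thread2 steals: " + steal(thread1Deque));   // subtask-10
        System.out.println("Left for later: " + thread1Deque);          // [subtask-8, subtask-9]
    }
}
```

Because the owner and the thief touch opposite ends, they rarely compete for the same subtask.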
ForkJoinPool class provides a set of constructors that help to instantiate the ForkJoinPool.
1. ForkJoinPool()
This is the simplest constructor. It takes no arguments and creates a pool whose parallelism equals the number of processors returned by Runtime.getRuntime().availableProcessors().
2. ForkJoinPool(int parallelism)
This constructor creates a ForkJoinPool instance with the specified parallelism level. The parallelism argument is the target number of threads that are actively executing tasks, as opposed to waiting in a join. If additional tasks are submitted while all threads are active, they wait in a queue until a thread becomes available.
However, the actual number of threads may temporarily exceed the parallelism level because of unjoined tasks.
For example, forking means splitting a task into smaller tasks that can be executed in parallel, and joining means waiting for a forked task to complete and then combining the results. It can happen that a forked task has not been executed yet while its parent task is already waiting to combine the subtask results. This article refers to such forked tasks as unjoined tasks.
If there are unjoined tasks, the ForkJoinPool might create additional threads to make sure those tasks get executed. This prevents a situation where every thread in the pool is blocked waiting for other tasks to complete while runnable tasks are still pending. So the pool can exceed the target parallelism level to handle these unjoined tasks and keep making progress. Once those tasks are handled, the number of threads should drop back to the target level, assuming no other tasks are being submitted.
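A quick way to see the difference between the two constructors above is to print each pool's parallelism. The snippet below is a small sketch and the class name ParallelismDemo is hypothetical. It uses only getParallelism() and getPoolSize() from ForkJoinPool.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismDemo {
    public static void main(String[] args) {
        ForkJoinPool defaultPool = new ForkJoinPool(); // parallelism taken from the machine
        ForkJoinPool fixedPool = new ForkJoinPool(4);  // parallelism chosen by us

        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Default parallelism:  " + defaultPool.getParallelism());
        System.out.println("Fixed parallelism:    " + fixedPool.getParallelism()); // 4
        // Worker threads are started lazily, so the pool size can be lower than the parallelism.
        System.out.println("Started threads:      " + fixedPool.getPoolSize());

        defaultPool.shutdown();
        fixedPool.shutdown();
    }
}
```

The first, second, and fourth lines of output depend on the machine, so no fixed values are shown for them.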
3. ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)
This constructor creates a ForkJoinPool instance and gives the developer more control. It takes the following parameters.
parallelism: The target number of threads that are actively executing tasks.
factory: The factory used to create worker threads. ForkJoinPool has a default worker thread factory (ForkJoinPool.defaultForkJoinWorkerThreadFactory), but with this constructor we can supply our own ForkJoinWorkerThreadFactory.
handler: The handler that is invoked when a worker thread is about to terminate because of an uncaught exception thrown while executing a task.
asyncMode: Determines the scheduling policy of the pool, i.e. the order in which a worker executes the tasks in its own queue. If asyncMode is false (the default), tasks are picked in LIFO order, so the last task added is executed first. If asyncMode is true, tasks are picked in FIFO order, so the first task added is executed first. FIFO mode is mainly intended for forked tasks that are never joined, such as event-style asynchronous tasks.
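The four parameters can be put together as in the sketch below. The class name CustomPoolDemo, the helper newAsyncPool, and the logging done by the handler are assumptions made for this example. The constructor and getters are part of ForkJoinPool.

```java
import java.util.concurrent.ForkJoinPool;

class CustomPoolDemo {
    // Hypothetical helper: builds a pool in FIFO (async) mode.
    static ForkJoinPool newAsyncPool(int parallelism) {
        // Illustrative handler: just logs the failing thread and the error.
        Thread.UncaughtExceptionHandler handler =
                (thread, error) -> System.err.println(thread.getName() + " failed: " + error);
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // default thread factory
                handler,
                true); // asyncMode = true, i.e. FIFO order for forked tasks
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newAsyncPool(2);
        System.out.println("Parallelism: " + pool.getParallelism()); // 2
        System.out.println("Async mode:  " + pool.getAsyncMode());   // true
        pool.shutdown();
    }
}
```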
4. ForkJoinPool.commonPool()
The commonPool() method returns the common, shared pool instance. By default its parallelism is derived from Runtime.getRuntime().availableProcessors() (typically one less than the number of available processors, with a minimum of one), and the actual value can be read with ForkJoinPool.getCommonPoolParallelism(). Because the instance is shared across the whole application, it should be used for small, short-lived tasks: blocking or long-running operations in the common pool can slow down other parts of your application that use it. So it should be used carefully. For complex or long-running tasks, it is preferable to create our own ForkJoinPool instance.
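As a small illustration of the shared instance, the sketch below submits a short task to the common pool. The class name CommonPoolDemo and the helper computeInCommonPool are hypothetical. Note that the common pool is not shut down by application code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

class CommonPoolDemo {
    // Hypothetical helper: runs a tiny, non-blocking task in the common pool.
    static int computeInCommonPool() {
        Callable<Integer> shortTask = () -> 6 * 7;
        ForkJoinTask<Integer> task = ForkJoinPool.commonPool().submit(shortTask);
        return task.join(); // waits for the result
    }

    public static void main(String[] args) {
        // Every call returns the same shared instance.
        System.out.println(ForkJoinPool.commonPool() == ForkJoinPool.commonPool()); // true
        // Machine dependent, so no fixed value is shown here.
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println(computeInCommonPool()); // 42
    }
}
```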
ForkJoinPool is used to execute tasks that can be split recursively into smaller tasks. Java provides 2 classes for creating such recursive tasks: RecursiveAction and RecursiveTask.
RecursiveAction is an abstract class. Using this class we can create a recursive task that does not return any value. It’s a subclass of ForkJoinTask. This class has an abstract method compute() in which the task’s logic is defined.
Write a RecursiveAction that changes all the characters in a character array to uppercase.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class ToUpperCaseAction extends RecursiveAction {
    private final char[] data;
    private final int start; // inclusive
    private final int end;   // exclusive
    private static final int THRESHOLD = 10;

    ToUpperCaseAction(char[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: convert this range directly.
            for (int i = start; i < end; i++) {
                data[i] = Character.toUpperCase(data[i]);
            }
        } else {
            // Too large: split the range in half and run both halves.
            int mid = (start + end) / 2;
            ToUpperCaseAction left = new ToUpperCaseAction(data, start, mid);
            ToUpperCaseAction right = new ToUpperCaseAction(data, mid, end);
            invokeAll(left, right);
        }
    }
}
In this example, ToUpperCaseAction converts each character in an array to uppercase. If its range is no larger than the threshold (10 characters here), it performs the conversion directly. If the range is larger, it splits it into two subtasks. A main method (shown with the execute() method later in this article) creates a character array from a string, creates a ToUpperCaseAction for the whole array, and executes it in a ForkJoinPool. After the task has been executed, every character in the array is uppercase.
The invokeAll() method is a static utility provided by the ForkJoinTask class. It forks the given ForkJoinTask instances so that they can run concurrently and returns only when all of them have completed. In the examples in this article, invokeAll(left, right) is used to execute the two subtasks, left and right, in parallel.
RecursiveTask is an abstract class. Using this class we can create a recursive task that returns some value. This class also has an abstract method compute() in which the task’s logic is defined.
Write a RecursiveTask that adds the first 10000 numbers.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private final int start; // inclusive
    private final int end;   // exclusive
    private static final int THRESHOLD = 500;

    SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: add up the range directly.
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // Too large: split the range in half, run both halves, add their results.
            int mid = (start + end) / 2;
            SumTask left = new SumTask(start, mid);
            SumTask right = new SumTask(mid, end);
            invokeAll(left, right);
            return left.join() + right.join();
        }
    }
}
In this example, SumTask sums the numbers in a range whose start is inclusive and whose end is exclusive. If the range is no larger than the threshold, it calculates the sum directly. If it's larger, it splits the range into two, creates two tasks for the two halves, executes them, and adds their results. A main method can then create a SumTask for the range from 0 (inclusive) to 10000 (exclusive), execute it in a ForkJoinPool, and print the result.
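A main method for SumTask could look like the sketch below. The SumDemo class and its sumRange helper are hypothetical names added for this example, and SumTask is repeated from above so that the snippet compiles on its own.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumDemo {
    // Hypothetical helper: sums the integers in [start, end) with a ForkJoinPool.
    static long sumRange(int start, int end) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(start, end));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 0 + 1 + ... + 9999 = 9999 * 10000 / 2
        System.out.println(sumRange(0, 10000)); // prints 49995000
    }
}

// Same task as in the article, repeated here to keep the example self-contained.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 500;
    private final int start; // inclusive
    private final int end;   // exclusive

    SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = (start + end) / 2;
        SumTask left = new SumTask(start, mid);
        SumTask right = new SumTask(mid, end);
        invokeAll(left, right);
        return left.join() + right.join();
    }
}
```

To add the numbers 1 to 10000 instead, the range would be sumRange(1, 10001), because the end of the range is exclusive.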
The ForkJoinPool class provides a set of methods to submit and execute the tasks.
1. void execute(ForkJoinTask<?> task)
This method submits the task to the pool and initiates its execution. It does not return a Future object. The task is executed asynchronously by a worker thread in the ForkJoinPool.
import java.util.concurrent.ForkJoinPool;

public class Demo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        char[] data = "Hello Java Hands On".toCharArray();
        ToUpperCaseAction action = new ToUpperCaseAction(data, 0, data.length);
        pool.execute(action);
        action.join(); // Wait for the task to finish
        System.out.println(data); // Print the modified data
    }
}

Output:
HELLO JAVA HANDS ON
2. void execute(Runnable task)
This method is similar to the execute method above, except that it executes a Runnable task. Such a task is not split into subtasks. Note also that this method does not return a Future object, which means we can't use it to directly monitor the result or the status of the task.
Please check more info on this method here https://javahandson.com/executorservice-in-java/
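Since execute(Runnable) returns nothing, the caller needs some other way to know when the task has finished. The sketch below uses a CountDownLatch for that purpose, which is one possible choice and not something the original article prescribes. The class name ExecuteRunnableDemo and the helper runAndWait are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

class ExecuteRunnableDemo {
    // Hypothetical helper: upper-cases the text in a pool thread and waits for it.
    static String runAndWait(String text) {
        ForkJoinPool pool = new ForkJoinPool();
        char[] data = text.toCharArray();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                for (int i = 0; i < data.length; i++) {
                    data[i] = Character.toUpperCase(data[i]);
                }
                done.countDown(); // signal completion, as execute() gives us no Future
            });
            done.await(); // block until the Runnable has finished
            return new String(data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait("Hello Java Hands On")); // HELLO JAVA HANDS ON
    }
}
```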
3. <T> T invoke(ForkJoinTask<T> task)
This method submits the task to the pool for execution and waits for the result, so invoke is a blocking call. The method is generic and can handle ForkJoinTask objects that produce a result of any type. It does not return a Future. Instead, it returns the result of the computation directly.
import java.util.concurrent.ForkJoinPool;

public class Demo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        char[] data = "Hello Java Hands On".toCharArray();
        ToUpperCaseAction action = new ToUpperCaseAction(data, 0, data.length);
        pool.invoke(action); // Blocks until the task is done
        System.out.println(data); // Print the modified data
    }
}

Output:
HELLO JAVA HANDS ON
4. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
This method takes a collection of Callable tasks as input, starts them all, and returns a list of Future objects. Each Future object can be used to check whether its computation is complete and to retrieve the result of the computation. The invokeAll method is also a blocking call: it returns the list of Future objects only after all tasks have been completed.
Please check more info on this method here https://javahandson.com/executorservice-in-java/
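A short sketch of invokeAll with Callable tasks is shown below. The class name InvokeAllDemo and the helper sumOfLengths are hypothetical, and the word-length tasks are just placeholder work for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class InvokeAllDemo {
    // Hypothetical helper: computes each word's length in the pool and adds them up.
    static int sumOfLengths(List<String> words) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String word : words) {
                tasks.add(() -> word.length()); // one Callable per word
            }
            // Blocks until every task has completed.
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // already done, so this returns immediately
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfLengths(Arrays.asList("fork", "join", "pool"))); // 12
    }
}
```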
5. <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
This method submits a ForkJoinTask to the ForkJoinPool for execution and returns a ForkJoinTask object. This method is different from the execute method as it returns a ForkJoinTask that we can use to manage the task and get the result.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        char[] data = "Hello Java Hands On".toCharArray();
        ToUpperCaseAction action = new ToUpperCaseAction(data, 0, data.length);
        ForkJoinTask<?> future = pool.submit(action);
        future.get(); // This will block until the task is done
        System.out.println(data); // Print the modified data
    }
}

Output:
HELLO JAVA HANDS ON
6. ForkJoinTask<?> submit(Runnable task)
This method accepts a Runnable task, submits it to the ForkJoinPool for execution, and returns a ForkJoinTask. Since a Runnable task does not return a result, the returned ForkJoinTask is used to check the status of the task, such as whether it has completed.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        char[] data = "Hello Java Hands On".toCharArray();
        Runnable task = () -> {
            for (int i = 0; i < data.length; i++) {
                data[i] = Character.toUpperCase(data[i]);
            }
        };
        ForkJoinTask<?> future = pool.submit(task);
        future.get(); // This will block until the task is done
        System.out.println(data); // Print the modified data
    }
}

Output:
HELLO JAVA HANDS ON
7. <T> ForkJoinTask<T> submit(Runnable task, T result)
This method submits a Runnable task for execution and returns a ForkJoinTask. Since a Runnable task does not return a result, the ForkJoinTask returns the result value passed as the second argument once the task has completed.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class Demo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        char[] data = "Hello Java Hands On".toCharArray();
        Runnable task = () -> {
            for (int i = 0; i < data.length; i++) {
                data[i] = Character.toUpperCase(data[i]);
            }
        };
        String result = "Task is Completed!";
        ForkJoinTask<String> future = pool.submit(task, result);
        String futureResult = future.get(); // This will block until the task is done
        System.out.println("Future Result: " + futureResult);
        System.out.println(data); // Print the modified data
    }
}

Output:
Future Result: Task is Completed!
HELLO JAVA HANDS ON
So this is all about ForkJoinPool in Java. If you want to get more information on ForkJoinPool then you can refer to the official documentation here https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html
If you have any questions on this topic, please raise them in the comments section. If you liked this article then please share this post with your friends and colleagues.