
So far, what did we do?
This is the 4th article in a series about Java concurrency, explained from the very basics 🙂 So far we have discussed what concurrency is, Java threads, and the synchronization of threads. Suppose we have a big computational problem. We can divide it into small pieces, solve each piece, and finally get the result by combining the partial results. Computers nowadays have several processor cores; many Core i5 models, for example, have 4. So we can assign the small pieces to these 4 cores and they will solve 4 tasks at once. This is called parallel computing.
The basic way of achieving this is Java threads. We can put a task inside a Thread object and start it. Likewise, we can create as many threads as there are sub-tasks, and each thread is scheduled independently.
New problem…
At this stage concurrency seems to have a happy ending, but it is way more complicated 🙁 A new problem arises from the way we divide the main problem into smaller pieces.
Let’s take an example. Suppose a big problem has four sub-parts A, B, C and D. Our processor has 4 cores, so we put A, B, C and D into four threads. Now we assume all four cores start executing the four threads at the same time.
If sub-problem B is a bit more complicated and time consuming than the other three, what happens? A, C and D finish earlier and 3 of the cores become idle. The computer can’t produce the final solution until the fourth core finishes executing B. So the overall result is held up until that single core completes its job, and the system’s resources are under-used because 3 cores sit idle. What a waste…! 🙂
So what should we do?
The way to handle this problem is to create many sub-problems. Suppose B is sub-divided again into 4 parts B1, B2, B3 and B4. Now, as soon as one core becomes idle, it can pick one of these 4 parts and start solving it. So all cores stay busy and the problem is solved faster. Likewise, we can divide our problem into many parts.
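To put rough numbers on this, here is a small sketch. The durations are made up for illustration (A, C and D take 1 time unit, B takes 4), and the class MakespanDemo and its method makespan are hypothetical helpers, not part of any library. It simply hands each task to whichever core becomes free first and reports when the last core finishes.

```java
import java.util.PriorityQueue;

class MakespanDemo {
    // Hands each task, in order, to whichever processor becomes free first
    // and returns the time at which the last processor finishes.
    static int makespan(int[] durations, int processors) {
        PriorityQueue<Integer> freeAt = new PriorityQueue<>();
        for (int p = 0; p < processors; p++) {
            freeAt.add(0); // every processor is free at time 0
        }
        int finish = 0;
        for (int d : durations) {
            int end = freeAt.poll() + d; // earliest-free processor takes the task
            freeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    public static void main(String[] args) {
        int[] coarse = {1, 4, 1, 1};        // A, B, C, D (assumed durations)
        int[] fine = {1, 1, 1, 1, 1, 1, 1}; // A, B1, B2, B3, B4, C, D
        System.out.println("4 big pieces:   " + makespan(coarse, 4));
        System.out.println("7 small pieces: " + makespan(fine, 4));
    }
}
```

With these assumed durations, the coarse split finishes only when B does, while the finer split lets the idle cores share B's work.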
Note:
Dividing a problem costs CPU time as well. If we divide a problem into unnecessarily small pieces, more time is spent just on sub-dividing it. For example, imagine we want to process an image file. We could divide the image into individual pixels. There are 1049088 pixels in a 1366 x 768 HD image, and dividing a picture into 1049088 parts would surely cost considerable time.
But we can instead divide the picture into 768 rows, each row having 1366 pixels. Then we have only 768 sub-problems to process, each taking approximately the same time. That costs less time overall than the previous approach.
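If even 768 pieces turn out to be too many or too few, the granularity can be made adjustable. The following is only a sketch: RangeSplitter and split are hypothetical names, and it assumes the number of parts is positive and not larger than the number of items. It cuts an inclusive range (for example, row numbers 0 to 767) into contiguous chunks whose sizes differ by at most one.

```java
import java.util.ArrayList;
import java.util.List;

class RangeSplitter {
    // Splits the inclusive range [lo, hi] into `parts` contiguous,
    // non-overlapping chunks. Each chunk is returned as {start, end}.
    // Assumes 0 < parts <= (hi - lo + 1).
    static List<int[]> split(int lo, int hi, int parts) {
        List<int[]> chunks = new ArrayList<>();
        int total = hi - lo + 1;
        int base = total / parts;   // minimum chunk size
        int extra = total % parts;  // the first `extra` chunks get one more item
        int start = lo;
        for (int i = 0; i < parts; i++) {
            int size = base + (i < extra ? 1 : 0);
            chunks.add(new int[] {start, start + size - 1});
            start += size;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // 768 image rows grouped into 20 sub-problems
        for (int[] c : split(0, 767, 20)) {
            System.out.println("rows " + c[0] + " .. " + c[1]);
        }
    }
}
```

Each chunk can then become one sub-problem, so the number of sub-problems is a parameter rather than being fixed by the image size.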
How to handle these sub-problems?
As you can see, we now have to handle a considerable number of sub-problems. It would be inefficient to create a dedicated thread for each of them. So, what is the option?
An Example
Let’s begin with an example. Suppose ImageProcessingThread is a subclass of the Java class Thread, and imageRows is an integer giving the number of pixel rows in the image.
E.g.:
    int range = imageRows;
    ImageProcessingThread[] workers = new ImageProcessingThread[range];
Here workers is an array of threads. Each thread can now be assigned a task and started.
If the constructor
    ImageProcessingThread(int rowNo)
creates a Thread that contains the sub-task of processing row number rowNo of the image, then for all rows:
    // Create and start one thread per row.
    for (int i = 0; i < range; i++) {
        workers[i] = new ImageProcessingThread(i);
        workers[i].start();
    }
    // Wait until every thread has finished.
    for (int i = 0; i < range; i++) {
        try {
            workers[i].join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
What did we do here? We created a thread for each row of pixels, so a 1366×768 image needs 768 threads. Note that each thread is used only once: after finishing its sub-task, the thread terminates and is discarded. This is also wasteful. This is where the thread pool comes in 🙂
So, can you think about a better approach?
First, we should not embed the sub-task in the thread. Instead, we keep a pool with a smaller number of threads and a list of the sub-tasks.
The threads in the pool would not be removed until all the sub-tasks in the list are finished.
Each thread in the pool is assigned a sub-task from the list in the beginning.
Once a sub-task in a thread is finished, assign a new task from the remaining sub-tasks to the thread.
Continue this until all sub-tasks are finished.
This method avoids the wasteful process of creating and destroying 768 threads.
Now, how do we direct sub-tasks to the thread pool?
For that we have to put all 768 sub-task objects into a collection that behaves like a queue.
An item is taken out of the queue whenever a thread in the pool is free to accept a new sub-task.
We can use a ConcurrentLinkedQueue from the package java.util.concurrent.
It is declared like this:
    ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
This declares the variable taskQueue, which stores the list of sub-tasks. Any number of sub-tasks can be put into taskQueue:
    taskQueue.add(subTask);
Note that a sub-task is an object that implements the Runnable interface. We implemented the Runnable interface in the previous article of this series.
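To make the queue behaviour concrete, here is a minimal single-threaded sketch. The class TaskQueueDemo and the method fillAndDrain are hypothetical names used only for this illustration. It uses add() to store sub-tasks and poll() to take them out again; poll() returns null once the queue is empty, which is a convenient signal that no work is left.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class TaskQueueDemo {
    // Fills a queue with n trivial sub-tasks, then drains it in the same way
    // a pool thread would. Returns how many sub-tasks were executed.
    static int fillAndDrain(int n) {
        ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            taskQueue.add(() -> System.out.println("running sub-task " + id));
        }
        int executed = 0;
        Runnable subTask;
        // poll() removes and returns the head of the queue, or null if it is empty
        while ((subTask = taskQueue.poll()) != null) {
            subTask.run();
            executed++;
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(fillAndDrain(3) + " sub-tasks executed");
    }
}
```

Because ConcurrentLinkedQueue is thread-safe, several threads can call poll() on the same queue at once and each sub-task is still handed out only once.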

So, here are the steps to use thread-pools:
- Create a sub-task class that implements the Runnable interface.
- Create a ConcurrentLinkedQueue object and put the sub-task objects into it.
- Make an array of Threads for the thread pool.
Now let’s see how to write the class for a single thread of the thread pool used in step 3.
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class ThreadInThePool extends Thread {
        private final ConcurrentLinkedQueue<Runnable> taskQueue;
        // volatile so that abort() called from another thread is seen by run()
        private volatile boolean isRunning = true;

        public ThreadInThePool(ConcurrentLinkedQueue<Runnable> queue) {
            taskQueue = queue;
        }

        // Stops the loop in run() before all sub-tasks
        // in the taskQueue are finished.
        public void abort() {
            isRunning = false;
        }

        @Override
        public void run() {
            while (isRunning) {
                Runnable subTask = taskQueue.poll();
                if (subTask == null) {
                    // The queue is empty: all sub-tasks have been taken,
                    // so the thread terminates.
                    break;
                }
                subTask.run();
            }
        }
    }
The method abort() can be used to stop the thread and so remove it from the thread pool. It takes effect only after the current sub-task has finished, because the flag is checked between sub-tasks.
In the run() method of the thread, the sub-task’s run() method is executed. When it finishes, the thread requests another sub-task from the task queue, and this continues until the task queue is empty. When it is empty, the thread ends.
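Before the bigger example, here is a compact sketch of the whole mechanism with trivial sub-tasks. PoolDemo, Worker and runAll are hypothetical names for this illustration; Worker follows the same polling idea as ThreadInThePool, without the abort() flag. Each sub-task just increments a shared AtomicInteger, so we can check that every sub-task ran exactly once.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PoolDemo {
    // A pool thread: keeps taking sub-tasks until the queue is empty.
    static class Worker extends Thread {
        private final ConcurrentLinkedQueue<Runnable> taskQueue;

        Worker(ConcurrentLinkedQueue<Runnable> queue) {
            taskQueue = queue;
        }

        @Override
        public void run() {
            Runnable subTask;
            while ((subTask = taskQueue.poll()) != null) {
                subTask.run();
            }
        }
    }

    // Runs taskCount trivial sub-tasks on poolSize threads and
    // returns how many sub-tasks were completed.
    static int runAll(int taskCount, int poolSize) {
        ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
        AtomicInteger done = new AtomicInteger();
        // Fill the queue BEFORE starting the threads; a worker that finds
        // the queue empty would terminate immediately.
        for (int i = 0; i < taskCount; i++) {
            queue.add(() -> done.incrementAndGet());
        }
        Worker[] pool = new Worker[poolSize];
        for (int i = 0; i < poolSize; i++) {
            pool[i] = new Worker(queue);
            pool[i].start();
        }
        try {
            for (Worker w : pool) {
                w.join(); // wait for every pool thread to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(20, 4) + " of 20 sub-tasks completed");
    }
}
```

One design point worth noting: since a worker exits as soon as poll() returns null, this simple scheme only works when all sub-tasks are queued before the threads start.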
Let’s look at a complete example. I want to count the primes between 1 and 10000000, so I can write the following code. Here, CountPrimes is a class that implements the interface Runnable.
    public class CountPrimes implements Runnable {
        int count = 0;
        int min, max;

        public CountPrimes(int min, int max) {
            this.min = min;
            this.max = max;
        }

        public int getCount() {
            return count;
        }

        // Trial division by every number up to the square root of val.
        // Note: for val = 1 the loop never runs, so 1 is reported as prime
        // and the count comes out one higher than the true number of primes.
        private static boolean isPrime(int val) {
            int sqr = (int) Math.sqrt(val);
            for (int i = 2; i <= sqr; i++) {
                if (val % i == 0) return false;
            }
            return true;
        }

        // Counts the numbers in [min, max] that isPrime() accepts.
        @Override
        public void run() {
            for (int i = min; i <= max; i++) {
                if (isPrime(i)) count++;
            }
        }

        public static void main(String[] args) {
            long startTime = System.currentTimeMillis();
            CountPrimes n = new CountPrimes(1, 10000000);
            n.run(); // called directly, so it runs in the main thread
            long elapsedTime = System.currentTimeMillis() - startTime;
            System.out.println(n.getCount());
            System.out.println("Total elapsed time: " + (elapsedTime / 1000.0) + " seconds");
        }
    }
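Here, there is only the main thread.
The output on my laptop (the count is one more than the true 664,579 primes up to 10000000, because isPrime() as written also accepts 1):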
664580
Total elapsed time: 8.978 seconds
But now I use the above concept: I subdivide the problem into 20 smaller pieces, each covering 10000000/20 numbers, and create a thread pool with 8 threads.
Here I modify the class ThreadInThePool a little by adding some synchronized methods and a main() method to start the process.
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class ThreadInThePool extends Thread {
        private static final int POOL_SIZE = 8;     // threads in the pool
        private static final int PIECES = 20;       // number of sub-tasks
        private static final int LIMIT = 10000000;  // count primes in 1..LIMIT

        private final ConcurrentLinkedQueue<Runnable> taskQueue;
        // volatile so that abort() called from another thread is seen by run()
        private volatile boolean isRunning = true;
        private int ct = 0;             // primes counted by this thread
        private static int thrdCt = 0;  // number of threads that have finished
        private static int total = 0;   // primes counted by all threads
        private static long startTime;

        public ThreadInThePool(ConcurrentLinkedQueue<Runnable> queue) {
            taskQueue = queue;
        }

        // Stops the loop in run() before all sub-tasks
        // in the taskQueue are finished.
        public void abort() {
            isRunning = false;
        }

        @Override
        public void run() {
            try {
                while (isRunning) {
                    Runnable subTask = taskQueue.poll();
                    if (subTask == null) {
                        // The queue is empty: all sub-tasks have been taken,
                        // so the thread terminates.
                        break;
                    }
                    subTask.run();
                    CountPrimes tsk = (CountPrimes) subTask;
                    ct += tsk.getCount();
                }
                addToTotal(ct);
            } finally {
                // Compare the value returned under the lock, so that exactly
                // one thread (the last one to finish) prints the result.
                if (increaseThrdCt() == POOL_SIZE) {
                    System.out.println(total);
                    long elapsedTime = System.currentTimeMillis() - startTime;
                    System.out.println("Total elapsed time: " + (elapsedTime / 1000.0) + " seconds");
                }
            }
        }

        private static synchronized int increaseThrdCt() {
            return ++thrdCt;
        }

        private static synchronized void addToTotal(int x) {
            total = total + x;
        }

        public static void main(String[] args) {
            startTime = System.currentTimeMillis();
            ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
            int step = LIMIT / PIECES;
            int lo = 1;
            int hi = step;
            for (int i = 0; i < PIECES; i++) {
                queue.add(new CountPrimes(lo, hi));
                lo = hi + 1; // the next piece starts just after this one ends
                hi = hi + step;
            }
            ThreadInThePool[] thrdPool = new ThreadInThePool[POOL_SIZE];
            for (int i = 0; i < POOL_SIZE; i++) {
                thrdPool[i] = new ThreadInThePool(queue);
                thrdPool[i].start();
            }
        }
    }
Now the output is
664580
Total elapsed time: 3.86 seconds
which is considerably smaller than the previous time.
There are easier ways of getting a thread pool, such as the method newFixedThreadPool() of the class Executors in the package java.util.concurrent. If you are interested, read the Oracle documentation on thread pools. What I focused on in this article is their inner working. With this basic theory, it’s easier to understand how these features work.
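For comparison, here is a rough sketch of the same prime-counting job done with the standard library's pool. The class ExecutorPrimeCount and its methods are hypothetical names of my own; only Executors.newFixedThreadPool, ExecutorService.submit, Future.get and shutdown come from java.util.concurrent. Unlike the isPrime() used earlier in this article, this version rejects numbers below 2, so its count for 1 to 10000000 is the true 664,579.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorPrimeCount {
    static boolean isPrime(int val) {
        if (val < 2) return false; // 0 and 1 are not prime
        for (int i = 2; (long) i * i <= val; i++) {
            if (val % i == 0) return false;
        }
        return true;
    }

    static int countInRange(int lo, int hi) {
        int count = 0;
        for (int i = lo; i <= hi; i++) {
            if (isPrime(i)) count++;
        }
        return count;
    }

    // Counts primes in 1..max using `pieces` sub-tasks on `threads` pool threads.
    static int countPrimes(int max, int pieces, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            int step = max / pieces;
            for (int p = 0; p < pieces; p++) {
                final int lo = p * step + 1;
                // the last piece absorbs any remainder of the division
                final int hi = (p == pieces - 1) ? max : (p + 1) * step;
                Callable<Integer> subTask = () -> countInRange(lo, hi);
                results.add(pool.submit(subTask));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get(); // blocks until that sub-task has finished
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a sub-task failed", e);
        } finally {
            pool.shutdown(); // let the pool threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(countPrimes(10000000, 20, 8));
    }
}
```

Here the executor owns both the queue and the threads, and each Future carries a sub-task's result back, so no shared counters or synchronized methods are needed.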
Thanks for reading the post! If you like it, please share it with your friends 🙂 In the next article we’ll see the weaknesses of this procedure and how to overcome them. We’ll also discuss what blocking queues are in the coming articles.