Many developers know how to start a thread. But what happens next? How do you stop a thread properly? What if the thread is blocked? If a thread that started another thread finishes, is the child thread terminated too? When and how should threads be used and reused? Read this post to find the answers.
Theory
If you know what a thread is and how parallel execution differs from concurrent execution, jump straight to Practical.
Why threads?
Problem
Many years ago, computers were big and expensive (the IBM 7094, for example, cost $18 million). Whenever a program had to wait for data from a slow input device before it could start processing, the very expensive CPU simply sat idle.
Solution
To utilize the CPU better, new operating systems and hardware were developed that could keep multiple programs in memory. While one program is waiting on IO, another one that already has enough data can use the CPU. This is how the CPU can be kept busy almost 100% of the time.
Preemptive scheduling
What if there are multiple programs ready to use the CPU? Which one should run first? Operating systems have a preemptive scheduler that gives each program a fair share of CPU time and keeps all parts of the system busy. So if there are two programs, PR1 and PR2, the scheduler could let PR1 use the CPU for some time, then put it on hold and let PR2 use the CPU, then put PR2 on hold, switch back to PR1, and so on. If a program is waiting on IO, the scheduler will not give the CPU to this program until the IO is done. So, even though there is only one CPU, the programs appear to run simultaneously: they run concurrently.
A program together with its state (which tells the scheduler which instruction to run next when it resumes the program, and which resources the program needs) is called a process. A process has one or more threads of execution. A thread needs fewer resources than a process, so on a modern OS it’s cheaper to create a thread than a process.
Concurrency vs parallelism
Imagine a restaurant with a waiter and a chef. The waiter waits for customers to arrive and serves them. The chef cooks the food. While the waiter is waiting for customers, bringing them a menu, and taking an order, the chef is working on the orders that the waiter has already brought. In our analogy, each table is a process, the order is a program to execute, the waiter and the customers reading the menu are IO, and the chef is both the CPU and the scheduler. While the ingredients for a dish ordered by table 1 are cooking in boiling water, the chef can work on another dish ordered by table 2. This is how one chef executes multiple programs at the same time. This is concurrent execution. Processes (tables) can’t change the order of execution; it’s up to the chef to decide which program to execute next.
There is a limit on how many orders the chef can handle at the same time. To reduce the time the customers wait for their food, a new chef could join the team, so we have two chefs working in parallel. This is parallel execution. Different CPU cores (chefs) can work on the same process (order) to make it faster, or on different processes (orders).
Note
|
Now the waiter could be a bottleneck. The chefs can execute more orders, but they have to wait for the waiter to bring them and simply sit idle. IO can be parallel as well, but we’ll take a look at it later in a separate post. |
Practical
In this section, we’ll see how to create a thread and how to use it. We use Java since this language is well known, and it has a rich set of capabilities for multithreading.
How to create and use a thread?
There are multiple ways to execute a piece of code concurrently, but basically it’s done by creating a function to be executed concurrently (or in parallel) and passing it to a (new or existing) thread. We will discuss thread reuse a bit later. For now, let’s create a thread and see how it works.
AtomicBoolean changed = new AtomicBoolean(false);
Thread thread = new Thread(() -> {
    // executed concurrently with main thread
    changed.compareAndSet(false, true);
});
// Before starting the thread, the thread is not running.
assertThat(thread.isAlive(), is(false));
assertThat(thread.getState(), is(Thread.State.NEW));
assertThat(changed.get(), is(false));
// Start the thread.
thread.start();
// Note: the next three checks race with the new thread. They hold only if the
// lambda has not finished yet, so on a fast machine they can fail occasionally.
assertThat(thread.isAlive(), is(true));
assertThat(thread.getState(), is(Thread.State.RUNNABLE));
assertThat(changed.get(), is(false));
// Wait for the thread to finish.
thread.join();
assertThat(thread.isAlive(), is(false));
assertThat(thread.getState(), is(Thread.State.TERMINATED));
assertThat(changed.get(), is(true));
In this example, we create a thread and pass it a lambda expression that is going to be executed concurrently with the main thread. The function just changes a boolean value, so we can see in the main thread if it has been executed or not.
In the first line, we create an AtomicBoolean variable that will be used to signal the main thread that the created thread has finished. The AtomicBoolean is a special class that allows us to use atomic operations on it. We will talk about atomic operations in later posts.
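The checks made right after start() in the listing above depend on timing, because the new thread may already have finished. A minimal sketch of a timing-independent variant follows. It assumes a CountDownLatch (not used in the original example) to hold the thread until the main thread has made its observations; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class ThreadLifecycleSketch {

    /** Returns {aliveBeforeStart, aliveWhileHeld, changedWhileHeld, aliveAfterJoin, changedAfterJoin}. */
    static boolean[] observe() {
        AtomicBoolean changed = new AtomicBoolean(false);
        CountDownLatch mayFinish = new CountDownLatch(1);

        Thread thread = new Thread(() -> {
            try {
                mayFinish.await();          // wait until the main thread allows us to finish
                changed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        boolean aliveBeforeStart = thread.isAlive();
        thread.start();
        // The worker cannot finish yet, so these two reads do not depend on timing.
        boolean aliveWhileHeld = thread.isAlive();
        boolean changedWhileHeld = changed.get();

        mayFinish.countDown();              // let the worker finish
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {aliveBeforeStart, aliveWhileHeld, changedWhileHeld,
                thread.isAlive(), changed.get()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(observe()));
    }
}
```

The price of this determinism is that the worker's state while it is held is no longer guaranteed to be RUNNABLE: once it parks on the latch it reports WAITING.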
Each thread has multiple states:
- NEW - the thread has been created but not started yet (the start() method has not been called)
- RUNNABLE - the thread has been started but not yet finished
- TERMINATED - the thread has finished
There are more states, but they are related to synchronization, and we cover them a bit later.
How to wait for a thread to finish?
The started thread runs concurrently with the main thread. We can wait for it to finish by calling the join() method. This is a blocking call: the calling thread (here, the main thread) can’t continue until the other thread finishes. There is an overloaded version of join() that takes a timeout parameter. If the thread doesn’t finish within the given time, the main thread simply continues execution.
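One detail worth seeing in code: join(timeout) returns nothing, so after it returns the caller cannot tell whether the thread finished or the timeout expired without checking isAlive(). The sketch below illustrates this under the assumption that the worker is held by a CountDownLatch (a hypothetical helper, chosen only to make the outcome predictable).

```java
import java.util.concurrent.CountDownLatch;

class JoinTimeoutSketch {

    /** Returns {aliveAfterTimedJoin, aliveAfterPlainJoin}. */
    static boolean[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await();                 // stays busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join(50);                     // gives up after about 50 ms
            boolean aliveAfterTimedJoin = worker.isAlive();

            release.countDown();                 // now let the worker finish
            worker.join();                       // waits as long as it takes
            return new boolean[] {aliveAfterTimedJoin, worker.isAlive()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("alive after join(50): " + r[0] + ", alive after join(): " + r[1]);
    }
}
```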
Threads synchronization
So, the join() method is one of the ways to synchronize threads. Another way is to use locks (we discuss them in the next post). For now, let’s see how to use join() and how it changes the state of the executing thread.
To test it we need a long-running function that keeps a thread busy for a while (there are two methods running for 100 milliseconds and for 1 second). We would use some not very efficient algorithm for finding prime numbers to implement it and give a big enough number to occupy the CPU for a desired amount of time.
Thread thread = new Thread(() -> {
    // executed concurrently with main thread
    var child = new Thread(CpuIntensiveAlgorithm::run1s);
    child.start();
    try {
        child.join();
    } catch (InterruptedException e) {
        // ignore in test
    }
});
thread.start();
// let the child thread run for a while
thread.join(100);
assertThat(thread.getState(), is(Thread.State.WAITING));
When one thread is waiting for another thread to finish, its state changes to WAITING. If we change the child.join() call inside the outer thread to child.join(100), the thread will be in the TIMED_WAITING state instead (see next test method).
So, we covered all thread states except for BLOCKED. To discuss this state we need to dive deeper into threads synchronization which we will discuss in the next post.
Thread sleep
One more way to put a thread into a waiting state is to call the Thread.sleep() method, which suspends the current thread for the given amount of time (its state becomes TIMED_WAITING). It’s a bad idea to use this method for thread synchronization: the thread just occupies resources without doing anything, and the chosen delay is sometimes too long and sometimes too short. You never know which, because you don’t control the scheduler.
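To make the contrast concrete, here is a small sketch of waiting for an event instead of guessing a sleep duration. It assumes java.util.concurrent.CountDownLatch as the signalling tool, which is one option among several and is not something this post prescribes; the names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchInsteadOfSleepSketch {

    static int run() {
        AtomicInteger result = new AtomicInteger();
        CountDownLatch ready = new CountDownLatch(1);

        Thread producer = new Thread(() -> {
            result.set(6 * 7);     // the work whose duration we cannot predict
            ready.countDown();     // signal that the result is available
        });
        producer.start();

        try {
            // Instead of Thread.sleep(someGuess): returns as soon as countDown() was called,
            // never earlier and without waiting longer than necessary.
            ready.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Unlike join(), the latch also works when the signalling thread keeps running after it has produced the result.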
Threads interdependency
What happens to a child thread if its parent thread terminates? Nothing, the child thread is still running.
AtomicBoolean calculationIsDone = new AtomicBoolean(false);
Thread childThread = new Thread(() -> {
    CpuIntensiveAlgorithm.run100Ms();
    calculationIsDone.set(true);
});
Thread parentThread = new Thread(childThread::start);
parentThread.start();
parentThread.join(100);
assertThat(parentThread.isAlive(), is(false));
assertThat(childThread.isAlive(), is(true));
childThread.join(200);
assertThat(calculationIsDone.get(), is(true));
How to stop a thread?
So, we know how to wait for a thread to finish, but how do we force it to stop (if it’s running for too long, for example)?
The simple answer is that you can’t force it. Remember? You can’t control the scheduler. But you can call the interrupt() method on the thread. If the thread is blocked in a method that throws InterruptedException, that method will probably throw the exception, and you can handle the interruption in the catch block.
Note
|
I say "probably" because it really depends on implementation. But it’s a good practice to throw InterruptedException when you receive the interrupt signal. |
AtomicBoolean exceptionCaught = new AtomicBoolean(false);
Thread threadToStop = new Thread(() -> {
    try {
        Thread.sleep(Long.MAX_VALUE);
    } catch (InterruptedException e) {
        exceptionCaught.compareAndSet(false, true);
    }
});
threadToStop.start();
threadToStop.interrupt();
threadToStop.join(100);
assertThat(exceptionCaught.get(), is(true));
assertThat(threadToStop.isAlive(), is(false));
In the example, the thread sleeps as long as it can but when it receives the interrupt signal the sleep method throws an InterruptedException.
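One related detail: when sleep() throws InterruptedException, the thread's interrupted flag is cleared. Code that catches the exception but cannot rethrow it commonly restores the flag so that callers further up can still see the interruption. The sketch below shows that convention; it is a common idiom rather than something the example above requires, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptFlagSketch {

    /** Returns {flagRightAfterCatch, flagAfterRestore}, both observed inside the worker. */
    static boolean[] run() {
        AtomicBoolean flagRightAfterCatch = new AtomicBoolean(true);
        AtomicBoolean flagAfterRestore = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Throwing InterruptedException cleared the flag.
                flagRightAfterCatch.set(Thread.currentThread().isInterrupted());
                // Restore it so that code outside this catch block can still notice.
                Thread.currentThread().interrupt();
                flagAfterRestore.set(Thread.currentThread().isInterrupted());
            }
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {flagRightAfterCatch.get(), flagAfterRestore.get()};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("after catch: " + r[0] + ", after restore: " + r[1]);
    }
}
```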
Interrupting a thread listening for input
What if a thread is waiting for some input? For example, listening on a server socket (blocked in the accept() method) for incoming requests from clients? Would the accept method throw an InterruptedException, an InterruptedIOException, or any other kind of exception when the thread is interrupted? The answer is no. InterruptedException is thrown only by methods that put a thread into the WAITING or TIMED_WAITING state. The ServerSocket.accept() method keeps the thread in the RUNNABLE state even though the thread is waiting on IO. That’s what you should expect when calling any blocking IO operation. Interrupting a thread that is waiting on IO doesn’t magically tell the IO to stop blocking the thread. In our example the socket doesn’t know about the interruption, so it keeps listening for incoming requests and keeps blocking the thread.
AtomicBoolean exceptionCaught = new AtomicBoolean(false);
AtomicInteger receivedInput = new AtomicInteger(0);
var port = ThreadLocalRandom.current().nextInt(10000, 20000);
Thread threadToStop = new Thread(() -> {
    try (ServerSocket socket = new ServerSocket(port);
         var clientSocket = socket.accept();
         InputStreamReader inputStream = new InputStreamReader(clientSocket.getInputStream())) {
        var read = inputStream.read();
        receivedInput.compareAndSet(0, read);
    } catch (IOException e) {
        exceptionCaught.compareAndSet(false, true);
    }
});
threadToStop.start();
threadToStop.interrupt();
threadToStop.join(100);
assertThat(exceptionCaught.get(), is(false));
assertThat(threadToStop.isAlive(), is(true));
assertThat(threadToStop.getState(), is(Thread.State.RUNNABLE));
// after interrupting we even can send input to the thread
try (Socket socket = new Socket("localhost", port);
     PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
    out.write(10);
}
threadToStop.join(100);
assertThat(receivedInput.get(), is(10));
Even though the thread is interrupted it’s still alive and listening for input. We can even send a message to the socket.
Note
|
Not every stream read blocks indefinitely. For example, a FileInputStream can be read from beginning to end, so a thread reading from a file is done as soon as the file has been read. |
How to handle thread interruption in a thread listening for input
The golden rule is "don’t wait forever". Always use a timeout. It works this way for the Thread.join(timeout) method, and it should work the same way for any IO method. Find a way to set a timeout. For a server socket the method is ServerSocket.setSoTimeout(); once the timeout expires, accept() throws a SocketTimeoutException.
Once the timeout is set, you need to implement the interruption handling. When the interrupt() method is called on a thread that is not in a waiting state (so no InterruptedException is thrown), it only sets the interrupted flag of the thread to true. The flag can be checked by calling Thread.currentThread().isInterrupted(). So, the interrupted thread can handle the interruption (for example, close any open resources and exit). In our example all open resources are closed automatically (by the try-with-resources), so the thread just needs to stop calling the accept() method.
Note
|
If there is no way to set the timeout, the client interrupting the thread should close the IO resource before the interruption. Otherwise, the thread will be stuck in the blocking method. From a design point of view, it’s not good to let the client know about internal thread details. So, the timeout is preferable, since it keeps internal details encapsulated: the client is not aware of any IO, it just knows that there is some job that has to be interrupted. |
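The fallback mentioned in the note, closing the IO resource from outside, can be sketched as follows. The sketch relies on the documented behaviour of ServerSocket.close(), which makes a thread blocked in accept() throw a SocketException; the class name and the use of port 0 (let the OS pick a free port) are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseToUnblockSketch {

    /** Returns true if the listener left accept() with an IOException after the close. */
    static boolean run() {
        AtomicBoolean acceptFailed = new AtomicBoolean(false);
        try {
            ServerSocket serverSocket = new ServerSocket(0);   // port 0: the OS picks a free port
            Thread listener = new Thread(() -> {
                try (var client = serverSocket.accept()) {
                    // a real listener would handle the client here
                } catch (IOException e) {
                    // SocketException: the socket was closed while (or before) accepting
                    acceptFailed.set(true);
                }
            });
            listener.start();

            serverSocket.close();      // unblocks accept() in the listener thread
            listener.interrupt();      // then signal the interruption as usual
            listener.join();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acceptFailed.get();
    }

    public static void main(String[] args) {
        System.out.println("listener unblocked by close: " + run());
    }
}
```

Note how the caller has to hold a reference to the socket, which is exactly the leak of internal details the note warns about. The listing that follows returns to the timeout-based approach.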
AtomicBoolean exceptionCaught = new AtomicBoolean(false);
var port = ThreadLocalRandom.current().nextInt(10000, 20000);
Thread threadToStop = new Thread(() -> {
    try (ServerSocket socket = new ServerSocket(port)) {
        socket.setSoTimeout(10);
        while (!Thread.currentThread().isInterrupted()) {
            try (var clientSocket = socket.accept();
                 InputStreamReader inputStream = new InputStreamReader(clientSocket.getInputStream())) {
                var read = inputStream.read();
                // process input
            } catch (SocketTimeoutException e) {
                // accept() timed out with no client; loop around and re-check the interrupted flag
            }
        }
    } catch (IOException e) {
        exceptionCaught.compareAndSet(false, true);
    }
});
threadToStop.start();
threadToStop.interrupt();
threadToStop.join(100);
assertThat(threadToStop.isAlive(), is(false));
As you can see, there is a very small timeout now, and each time the timeout expires the thread checks whether it has been interrupted. If it has, it exits the loop and stops listening for incoming connections.
Note
|
But what happens to clients when the timeout expires? Will a client be able to establish a connection and send a request while the thread is checking for the interruption and is not waiting in the ServerSocket.accept() method? Yes. The operating system keeps a queue (the backlog) of incoming connections for a listening socket, so as soon as the thread calls accept() again it receives the next connection that arrived while the thread was checking the interruption flag or processing the previous input.
|
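The queueing of incoming connections can be observed directly. In the sketch below a client connects and sends one byte before the server ever calls accept(); the connection waits in the operating system's queue until accept() picks it up. The class name, the loopback address, and the 1-second safety timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class BacklogSketch {

    /** Returns the byte that the client sent before accept() was called. */
    static int run() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // port 0: the OS picks a free port; 50 is the requested backlog size
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            server.setSoTimeout(1_000);               // don't wait forever

            // Nobody is in accept() yet, but the connection is established anyway.
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                client.getOutputStream().write(10);
            }

            // Only now do we accept; the queued connection and its data are still there.
            try (Socket accepted = server.accept()) {
                return accepted.getInputStream().read();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + run());
    }
}
```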
How to stop a thread that is busy calculating something?
AtomicBoolean calculationIsDone = new AtomicBoolean(false);
Thread threadToStop = new Thread(() -> {
    CpuIntensiveAlgorithm.run1s();
    calculationIsDone.set(true);
});
threadToStop.start();
threadToStop.interrupt();
threadToStop.join(100);
assertThat(threadToStop.isAlive(), is(true));
assertThat(calculationIsDone.get(), is(false));
No miracles, the thread is still alive and calculating. So, to process interruption we need to split the job into smaller chunks and check for interruption after each chunk is done in the same way as we did in the previous example. Don’t block the thread for a long time! Expect the interruption!
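Splitting the job into chunks might look like the sketch below. It is not the CpuIntensiveAlgorithm used in the tests above (that class is not shown in this post); it is a hypothetical stand-in that counts primes by trial division and checks the interrupted flag between chunks. The chunk size and the total amount of work are arbitrary assumptions.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class InterruptibleCalculationSketch {

    static final int MAX_CHUNKS = 1_000_000;   // far more work than can finish before the interrupt
    static final int CHUNK_SIZE = 1_000;

    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (long d = 2; d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    /** Counts the primes in [from, to). */
    static int countPrimes(int from, int to) {
        int count = 0;
        for (int n = from; n < to; n++) {
            if (isPrime(n)) {
                count++;
            }
        }
        return count;
    }

    /** Returns true if the worker stopped before it had processed all chunks. */
    static boolean stopsEarlyWhenInterrupted() {
        AtomicLong primesFound = new AtomicLong();
        AtomicBoolean finishedAll = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            for (int chunk = 0; chunk < MAX_CHUNKS; chunk++) {
                if (Thread.currentThread().isInterrupted()) {
                    return;                              // expected the interruption: give up
                }
                int from = chunk * CHUNK_SIZE;
                primesFound.addAndGet(countPrimes(from, from + CHUNK_SIZE));
            }
            finishedAll.set(true);
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join();                               // returns soon: at most one chunk is finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !finishedAll.get();
    }

    public static void main(String[] args) {
        System.out.println("stopped early: " + stopsEarlyWhenInterrupted());
    }
}
```

The chunk size is a trade-off: smaller chunks react to the interruption faster, larger chunks spend less time checking the flag.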
Thread reusing
The last thing to talk about is cost. Each thread occupies resources. By default, the JVM typically reserves about 1 MB of stack memory for each thread (the exact size depends on the platform and can be changed with the -Xss option). Each thread takes time to start. It’s also an additional load for the scheduler (one more thread to schedule). So, if you have a lot of work to run in parallel, it’s a good idea to reuse threads.
Thread pool
A thread pool allows threads to be reused. It consists of one or more pre-allocated threads and a queue of tasks (for example, lambdas) to be executed by those threads. There are multiple strategies for allocating threads, the queue can be bounded or unbounded, there are multiple policies for what to do when the queue is full, and so on, but the basic idea is that instead of being executed immediately, a task is put into the queue and one of the threads picks it up and executes it.
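As an illustration of a bounded queue and a "queue is full" policy, here is a sketch that configures java.util.concurrent.ThreadPoolExecutor directly. The sizes (one thread, one queue slot) and the choice of AbortPolicy are assumptions picked to make the behaviour easy to follow; the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    /** Returns true if the third task was rejected because the thread and the queue were both taken. */
    static boolean thirdTaskRejected() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1,                                    // exactly one thread
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),             // bounded queue with a single slot
                new ThreadPoolExecutor.AbortPolicy());   // policy: throw when the queue is full

        Runnable blockedTask = () -> {
            try {
                release.await();                         // keeps the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blockedTask);                   // runs on the single worker thread
            pool.execute(blockedTask);                   // waits in the queue
            try {
                pool.execute(blockedTask);               // no free thread, no free queue slot
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();                         // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```

Other built-in policies, such as ThreadPoolExecutor.CallerRunsPolicy, run the task on the submitting thread instead of throwing.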
A task that returns a result is represented by an instance of the Callable interface. When a Callable is submitted to the pool, the result of its execution is represented by a Future. The Future is a promise that the result will be available later. It lets you check the current state of the task or call Future.get() to wait for the result.
// Create a pool of threads having only one thread.
ExecutorService executorService = Executors.newFixedThreadPool(1);
ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(executorService);
AtomicInteger executionNumber = new AtomicInteger(0);
Callable<String> task = () -> {
    var threadName = Thread.currentThread().getName();
    return "thread name: " + threadName + ", number of execution: " + executionNumber.incrementAndGet();
};
// Put two tasks in the queue.
executorCompletionService.submit(task);
executorCompletionService.submit(task);
Future<String> firstTaskResult = executorCompletionService.take();
Future<String> secondTaskResult = executorCompletionService.take();
assertThat(firstTaskResult.get(), is("thread name: pool-1-thread-1, number of execution: 1"));
assertThat(secondTaskResult.get(), is("thread name: pool-1-thread-1, number of execution: 2"));
executorService.shutdown();
In the above example, we created a thread pool with only one thread and submitted a task that returns the name of the thread and the number of execution. We submitted the task twice, and we can see that both tasks are executed by the same thread. This is how we re-used the thread.
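In keeping with the "don’t wait forever" rule, Future.get() also has an overload that takes a timeout. The sketch below shows both outcomes: a quick task whose result arrives in time, and a task that is still running when the timeout expires and is then cancelled. The class and method names and the timeout values are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSketch {

    /** Submits a quick task and waits at most one second for its result. */
    static int quickResult() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = pool.submit(task);
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Submits a task that never finishes on its own; returns true if get() timed out. */
    static boolean slowTaskTimesOut() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch never = new CountDownLatch(1);
        Callable<Integer> slowTask = () -> {
            never.await();                    // blocks until the task is cancelled
            return 0;
        };
        Future<Integer> future = pool.submit(slowTask);
        try {
            future.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            future.cancel(true);              // true: interrupt the thread running the task
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(quickResult());
        System.out.println("timed out: " + slowTaskTimesOut());
    }
}
```

Cancelling with cancel(true) interrupts the pool thread, which ties back to the earlier sections: the task only stops promptly if it is written to react to interruption.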
Conclusion
So, now you know how to start a thread and how to wait for it to be executed. You know how to handle thread interruption and how to stop a thread that is busy calculating something or waiting for IO. You know how to use a thread pool. In the next post, we will talk about locks and synchronization. Stay tuned!