We discussed how to start and stop threads in the previous post. This one is about making threads run collaboratively instead of impeding each other.
Theory
Synchronization
As we discussed previously, sometimes threads need to synchronize. It’s required because the threads are executed by the scheduler, and we can’t control the order of execution. Sometimes the order is important. There are two cases when we need to synchronize:
-
Multiple threads read and update shared data. Without synchronization, it can become inconsistent.
-
A thread needs to wait for another thread to finish its work.
Races
Each thread has its own memory. Everything created within the thread’s lambda is local to the thread and not visible to any other thread. But any thread launching a new thread can pass its local variables to the new thread. This is how the variables can be shared between threads.
Imagine that there is a main thread that has a counter set to zero. The main thread creates two threads (A and B) and passes the counter to them to be incremented. Thread A starts and reads the counter. It’s zero. Then thread B starts and also reads the counter. It’s zero. Then thread A increments the counter. It’s one. Then thread B increments the counter. It’s also one. Thread A writes the value to the counter in the main thread and thread B writes the value to the same variable. The counter is one but has to be two. This is a race condition called "lost update". And this is a good example of why it’s important to protect shared data. Thread B should wait until thread A is done with the update and only after that it should read the counter.
Linearizability
What do we expect from concurrent code? It should be executed in the same way as it would be executed in sequential order. Either actor A increments the counter and then actor B increments the counter or actor B increments the counter and then actor A increments the counter. In any case, the counter should be two at the end of the operations. It can’t be one.
Atomicity
What is the problem with the counter? The problem is that there are 3 operations: read the counter from the shared memory to the thread’s local memory, increment it in the local memory and write the new value back to the shared memory. And since there could be a switch from thread A to thread B between reading and writing, another thread could read the old value.
private static class Counter {
private int count = 0;
public int getCount() {
return count;
}
public void increment() {
count++;
//long version:
// int localCount = count;
// localCount = localCount + 1;
// count = localCount;
}
}
To reproduce the problem we can use kotlinx-lincheck library. The library receives methods marked with @Operation
to generate multiple scenarios of concurrent executions of the operations. And then it checks each execution for linearizability. Can the result produced by threads executing the operations in parallel be produced by a single thread executing operations in some sequential order? If there is no such order, the test fails. We can’t call the increment twice and get 1 as a result of executing things sequentially but we have this result executing them in parallel. So the LostUpdateIntCounterTest fails and prints the failed scenario to output.
Mutex
What’s the solution? The solution is to read, increment, and write as one operation or atomically. In our case, there are threads A and B that are about to perform the atomic operation called "atomic increment" (which consists of 3 steps: read, increment, write). Atomicity means that if thread A already started the operation then thread B should wait until thread A is done with all 3 steps.
The mechanism allowing the implementation of atomic operations is called mutual exclusion or mutex. What is mutex? Imagine a rostrum and a forum. Anyone from the forum’s audience could potentially speak but to make things organized, only one who is on the rostrum is allowed to speak and no one can interrupt the speaker. When the speaker is done she can release the rostrum to let someone else speak. Each person is a thread and the rostrum is a mutex in our example.
To make a set of steps atomic, we need to add mutex acquisition before the set and mutex release after the set. This is how the mutex protects the set of operations from parallel execution (i.e. interruptions).
So, to fix our counter:
private static class Counter {
private int count = 0;
private Lock lock = new ReentrantLock();
public int getCount() {
return count;
}
public void increment() {
lock.lock();
try {
//count++;
int localCount = count;
localCount = localCount + 1;
count = localCount;
} finally {
lock.unlock();
}
}
}
The lock is a mutex protecting the counter increment from concurrent execution. If multiple threads are executing the Counter.increment()
only one of them will be able to proceed lock.lock()
. All other threads are blocked on it until the thread acquired the lock called lock.unlock()
. The ReentrantLock
is a lock that allows a thread to not block itself by calling the lock()
method multiple times (for example if the thread calls lock.lock()
and then calls a method that also has lock.lock()
). It’s a good practice to have lock.unlock()
in the "finally" section to guarantee to unlock.
This is a test demonstrating the lock.
ReentrantLock lock = new ReentrantLock();
AtomicBoolean protectedSectionReached = new AtomicBoolean(false);
Thread child = new Thread(() -> {
lock.lock();
protectedSectionReached.set(true);
});
//lock is acquired by the main thread
lock.lock();
child.start();
run100Ms();
assertThat(protectedSectionReached.get(), is(false));
assertThat(child.getState(), is(WAITING));
lock.unlock();
run100Ms();
assertThat(protectedSectionReached.get(), is(true));
assertThat(child.getState(), is(TERMINATED));
Since the lock is acquired by the main thread, the child thread is blocked on it. The child thread is waiting for the lock to be released.
Note
|
We use our CPU intensive algorithm from the previous post to delay things for 100 ms sometimes to make sure that parallel thread had enough time to execute things. |
It’s worth mentioning that there are AtomicInteger, AtomicLong, and other atomic versions of the primitive types. So, there is no reason to implement our own atomic counter. But please note, that the atomic class itself doesn’t guarantee atomicity, it just provides atomic operations. The following increment is not atomic:
private static class Counter {
private AtomicInteger count = new AtomicInteger(0);
public int getCount() {
return count.get();
}
public void increment() {
// wrong implementation
count.set(count.get() + 1);
}
}
You should use AtomicInteger.incrementAndGet()
or AtomicInteger.getAndIncrement()
(we don’t need the result immediately, so it doesn’t matter which method to call) instead, which are the atomic increment methods.
Blocked Thread
There is another way to create a mutex in Java. It’s a synchronized
word that can be used on a method or a block level. A thread that is blocked on this mutex is in a BLOCKED
state. The problem with this approach is that a thread waiting in the BLOCKED
state is not able to handle an interruption. See the example below:
final Object lock = new Object();
Thread thread = new Thread(() -> {
synchronized(lock){
//do something here
}
});
synchronized (lock){
thread.start();
thread.join(100);
assertThat(thread.getState(), is(State.BLOCKED));
thread.interrupt();
thread.join(100);
assertThat(thread.getState(), is(State.BLOCKED));
}
We discussed in the previous blog post that it’s very important to be interruptable. The big advantage of the ReeentrantLock is that there is an interruptable version of the lock() method (called ReentrantLock.lockInterruptibly()
). See TestThreadInBlockedState.testInterruptWaitingThread()
for an example.
Dirty read
Is it enough to have the update operation atomic to protect the variable shared among threads? In our case, there is only one variable and the answer is yes. You can check it by adding the lock to the increment
method and restarting the test. If there is only one field and the update is atomic then the read is also atomic.
But that doesn’t work in case we have multiple fields (for example, numerical and string representation of the counter).
private static class StringAndNumber {
private int number = 0;
private String string = "0";
private Lock lock = new ReentrantLock();
public int getNumber() {
return number;
}
private String getString() {
return string;
}
public void increment() {
lock.lock();
try {
number++;
string = String.valueOf(number);
} finally {
lock.unlock();
}
}
}
The atomicity of the update operation is not enough to protect the read. If you run the DirtyReadTest
you will see the failed scenario.
Parallel part: | getNumber(): 1 | increment(): void | | getString(): 0 | |
While one thread is incrementing, another thread is reading. The reader could get the first field updated but the second field is still having the old value. The operation is atomic from the writing threads perspective, but it’s not from the reading threads point of view. We can easily fix it by adding the lock to read methods as well. And this is actually how we can prevent any inconsistency - just make all methods interacting with the shared state available to only one thread at a time. But that’s not probably what we want. Our initial intent is to have threads running in parallel, so we can utilize resources more efficiently. But if each method is exclusive and blocks all threads except one that acquired the lock then the execution is sequential. We lost any parallelism.
Imagine that we have only one thread incrementing the counter and 1000 threads reading it. All threads have to wait for the lock to be released by the updating thread. That’s fine. That makes both fields consistent for the readers. But since both read methods (getNumber
and getString
) are blocking now, each reader should wait for another reader. Parallel reads are not allowed anymore, despite the fact the reading threads can’t impact each other.
The better solution is to block readers while there is an update in progress, but as soon as the update is done, it’s safe to read in parallel. So, we need two locks: one for the update operation and one for the read operation. The locks are depends on each other. The read lock is not exclusive. Multiple threads can acquire it. So, reading threads don’t need to wait for each other. But if the write lock is acquired then the read lock is not allowed to be acquired, it should wait until the write lock is released. This is how we make all readers wait until the increment is done. Also, the write lock can’t be acquired until all read locks are released. So the update is waiting for all readers to finish (otherwise they could read an inconsistent update). Does it mean that readers could potentially block the writer forever? Nope, the write lock is waiting only for readers that have started before the write attempt. All reading threads that tried to acquire the read lock after the write lock acquisition attempt are waiting for the write lock to be acquired (as soon as all in-progress readers are done) and released (after the write is done). This is how the update is consistent but readers don’t block each other. So the final version of the fix is:
private static class StringAndNumber {
ReadWriteLock lock = new ReentrantReadWriteLock();
private int number = 0;
private String string = "0";
public int getNumber() {
lock.readLock().lock();
try {
return number;
} finally {
lock.readLock().unlock();
}
}
private String getSting() {
lock.readLock().lock();
try {
return string;
} finally {
lock.readLock().unlock();
}
}
synchronized public void increment() {
lock.writeLock().lock();
try {
number++;
string = String.valueOf(number);
} finally {
lock.writeLock().unlock();
}
}
DirtyReadFixTest
proves that the fix is correct. If you are curious to see how read-write locks work, please check the ReadWriteLockTest
.
Deadlock
You should be careful when there is more than one lock involved in the thread’s synchronization. A deadlock is a situation when two or more threads are waiting for each other to release the lock. Here is an example:
Lock lockA = new ReentrantLock();
Lock lockB = new ReentrantLock();
AtomicBoolean protectedSectionReachedByThreadA = new AtomicBoolean(false);
Thread threadA = new Thread(() -> {
lockA.lock();
try {
run100Ms();
lockB.lock();
try {
protectedSectionReachedByThreadA.compareAndSet(false, true);
} finally {
lockB.unlock();
}
} finally {
lockA.unlock();
}
});
AtomicBoolean protectedSectionReachedByThreadB = new AtomicBoolean(false);
Thread threadB = new Thread(() -> {
lockB.lock();
try {
run100Ms();
lockA.lock();
try {
protectedSectionReachedByThreadB.compareAndSet(false, true);
} finally {
lockA.unlock();
}
} finally {
lockB.unlock();
}
});
threadA.start();
threadB.start();
threadA.join(200);
threadB.join(200);
assertThat(protectedSectionReachedByThreadA.get(), is(false));
assertThat(protectedSectionReachedByThreadB.get(), is(false));
assertThat(threadA.getState(), is(WAITING));
assertThat(threadB.getState(), is(WAITING));
Thread A acquires the lockA and waits for the lockB to be released. Thread B acquires the lockB and waits for the lockA to be released. Both threads are waiting for each other forever.
Waiting and notifying
So far we have seen how to synchronize threads to keep shared data consistent. But what if one thread needs to sync its work (not data) with another thread? For example, some thread can’t proceed until another thread is finished its work. To implement it the threads need to be able to send messages to each other.
One of the ways to have a thread waiting for another thread to finish is to use Condition
. The condition is created from a lock and the lock should be acquired before using the condition. Let’s see how it works. Below we create a thread that is waiting for a signal from the main thread. It acquires the lock and waits for the signal on the condition. As soon as the await()
method is called the thread releases the lock and starts waiting. So, the main thread can acquire the lock and send the signal. That’s not enough for the waiting thread to receive the signal, it should be able to acquire the lock to go ahead, so the main thread should release the lock after the signal is sent. Then the waiting thread continues executing from the await()
method.
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
AtomicBoolean awaitReached = new AtomicBoolean(false);
AtomicBoolean threadIsAwaken = new AtomicBoolean(false);
Thread thread = new Thread(() -> {
lock.lock();
try {
awaitReached.compareAndSet(false, true);
condition.await();
threadIsAwaken.compareAndSet(false, true);
} catch (InterruptedException e) {
//ignore in this test
} finally {
lock.unlock();
}
});
thread.start();
run100Ms();
assertThat(awaitReached.get(), is(true));
assertThat(threadIsAwaken.get(), is(false));
// to use condition we need to acquire the lock
// otherwise IllegalMonitorStateException is thrown
assertThrows(IllegalMonitorStateException.class, condition::signalAll);
//the thread is waiting for the condition
//and it releases the lock
assertThat(lock.tryLock(), is(true));
// now awake the thread
condition.signalAll();
// let's delay a bit to let the thread handle the signal if it's received
run100Ms();
// no, the thread is still awaiting the condition because it can't acquire the lock
assertThat(threadIsAwaken.get(), is(false));
assertThat(thread.getState(), is(WAITING));
// now we release the lock
lock.unlock();
// let's delay a bit to make sure that the thread has enough time to receive the signal
run100Ms();
//now the thread is awakened
assertThat(threadIsAwaken.get(), is(true));
assertThat(thread.getState(), is(TERMINATED));
An important outcome from the above example:
-
To wait for the condition, we need to acquire the lock.
-
To signal the condition, we need to acquire the lock.
-
After the signal is sent/received, we need to release the lock to let another thread proceed.
This is how the condition is protected from concurrent execution and lets threads communicate with each other.
Note
|
Please note, that to be able to receive the signal, the thread must be waiting on await() method, not on the lock.lock() . If the thread is blocked on the lock acquisition, it can’t receive the signal. See TestReentrantLock.signalIsNotReceivedIfThreadBlockedOnLockAcquisitionNotOnAwait for details.
|
Practice
Having all the above knowledge, we can implement a thing that will be very useful soon. The thing can be used to split the execution of multiple threads into sequential phases. Each phase is executed by some thread and while the phase is executing other threads are waiting. Then the next phase is started and so on. The PhaseSync
can be useful to reproduce different race conditions. That’s the next topic I’ll cover in the next post.
Usage
The PhaseSync class has the following public methods:
public void phase(Phases phase, FallibleFunction execution);
public void phaseWithExpectedException(Phases phase, FallibleFunction execution, Class<? extends Exception> expectedException);
public boolean noExceptions();
public String exceptionDetails();
public void ifAnyExceptionRethrow();
The phase()
method is used to execute the given function in the given phase. The phaseWithExpectedException()
method is the same as the phase()
but it also expects an exception to be thrown (we’ll talk about it a bit later).
This is an example of how the PhaseSync is used:
PhaseSync phaseSync = new PhaseSync();
final AtomicReference<String> stages = new AtomicReference<>("");
BinaryOperator<String> append = (a, b) -> a + b;
runAsync(() -> {
phaseSync.phase(Phases.SECOND, () -> stages.getAndAccumulate("2", append));
phaseSync.phase(Phases.FOURTH, () -> stages.getAndAccumulate("4", append));
});
runAsync(() -> {
phaseSync.phase(Phases.FIRST, () -> stages.getAndAccumulate("1", append));
phaseSync.phase(Phases.THIRD, () -> stages.getAndAccumulate("3", append));
});
phaseSync.phase(Phases.FIFTH, () -> {});
assertEquals("1234", stages.get());
We create two threads that execute 4 phases. Each phase just adds the phase number to the stages string. The phases are executed in the order specified by the first parameter’s value. The last fifth phase is to wait for all phases to be executed in the main thread.
Note
|
The CompletableFuture.runAsync() method is just a convenient way to execute a code in a parallel thread. Under the hood, it uses the ForkJoinPool.commonPool() to execute the code. The commonPool() is a preconfigured thread pool. We discussed thread pools in the previous post.
|
The FallibleFunction (second argument of the phase* methods) is a function that can throw an exception.
@FunctionalInterface
public interface FallibleFunction {
void run() throws Exception;
}
The exception is not propagated to the caller of the function. Instead, the exception is stored in the PhaseSync object. The object allows to check if there any exceptions were thrown by calling noExceptions()
and the details info about the exception can be provided by exceptionDetails()
method.
So, we can call any methods throwing checked exceptions without worrying about the catch clauses. For example:
phaseSync.phase(Phases.FIRST, () -> Thread.sleep(1));
The phase*
are supposed to be used in async functions (lambda expression). Java doesn’t allow throwing checked exceptions from async functions. The PhaseSync class is designed to split execution and exception handling.
PhaseSync phaseSync = new PhaseSync();
runAsync(() -> phaseSync.phase(Phases.FIRST, () -> Files.readAllLines(Paths.get("/notExistingFile"))));
phaseSync.phase(Phases.SECOND, () -> assertThrows(IOException.class, phaseSync::ifAnyExceptionRethrow));
phaseSync.phase(Phases.THIRD, () -> Files.readAllLines(Paths.get("/dev/null")));
runAsync(() -> phaseSync.phase(Phases.FOURTH, () -> assertDoesNotThrow(phaseSync::ifAnyExceptionRethrow)));
phaseSync.phase(Phases.FIFTH, () -> {});
We run the file reading in a separate thread, and then, when it’s convenient, we can pass the exception to a corresponding handler. It could be handled in a completely different thread. So, not only execution is split into phases, but error handling is split into phases too. We have full control over the execution and can reproduce any scenario we want. It will be very useful in the next post where we’re talking about dirty, non-repeatable, and phantom reads in the context of relational databases.
Implementation
Now, when we know how the class is used, let’s take a look at how it is implemented.
We need phaseSync.phase*()
methods to wait for each other and execute in the order specified by the first parameter’s value. How would you implement it? It’s worth thinking about it because there are multiple ways. You can even try to implement it as an exercise.
In my implementation, I use a condition variable and a lock. The condition variable protected by the lock is used to signal the threads that the phase owned by the lock is finished. Each phase is waiting for the condition and checks if its number is the next phase’s number. If it is, the phase is executed and the condition is signaled to pass the execution to the next phase. If it’s not, it is just back to waiting.
It works in the same way as virtual queues created by a queue management system. You come to a room where some service is provided. There is a machine printing physical tickets and a screen showing numbers. To be queued you need to get a ticket with some number printed for you. Then you just need to wait until your number is displayed on the screen, which means that now is your turn, and you can be served. Each time the screen is changed you check if it has your number. If it’s not, you continue waiting.
So, here is the implementation:
public class PhaseSync {
private final Lock lock = new ReentrantLock();
private final Condition phaseIsDone = lock.newCondition();
private Phases currentPhase = Phases.FIRST;
// some code omitted
private void phase(Phases phase, FallibleFunction execution, Consumer<Exception> exceptionHandler) {
lock.lock();
try {
while (currentPhase != phase) {
if (!phaseIsDone.await(5, TimeUnit.SECONDS)) {
exceptionHandler.accept(new Exception("Timeout waiting for " + phase));
return;
}
}
executeAndHandleExceptions(execution, exceptionHandler);
if (currentPhase.hasNext()) {
currentPhase = currentPhase.next();
}
phaseIsDone.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted");
} finally {
lock.unlock();
}
}
public enum Phases {
FIRST,
SECOND,
THIRD,
FOURTH,
FIFTH,
SIXTH,
SEVENTH,
EIGHTH,
NINTH,
TENTH;
public boolean hasNext() {
return next() != null;
}
public Phases next() {
return Phases.values()[ordinal() + 1];
}
}
The remaining code is about exception handling, you can take a look at it if you are curious. The code can be found here.
Conclusion
In this post, we discussed how to protect shared resources from concurrent access, and how threads can communicate to organize collaborative execution. We applied that knowledge to implement the PhaseSync class allowing us to split execution into several phases to demonstrate concurrent issues. We’re going to use PhaseSync to talk about atomicity and isolation in the context of relational databases. See you soon!