2024. 9. 22. 21:32ㆍBackend/Java
Condition의 존재 이유는, Mutex는 상호배제를 위한 도구이고, 동기화를 위한 수단이 아니다.
따라서 특정 조건이 충족될 때까지 대기하거나 작업 간의 순서를 조정하는 기능을 제공하지 못한다.
Condition 변수는 왜 필요한 걸까?
특정 스레드가 어떤 상태 또는 행위를 수행하기를 기다리는 방법이다.
예를 들면, 문자가 buffer에 추가되길 기다릴 때 사용될 수 있다. 즉, 특정 상태가 true가 되기를 기다리는데 Condition Variable이 이용된다.
출처:https://web.stanford.edu/~ouster/cgi-bin/cs140-spring14/lecture.php?topic=locks
Locks and Condition Variables
Locks and Condition Variables Lecture Notes for CS 140 Spring 2014 John Ousterhout Readings for this topic from Operating Systems: Principles and Practice: Sections 5.2-5.4. Needed: higher-level synchronization mechanism that provides Mutual exclusion: eas
web.stanford.edu
그렇다면 Condition은 Lock과는 상관이 없어 보이는데, 왜 mutex를 필요로 할까?
mutex와 condition은 함께 동작한다. 왜 그런 것일까?
그 이유를 생각해 보면 만약 condition변수를 변경시키고 signal을 수행하는 상황을 상상해 본다면, 그 과정은 원자적으로 유지돼야 한다.
왜냐하면 condition이 바뀌고, signal을 넣는 당시에 condition이 달라질 수 있기 때문이다. 다시 말해서 이 말의 뜻은, condition을 변경시키고 signal을 보내는 과정은 결국 lock으로 보호되어야 하는 동작이다.
따라서 Condition은 Mutex와 함께 설계된 것이라고 생각한다.
Java의 Condition
Lock과 Condition 변수가 같이 쓰이면, 이걸 모니터라고 부른다.
그래서 Java에서는 synchronized라는 아주 간단한 모니터를 제공함
synchronized는 mutex가 대기하는 entry queue를 제공하고, 모니터이기 때문에 wait queue도 존재한다. 하지만 대기영역이 하나밖에 없다.
여러 대기 공간이 필요할 때는, Java에서는 ReentrantLock과 함께 사용할 있게 Conditon객체를 제공한다.
이전 블로그 포스팅에서 설명한 내용을 보면 ReentrantLock에서 lock을 획득하는 상황은
- CLH 큐에서 대기 중인 스레드가 없을 때
- CLH 큐에 진입하여 대기 후, Node가 head에 위치할 때
이렇게 위의 두 상황에서 cas를 통해 setState를 해서 lock을 획득하는데, Condtion Variable에서 await을 하게 되면 어떻게 동작하는지 살펴보려고 한다.
ConditionObject
먼저 Condition을 생성하려면 ReentrantLock에서 ReentrantLock::newCondition을 통해 생성할 수 있다.
public class ReentrantLock implements Lock, java.io.Serializable {
.
.
.
public Condition newCondition() {
return sync.newCondition();
}
}
ReentrantLock에서 쓰는 Condition은 이전 AQS의 ReentrantLock용 구현체인 Sync에서 생성할 수 있는 메서드를 제공한다.
abstract static class Sync extends AbstractQueuedSynchronizer {
.
.
.
final ConditionObject newCondition() {
return new ConditionObject();
}
.
.
.
}
위를 살펴보면 Sync에서 final 메서드로 ConditionObject가 구현되어 있기 때문에 FairSync, NonFairSync에 관계없이 모두 동일하게 동작한다.
ConditionObject는 Condition이라는 인터페이스를 구현하고, AbstractQueuedSynchronizer와 연계되어 동작하기에 AQS의 inner class로 구현되어 있다.
public abstract class AbstractQueuedSynchronizer {
.
.
public class ConditionObject implements Conditon {
.
.
public void await(~~) {
}
}
}
특이하게 static inner class로 구현되어 있지 않는데, 이렇게 되면 AQS의 인스턴스를 바탕으로 하는 ConditionObject만 만들 수 있게 된다.
이렇게 구현하면 ConditionObject에서는, 현재 인스턴스화된 AQS의 메서드에 접근할 수 있게 되어 ReentrantLock과 연계되어 동작할 수 있다.
위에서 언급했듯이, Condition는 대기하거나 신호를 줄 때 모두 Lock을 획득한 상태에서만 가능하다.
await()
Condition에서 await() 하는 상황을 살펴보자
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
.
.
.
}
일단 await()한다는 뜻은, signal()을 받기 전까지 무조건 대기하겠다는 뜻이다.
따라서 내부적으로 대기를 위한 ConditionNode를 생성하며, CLH 큐와 비슷하게 Node기반으로 Linked List 형태로 구현돼 있다.
ConditionNode는 AbstractQueuedSynchronizer에서 사용했던 Node를 구현하여 사용한다.
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
/**
* Allows Conditions to be used in ForkJoinPools without
* risking fixed pool exhaustion. This is usable only for
* untimed Condition waits, not timed versions.
*/
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
ConditionNode에서는 다음 대기자를 linked List 형태로 가리키게 되고, CLH Node를 상속했기 때문에 AQS에서 CLH 큐에 대기할 때처럼 대기 스레드 정보를 모두 가질 수 있게 된다.
그런데 Node에 이미 Node next가 있지만, ConditionNode nextWaiter를 통해 Condition의 조건 대기 큐를 구성한다.
private long enableWait(ConditionNode node) {
if (isHeldExclusively()) {
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
long savedState = getState();
if (release(savedState))
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
await() 메서드에서 ConditionNode를 만든 뒤, enableWait메서드에서는
가장 먼저 isHeldExclusively를 통해 락을 가지고 있는 스레드에서 wait을 요청하는지 검사한다.
이후 ConditionNode에 대기 중인 스레드 정보를 waiter에 추가한다.
이렇게 lastWaiter를 가져오는데, lastWaiter가 null이라는 것은 이전에 들어온 waiter가 없다는 뜻이기 때문에 첫 번째 waiter를 node로 넣고, lastwaiter가 있다면, last의 다음에 새롭게 넣게 된다.
이후 getState를 통해 Lock의 state(lock에 진입할 때마다 1씩 증가하는 state)를 가져오고, 그 state만큼 release를 한다.
Condition의 await()의 특별한 점은, 락을 반환하고 대기한다는 것이다.
ReentrantLock은 재진입 가능한 락이기 때문에 락을 얻은 횟수만큼 release 해서 state를 0으로 만들어주어야 다른 스레드가 락을 획득할 수 있다고 판단하게 된다. (이전 포스트에서, AQS에서 락을 획득할 때는 cas연산으로 state값을 0일 때 1로 만들면서 락을 획득했다.)
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
release에서 실제로 락을 반환하는 역할을 하는데, tryRelease를 통해 state를 돌려두고, signalNext를 통해 AQS에 다음 대기 중인 스레드를 unpark 한다.
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
tryRelease에서는 방금 언급했던 것처럼 state가 0인 것을 확인하고,
setExclusiveOwnerThread를 null로 바꾸는 등 락을 해제하는 동작을 수행한다.
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}
signalNext는 head를 기준으로 바로 다음 노드가 null이 아닐 때, AQS Node의 다음 락을 얻을 스레드를 깨우는 역할을 수행하게 된다.
여기까지가 await()의 enableWait(node) 메서드였고, 락을 반환하여 다음 노드를 unpark 해서 깨운다. (이전 블로그의 내용처럼, 깨워진 스레드는 loop를 통해 first==true가 되고, tryAcquire를 통해 state를 0→ 1로 바꾸어 lock을 획득한다.)
여기까지, 락을 반환하고 다음 스레드를 실행시키는 것까진 완료했다.
enableWait() 이후의 동작
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
이후 canReacquire(node)를 확인한다.
private boolean canReacquire(ConditionNode node) {
// check links, not status to avoid enqueue race
Node p; // traverse unless known to be bidirectionally linked
return node != null && (p = node.prev) != null &&
(p.next == node || isEnqueued(node));
}
이때 p = node.prev가 중요한데,
node.prev는 ConditoinNode에서는 서로 간의 연결에 사용되지 않기 때문에 항상 null일 것이다. 그래서 canReacquire는 false를 반환하는데
여기서 loop로 들어가서 ForkJoinPool.managedBlock(node)를 통해 park 하게 된다.
ForkJoinPool을 통해 LockSupport.park를 호출??
여기서 뜬금없이 ForkJoinPool의 manageBlock이라는 메서드를 호출한다.
이 메서드를 결론부터 보면 결국 LockSupport.park를 호출하는 메서드다.
하지만, 왜 lock을 획득할 때는 LockSupport.park()을 호출했던 반면, 여기에서는 왜 ForkJoinPool의 manageBlock을 호출하는 걸까?
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
Thread t; ForkJoinPool p;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
(p = ((ForkJoinWorkerThread)t).pool) != null)
p.compensatedBlock(blocker);
else
unmanagedBlock(blocker);
}
호출하는 메서드는 위처럼 생겼다.
일단 일반적인 상황에서는 ForkJoinWorkerThread가 아닐 테니깐 아래의 else 메서드가 호출된다.
그런데 또 파라미터는 타입은 ManageBlocker이다. 분명 ConditionNode를 넣었는데, 자세히 보면
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
/**
* Allows Conditions to be used in ForkJoinPools without
* risking fixed pool exhaustion. This is usable only for
* untimed Condition waits, not timed versions.
*/
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
ManaggeBlocker의 isReleasable메서드와 block메서드를 구현하고 있다.
이 isReleasable을 통해, lock을 획득 가능할 때 조건을 확인할 수 있다. block메서드는
private static void unmanagedBlock(ManagedBlocker blocker)
throws InterruptedException {
if (blocker == null) throw new NullPointerException();
do {} while (!blocker.isReleasable() && !blocker.block());
}
while loop 내에서 사용된다.
releasable 하지 않다면 block 한다. 여기 block에서 ConditionNode의 LockSupport.park를 호출한다.
만약 ForkJoinPool의 노드였다면, 내부에서 현재 스레드의 ForkJoinPool을 가져와서 compensatedBlock이라는 메서드가 호출되는데
private void compensatedBlock(ManagedBlocker blocker)
throws InterruptedException {
if (blocker == null) throw new NullPointerException();
for (;;) {
int comp; boolean done;
long c = ctl;
if (blocker.isReleasable())
break;
if ((comp = tryCompensate(c)) >= 0) {
long post = (comp == 0) ? 0L : RC_UNIT;
try {
done = blocker.block();
} finally {
getAndAddCtl(post);
}
if (done)
break;
}
}
}
이 메서드는 ForkJoinPool의 메서드인데,
스레드가 블로킹될 때 다른 스레드를 만들어서 작업을 계속 이어나가도록 하는 기능을 수행한다.
release가능하지 않을 때 tryCompensate메서드를 호출하는데, 내부에서 tryCompensate()를 호출하게 되면 보상 스레드를 만드는 과정에 관여한다.
tryCompensate에 대한 로직은 이번 포스팅이 Condition의 원리를 설명하기 때문에 상세 설명은 넘기겠다.
그렇다면 왜 굳이 ForkJoinPool을?
Condition에서 ForkJoinPool의 static 메서드를 이용하고 있는 구조이다.
이것 때문에 Condition에서 ForkJoinPool에 의존성이 생기는 것 때문에 어색해 보인다.
ReentrantLock에서는 직접 LockSupport.park()을 직접 이용하는데 반해 왜 이렇게까지 굳이 Condition에서 ForkJoinPool의 manageBlock을 통해 park을 수행했을까?
이 이유에 대해 생각해 봤는데, Lock을 대기하는 것과 wait으로 대기하는 것의 차이가 있기 때문인 것 같았다.
예를 들어 Lock이 대기하고 있다면, 임계영역에 다른 스레드가 작업 중이기 때문에 굳이 굳이 다른 스레드가 같은 작업을 하게 할 필요가 없다.
하지만 await()이라는 거는 락을 반환하고 대기하는 것이기 때문에 다른 스레드가 락을 얻고 작업을 수행해야 한다.
만약 ForkJoinPool을 사용 중이라면, await 하는 경우에는 다른 스레드를 하나 만들어 락을 얻고 다른 작업을 수행하도록 만들어야 자연스럽기 때문에 이렇게 구현해 둔 것 같다.
만약 이해가 안 간다면 아마 ForkJoinPool의 스레드가 한 개뿐인 상황을 고민해 보면 좋을 것 같다.
대기에서 깨어나는 방법은?
만약 signal을 받게 된다면 이 ConditionNode가 AQS 대기열로 이동하면서 prev와 next가 설정된다.
따라서 canRequire메서드에서는 이 node.prev를 바탕으로 unpark 된 이후 loop를 탈출한다.
이후 acquire메서드에서는 CLH 큐에 들어간 상태기 때문에 추가적으로 Node를 삽입하진 않고, loop를 돌면서 park 하게 된다.
여기에서 signal을 주면 어떤 일이 일어나길래 ConditionNode가 CLH 대기 큐로 이동하는지 알아보겠다.
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
signal이 호출되면, ConditionNode에서 대기하는 Node들의 첫 번째 Node를 가져온다.
이후 first가 null이 아니라면 doSignal메서드를 호출하는데
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first);
if (!all)
break;
}
first = next;
}
}
doSignal에서는 다음 노드를 first로 바꿔주는 동작을 수행한 후, ConditionStatus를 제거하고, enqueue를 수행한다.
/**
* Enqueues the node unless null. (Currently used only for
* ConditionNodes; other cases are interleaved with acquires.)
*/
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
이 enqueue는 ConditionNodes에서만 쓰이는 메서드인데, enqueue가 호출되면 casTail을 통해 CLH 큐의 가장 뒤로 연결된다.
이 부분으로 인해 ArrayBlockingQueue의 공정모드에서 조차도 Thread starvation이 발생할 수 있었고, 해당 부분에 대해 OpenJDK에 이슈를 남겼다.
https://mail.openjdk.org/pipermail/core-libs-dev/2024-September/129090.html
[External] : Re: [POTENTIAL BUG] Potential FIFO violation in BlockingQueue under high contention and suggestion for fair mode in
mail.openjdk.org
제목을 잘못 적은 탓에 제목에서 말하고자 하는 내용이 하나도 전달되지 못한 것 같다. 확인 한 번 더 하고 메일을 보냈어야 했는데,,
아무튼 이 부분에 대해서 OpenJDK Contributer도 해당 가능성을 인지했고, 해결해야 할 문제인지, 성능상의 trade-off인지 판단하는 의견을 듣기 위해 기다리고 있다.
정리
마지막으로 정리하면 condition에서 await 하던 스레드는 signal을 받게 되면 enqueue를 통해 AQS CLH큐의 뒤에 붙게 되면서 기존 canReacquire를 탈출하여 AQS의 acquire메서드를 통해 락을 얻기를 기다리게 된다.
'Backend > Java' 카테고리의 다른 글
Synchronized, Heavy-weight Lock 동작 원리, ReentrantLock과의 성능 비교 (3) | 2024.10.03 |
---|---|
제네릭 배열은 왜 안 될까?, TypeReference는 어떻게 런타임에 제네릭 타입 정보를 보존할까? (1) | 2024.09.26 |
ReentrantLock이 동작하는 원리 (AbstractQueuedSynchronizer) (1) | 2024.09.17 |