ReentrantLock이 동작하는 원리 (AbstractQueuedSynchronizer)

2024. 9. 17. 17:38Backend/Java

이번에 OpenJDK에 이슈를 작성하면서 ReentrantLock에 대해서 깊게 공부할 기회가 생겼다. 

 

OpenJDK 컨트리뷰터와 이야기하면서 얻은 지식이 파편화되어 있어 한 번 글로 써보면서 큰 흐름을 정리하고 싶어 글을 작성했다.


자바에서는 기본적으로 모니터락을 사용하는 syncrhonized가 있다.

 

synchronzied 키워드 하나만으로도 임계영역의 스레드 접근을 제어할 수 있지만, 내부적으로 blocking 되었을 때 interrupt 등으로 취소할 수도 없고 타임아웃 기능도 없다. synchroinized 또한 재진입 가능, wait도 모두 가능하지만, wait 할 수 있는 대기 공간이 하나로 제한돼 있는 점들이 있다는 단점이 있다.

 

이를 해결하기 위해 코드 상으로 유연하게 락을 걸 수 있는 ReentrantLock이 있다.

 

ReentrantLock이 lock()을 통해 락을 거는 상황을 간단하게 살펴보면, 가장 먼저 initialTryLock()을 통해 락을 시도하고, 이게 실패한 경우 acquire()를 통해 락을 시도하고, 대기하도록 할 수 있다.

 

이때 initialTryLock은 Sync의 추상 메서드로 구현돼 있어서 ReentrantLock이 락을 걸 때 내부 필드로 가지고 있는 Sync 추상 클래스의 lock() 메서드를 통해 락을 걸게 된다.


Sync 클래스는 ReentrantLock, CountDownLatch 내부의 static inner클래스로 구현돼 있고, 필요에 따라 Sync에 맞추어 (FairSync or NonFairSync) 구현하여 사용하도록 한다.

Sync 클래스는 위 처럼 ReentrantLock, CountDownLatch 내부의 static inner클래스로 구현돼있고, 필요에 따라 Sync에 맞추어 (FairSync or NonFairSync) 구현하여 사용하도록 한다.

 

SyncAbstractQueuedSynchronizer(AQS)라는 클래스를 구현한 추상 클래스로, ReentrantLock의 엄격성 여부에 따라 ‘공정 모드’, ‘비공정 모드’를 제공하는데, 그에 따라 Sync 추상클래스를 구현한 NonFairSync나, FairSync 구현체를 ReentrantLock에서 사용하게 된다.

 

공정 모드는 스레드가 순차적으로 락을 획득하게 하고, 비공정 모드는 재빠르게 락을 획득하려는 스레드가 우선권을 가지게 된다.


AbstractQueuedSynchronzier(AQS)란?

자바 내에서 동기화 장치를 만들기 위한 기본 뼈대를 제공하는 역할을 수행한다.

 

AQS에서는 Node 기반의 FIFO 대기 큐 (CLH 대기 큐라고 함)를 통해 락을 얻지 못한 스레드가 큐에서 대기할 수 있게 하고. state 값을 통해 락을 획득한 횟수를 나타낸다.

 

CountDownLatch나, ReentrantLock, Semaphore 들이 다 위에서 말한 Sync, 즉 AQS를 기반으로 구현돼 있다

 

그리고 이 AQS의 공통 기능을 ReentrantLock 등의 동기화 장치들의 특성에 맞게 구현해서 대기 큐를 바탕으로 추가적인 동작을 정의해서 동기화 장치를 완성시킨다.

 

예를 들어 ReentrantLock에서는 저 CLH 대기 큐와 state값을 이용해 state가 1 이상이면 그 외 스레드들은 모두 CLH 큐에 대기를 하게 하는 반면 Semaphore에서는 state가 임계값 이상을 가지면 CLH 큐에서 대기를 하게 하는 등 요구에 맞게 구현하게 된다.

 

설명하기 앞서 CLH 대기 큐를 간단하게 설명해 보면 락을 획득하려는 스레드가 자신의 노드를 큐에 추가하고 락을 얻기를 대기하는 방식으로 동작한다.

 


락을 얻는 과정을 더 자세하게 살펴보면

ReentrantLock.lock()을 통해 lock을 얻는 과정을 락을 얻을 수 있을 때, 락을 얻을 수 없을 때 크게 두 가지로 살펴보려 한다.

lock()을 호출할 때 내부 로직을 살펴보면

final void lock() {
    if (!initialTryLock())
        acquire(1);
}

initialTryLock()를 호출한다. 이는 간단하게 락을 한 번 얻어보는 동작을 수행하고, 이때 성공하면 바로 락 획득, 아닌 경우에는 acquire에서 추가적인 동작을 수행한다.

 

initialTryLock()은 FairSync, NonFairSync에 따라 구현 로직이 다른데 NonFairSync의 경우에서의 로직을 바탕으로 설명해 보겠다.

 

initialTryLock에서는 내부적으로 CompareAndSet을 통해 state를 0에서 1로 바꾸려고 한다.

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    if (compareAndSetState(0, 1)) { // first attempt is unguarded
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
        int c = getState() + 1;
        if (c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
        return false;
}

이때 state가 0일 때, 1로 바꾸는 것의 의미는, 락을 소유하고 있지 않을 때, 내가 락을 1번 얻은 것으로 바꾸는 의미이다.

 

만약 여기서 CAS를 성공한다면 바로 락을 획득하는 것이고, 아닌 경우에는 false를 통해 빠져나와서 acquire()메서드를 호출하여 대기하는 등의 동작을 수행하게 된다.

 

else if절에서는, ReentrantLock이 재진입 가능한 점으getExclusiveOwnerThread()를 통해, 현재 락을 쥐고 있는 스레드가 현재 요청하는 스레드와 같다면 단순하게 state를 증가시키는 동작을 수행한다.

 

만약 NonFairSync가 아니라 FairSync였다면 initialTryLock이 조금 달라지는데

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (getExclusiveOwnerThread() == current) {
        if (++c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    }
    return false;
}

내부적으로 hasQueuedThreads()를 통해 이미 CLH 대기 큐에서 대기하고 있는 스레드가 있는지 살펴보는 방법으로 공정성을 보장한다.

acquire에서는 AQS 내에서 final로 구현된 메서드를 실행하게 되는데

public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}

내부적으로 또 tryAcquire라는 추상 메서드를 호출해서 FairSyncNonFairSync에 따라 다른 동작을 수행하게 된다.

 

tryAcquire는 또한 이전의 CAS를 한 번 실행해 봤던 걸 다시 한번 수행해 보게 된다.

 

NonFairSync에서는

protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

 

FairSync에서는 이전과 유사한 hasQueuedPredecessors를 통해 대기 중인 스레드가 있는지 확인한다. 이는,

 protected final boolean tryAcquire(int acquires) {
      if (getState() == 0 && !hasQueuedPredecessors() &&
          compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(Thread.currentThread());
          return true;
      }
      return false;
  }

hasQueuedPredecessors()는 위의 hasQueuedThreads()랑은 조금 다른 점이 존재하는데,

 

현재 CLH 큐의 head가 현재 스레드 거나, 대기하는 스레드가 없는 경우 true를 반환한다.

 

이걸 보면서 왜 굳이 같은 setState를 initialTryLocktryAcquire에서 2번을 반복하지?라는 의심이 생겼다.

 

이 이유에 대해 정확하게는 모르겠지만, CLH Queue에 일단 들어가게 되면 park 하게 되고, 이는 스레드가 waiting상태로 변해야 하는 고비용의 작업이기 때문에 (park를 호출하면 JNI를 통해 커널 시스템콜을 호출하게 됨) CLH Queue에 들어가기 전에 최대한 그런 현상을 최소화하기 위해 CLH Queue 직전에 한 번 더 확인하는 것 같다.

 

정리하면 2번의 시도까지 락을 얻지 못하면 acquire라는 AQS의 대기 큐와 직접 관련된 메서드를 호출하게 된다.

final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    /*
     * Repeatedly:
     *  Check if node now first
     *    if so, ensure head stable, else ensure valid predecessor
     *  if node is first or not yet enqueued, try acquiring
     *  else if node not yet created, create it
     *  else if not yet enqueued, try once to enqueue
     *  else if woken from park, retry (up to postSpins times)
     *  else if WAITING status not set, set and retry
     *  else park and clear WAITING status, and check cancellation
     */

    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {
            node.status = WAITING;          // enable signal and recheck
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

큼지막하게 살펴보면 if(node == null)에서 null이 올 테니깐 shared에 따라 ExclusiveNode, SharedNode를 생성한다. 이때의 Node가 위에서 언급한 CLH Node가 된다.

 

CLH Node란 이중 연결 리스트의 노드인데, prev, next를 가지고 FIFO 대기열을 구성하고, status와 Thread waiter를 두어서 unpark 하고 status를 통해 작업을 수행할 수 있게 도와주는 역할을 한다.

 

만약 node를 만들고, enqueue가 된 적이 없다면 (pred==null)

큐에 삽입하려는 시도를 하게 된다

else if (pred == null) {          // try to enqueue
    node.waiter = current;
    Node t = tail;
    node.setPrevRelaxed(t);         // avoid unnecessary fence
    if (t == null)
        tryInitializeHead();
    else if (!casTail(t, node))
        node.setPrevRelaxed(null);  // back out
    else
        t.next = node;
}

현재 큐의 마지막 노드를 가져와서 node.setPrevRelaxed(t);를 통해 이전 노드를 tail로 설정하고

 

casTail을 통해 cas연산으로 tail에 Node를 부착하여 FIFO 큐를 구현하는데, 내부적으로는 Unsafe의 compareAndSetReference로 구현돼 있다.

 

만약 cas에 실패하면 setPrevRelaxed를 통해 복구하는 동작을 수행한다. 이 과정은 for loop 내에서 수행되기 때문에, 실패할 경우 enqueue에 성공할 때까지 여러 번 반복하게 된다.

 

성공한 경우에는 t.next를 현재 노드로 설정하여 이중 연결 리스트를 완성한다.

else if (node.status == 0) {
    node.status = WAITING;          // enable signal and recheck
} else {
    long nanos;
    spins = postSpins = (byte)((postSpins << 1) | 1);
    if (!timed)
        LockSupport.park(this);
    else if ((nanos = time - System.nanoTime()) > 0L)
        LockSupport.parkNanos(this, nanos);
    else
        break;
    node.clearStatus();
    if ((interrupted |= Thread.interrupted()) && interruptible)
        break;
}

node의 상태는 초기에는 0이기 때문에 node.status를 WAITING 상태로 바꾸고 다음 loop에서 park 하게 된다.

 

time waiting인 경우인 tryLockNanos였을 때에는 timed조건을 만족해서 nanos만큼 park 하게 되고, 아닌 경우에는 일반 park를 수행하게 된다.

 

이 과정을 통해 락을 즉시 얻지 못할 경우에는 thread를 waiting상태로 바꾸어 효율성을 증대한다.


락을 얻을 때 spin의 역할은?

락을 얻는 과정은 타 스레드가 unpark을 해주면 현재 스레드에서 park가 풀리면서 tryAcquire를 통해 다시 락을 획득하기만 하면 될 것 같았는데, 추가적으로 spin이라는 값이 있었다.

 

tryAcquire를 하면서 spin을 내부적으로 1씩 줄이고, spin이 0이 되었을 때 spins = postSpins = (byte)((postSpins << 1) | 1);를 실행한다.

 

이는 이전에 맥락상 설명하지 않았던 spin부분인데, 내부적으로 spin을 통해 락 획득을 최적화하고 있다. spin값은 CLH Node가 CLH Queue의 head에 있을 때 역할을 수행하게 되는데

 

head에 있다는 뜻은, 다음에 락을 획득할 차례라는 뜻이다. 따라서 head에 있을 때는 최대한 park를 호출하지 않고, 락을 얻을 수 있을 때까지 spin 하면서 lock을 얻으라는 의미로 이해할 수 있다.

 

그런데 생각해 보면, 락을 얻을 수 있을 때 타 스레드로부터 unlock에서 unpark 신호를 받을 것인데 왜 head에 있을 때 spin을 하는 거지?라는 생각이 들었다.

 

이는 NonFairSync(비공정모드)에서 유효하게 사용되는데 대기 큐에 있어도, head가 unpark 이후 락을 획득하는 사이에 다른 스레드가 락을 얻어버릴 수 있기 때문이다.

 

매번 이렇게 선점당한다면 unpark을 받자마자 다시 park 상태로 바뀌게 될 것이고, 이걸 최적화하기 위해 일정 횟수 spin을 시도하면서 head Node가 락을 얻기 위해 시도하는 것이다.

 

사실 FairSync였으면 애초에 뒤에 오는 락 요청이 hasQueuedPredecessors에서 걸려서 CLH 큐의 뒤에 enqueue 될 것이다. 따라서 spin을 전혀 사용하지 않게 될 것이다.

 

그래서 NonFairSync일 때 이미 락이 선점당해도 spin 횟수만큼 tryAcquire 하여 락을 얻게 하여 최적화하는 것이다.

 

그리고 spin이 0이 됐을 때 실행하는 spins = postSpins = (byte)((postSpins << 1) | 1);이 부분은

 

spin값을 모두 소진할 때마다 더 많이 spin 하게 하여 과도한 starvation을 막게 함과 동시에 또다시 unpark한 스레드가 다시 park하는 현상을 최소화하기 위한 용도이고, 이 spin 횟수는 exponential 하게 증가시키는 것으로 이해해도 무방할 듯하다.


락을 놓는 unlock의 동작

unlock 메서드는 내부적으로 Sync의 release를 호출한다.

 public final boolean release(int arg) {
      if (tryRelease(arg)) {
          signalNext(head);
          return true;
      }
      return false;
  }

tryRelease는 Sync 공통 메서드로 구현돼 있어서 FairSync나 NonFairSync모두 로직이 동일하다.

 

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    boolean free = (c == 0);
    if (free)
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}

Mutex와 같은 역할을 하는 ReentrantLock은 lock을 소유하고 있는 스레드만 락을 해제할 수 있기 때문에 예외처리가 돼있고,

tryRelease를 통해 lock을 걸 때 증가시켰던 state값을 원래대로 감소한다.

 

 

이후 signalNext를 호출하는데

private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        s.getAndUnsetStatus(WAITING);
        LockSupport.unpark(s.waiter);
    }
}

head가 null이 아닐 때, head의 다음 (다음 실행할 스레드)의 waiter에게 unpark을 호출한다. 이렇게 다음 노드의 스레드를 깨워줌으로써 lock을 얻은 순서대로 동작하게 된다.

 

unpark받은 스레드는 처음에는 first==true가 아니였지만 acquire에서 loop를 돌면서 node.prev가 head가 되어 first==true로 바뀌게 되고, 
tryAcquire에서 state가 0이었기 때문에 lock을 성공적으로 얻어서, head를 본인으로 바꾼다.

 

코드로 살펴보면

public int acquire(~~) {
.
.
.

	if (first || pred == null) {
	    boolean acquired;
	    try {
	        if (shared)
	            acquired = (tryAcquireShared(arg) >= 0);
	        else
	            acquired = tryAcquire(arg);
	    } catch (Throwable ex) {
	        cancelAcquire(node, interrupted, false);
	        throw ex;
	    }
	    if (acquired) {
	        if (first) {
	            node.prev = null;
	            head = node;
	            pred.next = null;
	            node.waiter = null;
	            if (shared)
	                signalNextIfShared(node);
	            if (interrupted)
	                current.interrupt();
	        }
	        return 1;
	    }
	}
.
.
.
}

위 조건에서 first가 true조건을 만족하게 될 것이고, 내부적으로 이전의 tryAcquire를 통해 compareAndSet을 수행한다.

이때 acquire가 만족해서 락을 획득하면

if (acquired) {
    if (first) {
        node.prev = null;
        head = node;
        pred.next = null;
        node.waiter = null;
        if (shared)
            signalNextIfShared(node);
        if (interrupted)
            current.interrupt();
    }
    return 1;
}

head를 현재 node로 바꾸면서 현재 락을 얻은 CLH Node를 맨 앞으로 당기게 된다.


정리

이걸 전체적으로 간략하게 정리해 보면

ReentrantLock은 내부적으로 CLH Queue로 대기 스레드들을 관리한다.

 

만약 CLH Queue가 비어있으면 즉시 실행, CLH Queue에 넣은 이후, 본인이 Head가 된 상태는 Lock을 얻은 상태라고 이해할 수 있다

 

CLH Queue에 넣는 비용이 상대적으로 크기 때문에

lock을 걸 때에는, 대기 스레드가 하나라도 있는지 확인 후 없으면 즉시 lock 획득한다. (cas로 setState)

 

대기 스레드가 있을 경우에는 head가 본인인지 여부 확인 후 park하는 loop로 진입 하게 된다

 

FairSync에서는 모니터락처럼 내부적으로 spin을 적극적으로 사용한다고는 볼 수 없을 것 같고,

NonFairSync에서는 head에 있을 때 unpark 된 상황에서는 park를 최소화하기 위해 spin 하여 lock을 획득하려고 한다.

 

unlock을 할 때에는 CLH Queue이 자기 자신일 테니, 다음 next에 unpark신호를 보내서 작동하도록 하고, 그 unpark를 받은 스레드는 tryAcquire를 통해 락을 얻고, 얻는 데 성공하면 CLH Node에서 본인을 head로 바꿈으로써 락을 완전히 획득하게 된다.