Java-AbstractQueuedSynchronizer

作者 chauncy 日期 2016-12-09
Java-AbstractQueuedSynchronizer

Java 中的 ReentrantLock Semaphore ReentrantReadWriteLock等 这些同步的基础都是依靠AbstractQueuedSynchronizer的类来实现。为了方便起见下面使用AQS代替AbstractQueuedSynchronizer。

从ReentrantLock看AQS:
对于ReentrantLock 通常使用的如下:

reentrantLock.lock();
//to do something
reentrantLock.unlock();

它可以保证to do something 这段代码在同一个时间有且只有一个线程执行这断代码,其余的现场将被挂起,直到获得锁。

reentrantLock.lock()的实现:

public void lock() {
       sync.lock();
   }

ReentrantLock内部有一个代理类来具体的完成这个操作,ReentrantLock主要分为公平锁,和非公平锁,而这个的具体实现就是拓展了AQS

abstract static class Sync extends AbstractQueuedSynchronizer {...}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}

公平锁:类似于排队一样,先到先服务的原则
非公平锁:绕过排队直接插入,当然也有插入不进去的:)

公平锁的获取:

final void lock() {
            acquire(1);
        }

其中acquire 方法为AQS的 acquire方法

public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
   }

这代码就是尝试的获取锁,如果获取失败,将这个请求加入的到获取锁的队列之中

tryAcquire方法在AQS类中么有给出具体的实现,显然是交给子类自己去实现:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

那就来看一下他的子类公平锁的具体实现:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

方法getState 的实现是在AQS中,使用一个volatile变了来表示一个锁的使用状态

private volatile int state;
protected final int getState() {
    return state;
}

如果 state == 0
表示锁没有被其他线程使用然后在调用hasQueuedPredecessors()方法判断队列是否还有其他线程,如果没有没有其他线程,说明没有其他线程正在占有锁,那就进行获取锁,将state 设置为获取状态,如果通过CAS(compareAndSetState)操作将状态为更新成功则代表当前线程获取锁,因此,将当前线程设置到AQS的一个变量中,说明这个线程拿走了锁。

如果不为0 意味着,锁已经被拿走了,但是,因为ReentrantLock是重入锁,是可以重复lock,unlock的,只要成对出现行。一次。这里还要再判断一次 获取锁的线程是不是当前请求锁的线程。如果是的,累加在state字段上就可以了。致此,如果获取到锁,那就返回true ,如果么有获取到,那就返回false,然后将获取的锁的线程放入到队列中,只是在放入到队列之前要进行包装一下:

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     enq(node);
     return node;
 }

使用当前线程通过设置mode(独占,共享)构造一个node节点,创建这个节点加入到队列的尾部,如果队列不为空队列,那么通过CAS操作将当前节点设置为最新想获取锁的节点,如果失败则将进入一个死循环进行更改。将获取锁的节点加入到队列还需要将当前线程挂起

final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
          boolean interrupted = false;
          for (;;) {
              final Node p = node.predecessor();
              if (p == head && tryAcquire(arg)) {
                  setHead(node);
                  p.next = null; // help GC
                  failed = false;
                  return interrupted;
              }
              if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                  interrupted = true;
          }
      } finally {
          if (failed)
              cancelAcquire(node);
      }
  }

if (p == head && tryAcquire(arg)) 如果当前的节点是head说明他是队列中第一个“有效的”节点,因此尝试获取,上文中有提到这个类是交给子类去扩展的, shouldParkAfterFailedAcquire(p, node) 否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。/如果需要,借助JUC包下的LockSopport类的静态方法Park挂起当前线程。知道被唤醒。

获取锁,说完了,释放锁也有异曲同工之妙,和获取锁的相应对应,获取一个锁,标示为+1,释放一个锁,标志位-1。具体的释放过程也只子类自己去实现:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) //如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {//因为是重入的关系,不是每次释放锁c都等于0,直到最后一次释放锁时,才通知AQS不需要再记录哪个线程正在获取锁。
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

释放锁成功过后,找到AQS头节点,并且进行唤醒线程,寻找的顺序是从队列尾部开始往前去找的最前面的一个waitStatus小于0的节点

private void unparkSuccessor(Node node) {
      int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          LockSupport.unpark(s.thread);
  }

公平锁,获取和释放都说完了,那非公平的获取和释放那就很好理解了,他们之间的区别就在于非公平的锁,在获取的时候进行更改,如果更改不成功那还是要入队列的

final void lock() {
           if (compareAndSetState(0, 1))
               setExclusiveOwnerThread(Thread.currentThread());
           else
               acquire(1);
       }

AQS只是维护一个状态,一个控制各个线程何时可以访问的状态,它只对状态负责,而这个状态表示什么含义,由子类自己去定义。