心智圖資源庫 Java 並發鎖工具
這是一篇關於Java 並發鎖工具的心智圖,這些並發鎖定工具可以在Java的並發程式設計中發揮重要作用,幫助你寫出高效、安全的並發程式碼。
編輯於2024-01-18 10:28:15Microbiologia medica, Infezioni batteriche e immunità riassume e organizza i punti di conoscenza per aiutare gli studenti a comprendere e ricordare. Studia in modo più efficiente!
La teoria cinetica dei gas rivela la natura microscopica dei fenomeni termici macroscopici e le leggi dei gas trovando la relazione tra quantità macroscopiche e quantità microscopiche. Dal punto di vista del movimento molecolare, vengono utilizzati metodi statistici per studiare le proprietà macroscopiche e modificare i modelli di movimento termico delle molecole di gas.
Este é um mapa mental sobre uma breve história do tempo. "Uma Breve História do Tempo" é um trabalho científico popular com influência de longo alcance. Ele não apenas introduz os conceitos básicos da cosmologia e da relatividade, mas também discute os buracos negros e a expansão. Do universo. questões científicas de ponta, como inflação e teoria das cordas.
Microbiologia medica, Infezioni batteriche e immunità riassume e organizza i punti di conoscenza per aiutare gli studenti a comprendere e ricordare. Studia in modo più efficiente!
La teoria cinetica dei gas rivela la natura microscopica dei fenomeni termici macroscopici e le leggi dei gas trovando la relazione tra quantità macroscopiche e quantità microscopiche. Dal punto di vista del movimento molecolare, vengono utilizzati metodi statistici per studiare le proprietà macroscopiche e modificare i modelli di movimento termico delle molecole di gas.
Este é um mapa mental sobre uma breve história do tempo. "Uma Breve História do Tempo" é um trabalho científico popular com influência de longo alcance. Ele não apenas introduz os conceitos básicos da cosmologia e da relatividade, mas também discute os buracos negros e a expansão. Do universo. questões científicas de ponta, como inflação e teoria das cordas.
Java 並發鎖工具
AbstractQueueSynchronizer
簡介
建構鎖或其他同步元件的基礎框架,主要用來管理同步狀態
主要使用方式是繼承
只能在一個時刻發生阻塞,降低上下文切換的開銷
繼承
AbstractOwnableSynchronizer
讓線程以獨佔模式佔用的同步器的基礎實現
成員變數
private transient Thread exclusiveOwnerThread
目前獨佔同步狀態的線程
主要方法
protected final void setExclusiveOwnerThread(Thread thread)
設定獨佔線程
protected final Thread getExclusiveOwnerThread()
取得當前獨佔線程
實現介面
Serializable
內部類別
static final class Node
簡介
AQS內部透過雙向的FIFO同步佇列來完成同步狀態的管理,當有執行緒取得失敗後被加入到佇列末尾,佇列的結構就是Node;頭尾節點定義在AQS中 條件隊列(condition queue)也使用Node的定義,但處於另一個隊列中
成員變數
靜態常數
static final Node SHARED = new Node()
表示節點處於共享模式
static final Node EXCLUSIVE = null
表示節點處於獨佔模式,同一時刻只有一個執行緒持有同步狀態
static final int CANCELLED = 1
結點(線程)因為超時或中斷而處於該狀態,後續不會變為其他狀態,也不應該再被阻塞
static final int SIGNAL = -1
當節點(執行緒)處於該狀態時,後繼節點處於或將處於被阻塞狀態(透過park),所以噹噹前節點釋放鎖定或被取消時需要呼叫unpark喚醒後繼節點。
static final int CONDITION = -2
表示目前節點(執行緒)處於條件佇列中(透過await),此時不屬於同步佇列的一部分,直到某一刻狀態被設定為0為止(透過signal)
static final int PROPAGATE = -3
傳播共享鎖(頭節點使用)
volatile int waitStatus
包括上面的CANCELLED/SIGNAL/CONDITION/PROPAGATE和0(表示不是這四種狀態),非負的數表示該節點不需要交互
volatile Node prev
前驅節點
volatile Node next
後繼節點
volatile Thread thread
節點對應的線程
Node nextWaiter
在同步佇列中表示節點處於共享或獨佔狀態,等於SHARED表示共享,為null表示獨佔;在條件佇列中表示指向下一個節點的引用
使用Condition僅在同步隊列為獨佔模式才可以,所以變數設計為共用
主要方法
構造方法
Node()
用於建立初始節點或建構SHARED節點
Node(Thread thread, Node mode)
用於在addWaiter中使用
Node(Thread thread, int waitStatus)
在Condition中使用
成員方法
final boolean isShared()
return nextWaiter == SHARED
final Node predecessor()
取得前驅節點,為null時拋出異常
public class ConditionObject
簡介
Condition介面的實現,服務於基本的Lock實作。使用Node建構等待佇列,每個Condition物件都包含一個FIFO佇列(單向)
佇列使用Node的nextWaiter屬性作為指向下一個節點的引用
實現介面
Condition
Serializable
成員變數
靜態常數
private static final int REINTERRUPT = 1
表示從wait離開時需要重新中斷
private static final int THROW_IE = -1
表示從wait離開時需要拋出InterruptedException
private transient Node firstWaiter
指向條件隊列的頭結點
private transient Node lastWaiter
指向條件佇列的尾節點
主要方法
public final void await()
使呼叫的執行緒進入等待佇列、釋放鎖定並進入阻塞狀態,在節點被signal/signalAll喚醒後嘗試取得鎖定,如果阻塞被中斷喚醒則會回應中斷
private Node addConditionWaiter()
新增一個新的node到等待隊列。
具體實作邏輯:當發現lastWaiter被取消,則呼叫unlinkCancelledWaiters清除掉整個佇列中被取消的方法(firstWaiter、lastWaiter都可能指向新的node);之後new一個node(CONDITION狀態)並將其新增至佇列末尾
private void unlinkCancelledWaiters()
透過while從firstWaiter往後清除waitStatus不為CONDITION的節點。由於方法呼叫在釋放鎖之前,所以循環過程並不需要加鎖。為了避免在GC中殘留,在沒有signal時該方法只在逾時或取消發生時使用
條件佇列中的node的waitStatus應該只有CONDITION和CANCELLED兩種狀態
final int fullyRelease(Node node)
釋放鎖定狀態,如果釋放鎖定狀態失敗則會拋出IllegalMonitorStateException,並將node設為CANCELLED狀態,成功則傳回release之前的state值
release
這裡呼叫了AQS的release方法,找到後面的一個合適節點喚醒
參數使用的是AQS的state屬性
final boolean isOnSyncQueue(Node node)
判斷node是否在同步佇列中(注意這裡是同步佇列而非條件佇列):判斷waitStatus為CONDITION時說明不在同步佇列,node的pre或next為null也說明不在同步佇列(這兩個屬性用於同步佇列的前驅和後繼,條件隊列不用這兩個屬性),之後呼叫findNodeFromTail方法從尾節點遍歷同步隊列看node是否在其中
同步佇列與條件佇列的關係:鎖定的競爭只依賴同步佇列,條件佇列僅只將因為缺少條件而阻塞的執行緒保存起來,當執行緒需要競爭鎖定時依然需要轉換到同步佇列中進行
程式碼中用該方法作為while的判斷條件,如果node不在同步佇列中(說明並沒有被signal/signalAll),則會進入while內部透過park阻塞。當node被喚醒後會先判斷是否被中斷喚醒,如果不是則會繼續回到while過程,否則會嘗試將node加入同步隊列並break跳出while
acquireQueued(node, savedState)
不停地為同步佇列中的node節點嘗試取得savedState同步狀態
unlinkCancelledWaiters
取得到鎖定之後如果node.nextWaiter不為null的話呼叫該方法清除取消狀態的節點
reportInterruptAfterWait
如果之前是因為中斷被喚醒,則這裡需要根據先前判斷的結果對中斷進行處理,是重置中斷狀態還是拋出中斷異常
public final long awaitNanos(long nanosTimeout)
基本邏輯與await類似,加入了超時的判斷
transferAfterCancelledWait的後面部分不是很懂
public final boolean await(long time, TimeUnit unit)
基本邏輯與awaitNanos類似,可以指定時間單位,最後都是換算為奈秒計算
public final void awaitUninterruptibly()
基本邏輯與await類似,不回應中斷(即selfInterrupt重置中斷狀態)
public final boolean awaitUntil(Date deadline)
基本邏輯與await類似,加入了最大阻塞時間
public final void signal()
將等待最長的執行緒從條件隊列移動到同步隊列
protected boolean isHeldExclusively()
傳回目前執行緒是否以獨佔模式持有該鎖,是則回傳true。這個方法是AQS中的,並且沒有提供具體的實現,但是由於該方法只在condition中使用,所以如果不使用ConditionObject的話可以不用實現,否則需要實現
private void doSignal(Node first)
將條件佇列中的節點移除佇列並轉換為同步佇列的節點。實現過程是個循環,從前到後,直至遇到一個非null和不是CANCELLED狀態的節點,將其轉換後退出循環
final boolean transferForSignal(Node node)
將node從條件佇列轉換到同步佇列,並傳回是否成功轉換。實作邏輯為先判斷node狀態是否為CONDITION,如果不是說明是被cancel的節點,返回false;接著呼叫enq入同步隊列並取得node在隊列中的前驅節點,之後如果前驅節點的狀態>0或CAS修改waitStatus失敗(發生了變化),則unpark該線程,最後傳回true
加入同步佇列後不一定直接喚醒,只有waitStatus>0或是在CAS執行期間發生了變化,才會unpark喚醒執行緒。此時不喚醒的話會等到同步佇列邏輯中再喚醒
enq
public final void signalAll()
將條件佇列中所有符合條件(未被取消)的節點轉移到同步佇列。基本邏輯與signal相似,差別在於transferForSignal方法迴圈執行
isHeldExclusively
private void doSignalAll(Node first)
transferForSignal
protected final boolean hasWaiters()
是否有線程在條件佇列中,實作方式為從頭開始循環條件佇列,如果有節點狀態為CONDITION則傳回true,否則傳回false
成員變數
靜態變數
static final long spinForTimeoutThreshold = 1000L;
用於超時的閾值,如果小於這個值就沒有必要呼叫帶超時的park,而是讓循環繼續執行,這時的自旋效率更高
private static final Unsafe unsafe
Unsafe.getUnsafe(),下面的都是透過objectFieldOffset方法取得偏移量,方便後續直接透過Unsafe的CAS操作進行修改
private static final long stateOffset
AQS的state
private static final long headOffset
AQS的head
private static final long tailOffset
AQS的tail
private static final long waitStatusOffset
Node的waitStatus
private static final long nextOffset
Node的next
private transient volatile Node head
同步隊列的頭結點,懶加載,只透過setHead修改;頭節點存在時其狀態不能為CANCELLED
private transient volatile Node tail
同步佇列的尾節點,懶加載
private volatile int state
同步狀態
主要方法
protected final int getState()
傳回同步狀態的目前值
protected final void setState(int newState)
設定當前同步狀態
protected final boolean compareAndSetState(int expect, int update)
CAS設定state,底層呼叫Unsafe的CAS方法
private Node addWaiter(Node mode)
入列方法
用指定模式新增目前執行緒進入佇列末尾
值得注意的是
參數Node用於指定模式,線程透過currentThread獲取
實作中tail不為空時先快速嘗試一次cas設定tail,成功直接返回,失敗再呼叫enq方法
private Node enq(final Node node)
循環(自旋)CAS,tail為空時先CAS初始化head(新建node),tail不為空時CAS設定tail
這裡參數node為需要新增的節點而非模式
public final boolean hasQueuedPredecessors()
判斷是否有節點在目前執行緒之前,有的話回傳true,否則回傳false。用於公平鎖的實現。 具體實作:當佇列不為空時判斷head的後繼節點為空或不為當前執行緒則傳回true
什麼時候head.next 為null?
public final boolean hasQueuedThreads()
return head != tail; 佇列中是否有線程等待
public final boolean isQueued(Thread thread)
線程是否在佇列中,實作方式是從頭到尾到頭循環
public final int getQueueLength()
取得佇列長度,實作方式為從尾到頭迴圈計算thread不為null的節點數量
public final Collection<Thread> getQueuedThreads()
取得在佇列中的執行緒集合,以Collection的形式傳回。實作方式為建構ArrayList,從尾到頭循環將不為null的線程加進去,之後又回傳這個ArrayList對象
public final boolean owns(ConditionObject condition)
condition物件是否屬於目前同步器AQS
獨佔模式
同一時刻只有一個執行緒能獲取同步狀態
public final void acquire(int arg)
獨佔模式取得同步狀態,忽略中斷;先呼叫tryAcquire嘗試取得獨佔鎖,失敗後呼叫acquireQueued(addWaiter(Node.EXCLUSIVE), arg)嘗試將執行緒加入同步佇列和判斷節點狀態,然後根據傳回的結果(是否有過線程中斷)呼叫線程interrupt方法恢復中斷狀態
acquire本身不回應中斷,故對於中斷的處理是恢復中斷狀態
protected boolean tryAcquire(int arg)
嘗試取得獨佔鎖,成功回傳true,失敗回傳false,具體實作由子類別完成,需要保證取得同步狀態時的執行緒安全
final boolean acquireQueued(final Node node, int arg)
arg可以理解為需要競爭的資源,AQS內部用state來代表,但實際上具體的用途由開發者自行定義
為已經在佇列中的執行緒不間斷的嘗試取得鎖定(node已經由addWaiter建立)
實作邏輯:自旋如下過程:如果前驅節點是head並且取得鎖定成功,則設定目前節點為head,返回不沒有被中斷;否則進行後面的操作:呼叫shouldParkAfterFailedAcquire判斷和設定節點狀態以及呼叫parkAndCheckInterrupt阻塞執行緒和檢查中斷狀態。如果過程中出現異常,則呼叫cancelAcquire取消取得
實際上即使沒有獲取到鎖也不會不停的循環,當成功設定前驅節點的狀態並用park中斷線程後,線程就被掛起了; head後面的節點會使用tryAcquire與新執行緒競爭鎖,說明鎖在這裡的實作是非公平的
private void setHead(Node node)
將節點設為head,同時置thread和prev屬性為null方便GC。實際上相當於從隊列頭部出隊
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
檢查並更新取得鎖定失敗的節點的前驅節點狀態為SIGNAL,並傳回目前執行緒是否需要阻塞
實作邏輯:判斷前驅節點waitStatus,當為SIGNAL時回傳true;當>0時說明前驅節點已經取消,循環跳過前驅節點,直到某個前驅節點的waitStatus<=0時,之後回傳false;否則CAS設定前驅節點的waitStatus為SIGNAL,回傳false
private final boolean parkAndCheckInterrupt()
呼叫park,檢查中斷狀態並返回
return thread.interrupted() 這裡回傳的結果用來判斷執行緒被喚醒是否是因為中斷,如果因為中斷被喚醒,在acquireQueued中依然在迴圈中,會再次被park阻塞(由於interrupted()方法會清除中斷標誌,所以再次park可以生效);當被unpark喚醒時會獲取到鎖定;偽喚醒時透過外層循環再次檢查即可
偽喚醒的原因?
private void cancelAcquire(Node node)
在整個方法拋出異常時調用,將線程從Node中置空,狀態置CANCELLED,向前找到waitStatus<=0的節點作為前驅;把自己從隊列中去除,在需要的情況下需要喚醒後面的某個合適的節點
private void unparkSuccessor(Node node)
需要注意的是,如果node的next不為空,則將其unpark;如果node的next為null,則從tail出發從後往前找到不為node的最前面的一個waitStatus<=0的節點將其unpark
之所以從後往前找是因為雙向鍊錶設定前驅和後繼並不是原子操作,可能出現某個節點的next為空但是已經在隊列裡的情況,又或者是節點剛被取消,其next指向了自己,而遍歷也剛好來到該節點
public final void acquireInterruptibly(int arg)
回應中斷的獨佔模式取得同步狀態,會在tryAcqure前先用Thread.interrupted()判斷是否中斷,是的話拋出異常
tryAcquire
先嘗試一次
private void doAcquireInterruptibly(int arg)
基本邏輯與acquireQueued相同,差別在於內部呼叫addWaiter以及不回傳是否有執行緒中斷而是直接在檢查到由中斷喚醒park後拋出InterruptedException異常
cancelAcquire
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
具有逾時時間的獨佔式獲取同步狀態,回應中斷。如果沒有在逾時時間內取得同步狀態則回傳false
tryAcquire
先嘗試一次
private boolean doAcquireNanos(int arg, long nanosTimeout)
基本邏輯與doAcquireInterruptibly相同,增加了時間的判斷邏輯,另外使用了spinForTimeoutThreshold閾值
cancelAcquire
public final boolean release(int arg)
獨佔模式釋放同步狀態。首先呼叫tryRelease嘗試修改state,成功之後判斷head != null且head.waitStatus!=0(這裡head的waitStatus為0則表示空隊列)時呼叫unparkSuccessor喚醒後面的節點,然後回傳true;tryRelease失敗回傳false
protected boolean tryRelease(int arg)
嘗試修改state,AQS中沒有具體實現,需要子類別自行實現。 release方法的arg變數用於此方法;傳回值為是否成功釋放state
arg的說明請參閱下方實作部分
unparkSuccessor
共享模式
同一時刻有多個執行緒取得同步狀態
public final void acquireShared(int arg)
共享模式取得同步狀態,忽略中斷。實作:先呼叫tryAcquireShared嘗試取得同步狀態,失敗(結果小於0)後呼叫doAcquireShared自旋取得同步狀態
protected int tryAcquireShared(int arg)
嘗試取得同步狀態,AQS中不提供具體方法,由子類別實作。返回值小於0表示獲取失敗;等於0表示獲取成功,但隨後沒有其他共享模式獲取成功;大於0表示獲取成功並且有其他的共享模式獲取也可能成功
private void doAcquireShared(int arg)
入隊後自旋嘗試取得同步狀態,基本邏輯與獨佔模式下的acquireQueued基本上相同,差異在於當節點的前驅為head時,需要對tryAcquireShared回傳的結果進行判斷,如果大於等於0(說明依然有資源,可以繼續傳播),則呼叫setHeadAndPropagate設定head以及傳播屬性(設定新的head並判斷後繼節點,如果適當就喚醒);否則依然需要呼叫shouldParkAfterFailedAcquire和parkAndCheckInterrupt進行後續操作(park以及後續中斷判斷等)。另外中斷狀態的設定selfTnterrupt方法也放在該方法中執行
tryAcquireShared
節點的前驅為head時先嘗試一遍
private void setHeadAndPropagate(Node node, int propagate)
傳播的體現。此方法實際檢查的是目前取得到鎖的節點的後繼節點,即可以向後傳播一次。開始的時候我在懷疑為什麼沒有找到類似於循環中不停向後喚醒的程式碼,後來發現setHeadAndPropagate本身就在for(;;)中,每個被喚醒的節點如果獲取到了鎖都會執行該方法,這樣就達到一個一個向後傳播的目的。注意不是一起被喚醒然後競爭,還是一個一個被喚醒的。
繼續向後喚醒節點。 此方法在tryAcquireShared傳回結果r>=0(說明還有資源可用)時才會調用,r作為propagate參數傳入方法。此方法會將node設為head,之後判斷是否可以向後傳遞,如果可以則呼叫doReleaseShared喚醒後繼節點
判斷邏輯: propagate>0(說明還有許可) 或 之前的head == null 或 之前的head.waitStatus < 0 或 現在的head(也就是node) == null 或 現在的head.waitStatus < 0 然後進入下一步: node.next == null 或 node.next.isShared()
其實並不是很明白: 1.為什麼需要判斷現在的head((h = head) == null) 2.為什麼node.next == null後面還是會doReleaseShared
1的推測:因為判斷過程中head可能被其他線程修改,所以如果新的head依然滿足這個條件那麼還是可以繼續;感覺這裡的判斷邏輯更像是不知道為什麼head為null,但還是嘗試一下,一種保守的處理策略,那2的話可能也是類似的保守策略
只判斷waitStatus<0是因為head有可能狀態改變為了SIGNAL,而CONDITION並不會出現在這裡
private void doReleaseShared()
自旋喚醒後繼節點,與獨立模式不同點在於也要保證傳播的性質 實現:循環:當佇列不為空時,判斷head的waitStatus,等於SIGNAL時先CAS設定waitStatus為0,成功則調unparkSuccessor(head)喚醒後繼節點,失敗則從頭再來;waitStatus為0且CAS設定waitStatus為PROPAGATE失敗時也重頭再來;之後判斷head==h看頭結點是否有修改,如果有修改則退出循環
注意方法沒有傳參,直接從head開始
推測:共享模式node可能只有0(剛創建的)、PROPAGATE和SIGNAL三種狀態
unparkSuccessor
cancelAcquire
public final void acquireSharedInterruptibly(int arg)
共享模式取得同步狀態,支援中斷,支援的方式與獨佔模式類似
tryAcquireShared
private void doAcquireSharedInterruptibly(int arg)
與doAcquireShared邏輯類似,在檢查中斷後拋出異常
cancelAcquire
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
具有超時時間的共享模式取得同步狀態,支援中斷。
tryAcquireShared
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
逾時判斷邏輯與獨佔模式類似,其他執行邏輯與doAcquireShared類似
cancelAcquire
public final boolean releaseShared(int arg)
共享模式釋放同步狀態
protected boolean tryReleaseShared(int arg)
空方法,由子類別實現,嘗試釋放同步狀態。 arg是releaseShared的參數,可以自行定義;傳回值為當這個方法成功釋放了同步狀態,其他執行緒可以acquire時回傳true,否則回傳false
doReleaseShared
這裡依然只是喚醒後續一個節點,後續喚醒的傳播由喚醒的這個節點進行
如何基於AQS實現自己的鎖
競爭的資源
AQS用私有變數state抽像地代表鎖競爭的資源(例如1表示佔用,0表示未被佔用)
對state的操作需要使用相關的方法:getState、setState、compareAndSetState等
按需實現的方法
tryAcquire、tryAcquireShared、tryRelease、tryReleaseShared、isHeldExclusively
AQS中的各種方法的參數arg最終都是傳入這些方法的,故這些方法決定了是否需要arg參數以及怎麼用,這是源代碼註釋中“can represent anything”的含義
排他鎖(獨立模式)
protected boolean tryAcquire(int arg);
實作查詢並取得競態資源,通常由state的CAS操作完成,傳入參數可由開發者自行定義
簡單範例: protected boolean tryAcquire(int acquires) { assert acquires == 1; // 定義傳入參數只能是1 if (compareAndSetState(0, 1)) { return true; } return false; } protected boolean tryRelease(int releases) { assert releases == 1; // 定義傳入參數只能是1 if (getState() == 0) throw new IllegalMonitorStateException(); setState(0); return true; }
protected boolean tryRelease(int arg);
實現釋放競爭資源,通常大概是state變數自減或清零,傳入參數可由開發者自行定義
方法內部不應該出現阻塞和等待的情況
共享鎖
protected int tryAcquireShared(int acquires)
傳入參數由開發者自行定義,可視為想要取得的資源個數;傳回值int可視為剩餘的共享資源個數
簡單範例: protected int tryAcquireShared(int acquires) {//non-fair for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current releases; if (compareAndSetState(current, next)) return true; } }
protected boolean tryReleaseShared(int releases)
傳入參數由開發者自行定義,可視為想要釋放的資源個數
條件變數
雖然AQS的實作邏輯沒有用到state,但在使用條件變數時會預設將state當作release(int arg)的傳入參數,故在實作這些的方法時往往需要修改state變數
protected boolean isHeldExclusively()
傳回目前執行緒是否獨佔了條件變數對應的互斥鎖
常見的實作方式: protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } protected boolean tryAcquire(int acquires) { ..... setExclusiveOwnerThread(Thread.currentThread()); return true; }
ReentrantReadWriteLock
ReentrantLock
簡介
可重入鎖,分為公平鎖與非公平鎖兩種方式
實現介面
Lock
Serializable
內部類別
abstract static class Sync
抽象類,是ReetrantLock的實作基礎
繼承
AbstractQueuedSynchronizer
主要方法
abstract void lock()
取得鎖,抽象方法嗎,需要子類別實現
final boolean nonfairTryAcquire(int acquires)
非公平的tryLock實現 具體實作為:當前沒有執行緒取得鎖時CAS取得鎖,成功就設定鎖獨佔執行緒為目前執行緒;如果當前執行緒已經持有了鎖,則更新目前鎖的數量。這兩種情況會回傳true,表示持有鎖,其他情況都會回傳false
那為什麼一個非公平的實作會放在父類別Sync中?因為方法公平鎖和非公平鎖都會用到,二者的tryLock都是基於這個實現
protected final boolean tryRelease(int releases)
AQS中tryRelease的實現,具體為先計算鎖總數減去釋放鎖數量得到的剩餘持有鎖的數量c,如果c為0則清空狀態;設定state為c的值,返回是否沒有鎖被持有
protected final boolean isHeldExclusively()
AQS中isHeldExclusively實現,return getExclusiveOwnerThread() == Thread.currentThread()
final int getHoldCount()
傳回目前執行緒持有了多少鎖。此方法依賴isHeldExclusively的判斷
final boolean isLocked()
return getState() != 0; 是否有執行緒持有鎖
final Thread getOwner()
傳回持有鎖的線程,沒有時回傳null
final ConditionObject newCondition()
new ConditionObject()
private void readObject(java.io.ObjectInputStream s)
反序列化法,state置為0
static final class FairSync
公平鎖實作類
繼承
Sync
主要方法
final void lock()
acquire(1)
這裡的參數1是有實際意義的,與CountDownLatch可不一樣
protected final boolean tryAcquire(int acquires)
公平鎖的tryAcquire實作。實作過程為:佔用鎖c為0時,如果前面沒有執行緒且cas設定c為acquires成功,則設定目前執行緒為獨佔執行緒並傳回true;如果目前執行緒已經持有鎖,則更新鎖定數量並傳回true;其他情況均回傳false
hasQueuedPredecessors
這裡比非公平鎖多了這個判斷,即使當前沒有線程獲取鎖,如果有等待時間更長的線程也會放棄這次獲取鎖
static final class NonfairSync
非公平鎖實作類
繼承
Sync
主要方法
final void lock()
先嘗試直接cas(0,1),如果成功說明當前正好沒有別人獲取鎖,當前線程成功搶到鎖,之後設定獨佔線程為當前線程;如果失敗再acquire(1)
實際上非公平鎖定的tryAcquire方法中也會嘗試cas,這裡直接cas可能是為了更加突出不公平性吧,更早的呼叫則可能會更早的取得到資源
protected final boolean tryAcquire(int acquires)
return nonfairTryAcquire(acquires) 非公平的實作已經在Sync中存在了,直接調用
成員變數
private final Sync sync
表示鎖定使用公平或非公平實現,實例化後對應FairSync或NonfairSync
建構函數
public ReentrantLock()
預設構造非公平鎖
public ReentrantLock(boolean fair)
根據fair選擇構造公平鎖或非公平鎖
主要方法
public void lock()
sync.lock()
public void lockInterruptibly()
sync.acquireInterruptibly(1)
public boolean tryLock()
return sync.nonfairTryAcquire(1);如果成功取得鎖或目前執行緒已經持有鎖,則傳回true,其他情況回傳false
這裡無論是什麼鎖,這裡都是非公平的嘗試取得鎖
public boolean tryLock(long timeout, TimeUnit unit)
return sync.tryAcquireNanos(1, unit.toNanos(timeout))
tryAcquireNano呼叫了tryAcquire,故這裡是有公平和不公平的區別的
public void unlock()
sync.release(1)
public Condition newCondition()
return sync.newCondition()
public int getHoldCount()
return sync.getHoldCount() 目前執行緒持有的鎖數量
public boolean isHeldByCurrentThread()
return sync.isHeldExclusively() 目前執行緒是否持有鎖
public boolean isLocked()
return sync.isLocked()
public final boolean isFair()
return sync instanceof FairSync
protected Thread getOwner()
return sync.getOwner()
public final boolean hasQueuedThreads()
return sync.hasQueuedThreads() 是否有執行緒在佇列中
public final boolean hasQueuedThread(Thread thread)
return sync.isQueued(thread); 目前執行緒是否在佇列中
public final int getQueueLength()
return sync.getQueueLength();
protected Collection<Thread> getQueuedThreads()
return sync.getQueuedThreads();
public boolean hasWaiters(Condition condition)
是否有執行緒在給定condition上等待(對應條件佇列上是否有CONDITION狀態的執行緒)
public int getWaitQueueLength(Condition condition)
給定condition的條件佇列中CONDITION狀態的執行緒數量
protected Collection<Thread> getWaitingThreads(Condition condition)
Condition
簡介
介面;條件變數主要用來管理執行緒執行對某些狀態的依賴,將Object的monitor方法(wait/notify/notifyAll)對應到直接的物件上,透過將他們與任意Lock實現的使用相結合來實現每個物件擁有多個等待集的效果。相當於Lock取代了synchronize,Condition取代了monitor方法,即Condition需要與Lock搭配使用
Condition與monitor方法的主要差異有: 1. Condition支援多個等待隊列,一個Lock實例可以綁定多個Condition 2. Condition可以支援回應中斷和超時設置
綁定多個Condition的深層意義就是可以針對不同的情況對執行緒進行管理,所謂的「更精細」的控制,因為不同條件而阻塞的執行緒在不同的條件佇列中,在需要被喚醒的時候才會進入同步隊列進行競爭,本身就做了區分與隔離
理解
與一般的鎖競爭的差別在於condition強調的是條件,多執行緒環境下需要滿足條件才可以執行某些操作;而一般的鎖競爭則只是相當於競爭執行某段程式碼的權利
通常的程式設計模型: 取得鎖; while (條件狀態不滿足) { 釋放鎖定; 執行緒掛起等待,直到條件滿足通知; 重新取得鎖; } 臨界區操作; 釋放鎖定;
三元關係:鎖、wait、條件斷言(predicate)。每一次wait呼叫都會隱式地與特定條件斷言關聯起來;當呼叫某個特定條件斷言的wait時,呼叫者必須已經持有該斷言相關的條件佇列的鎖,且這個鎖必須保護著構成條件斷言的狀態變數
predicate:一個傳回bool類型的函數
創建
透過Lock介面的newCondition方法創建,需要實作的類別自行實作邏輯(本質上都是創建ConditionObject的實例)
主要方法
void await()
讓執行緒等待,直到被signal或interrupt喚醒;呼叫時會自動釋放condition關聯的鎖定,執行緒會停止執行目前任務,直到被signal/signalAll/interrupt/spurious wakeup喚醒。在呼叫await的方法結束之前執行緒需要重新嘗試取得condition關聯的鎖
boolean await(long time, TimeUnit unit)
與awaitNanos類似,可以指定時間的單位,時間耗完時回傳false,否則回傳true
long awaitNanos(long nanosTimeout)
與await類似,但最遲會在指定時間後醒來,返回沒有用到的時間
void awaitUninterruptibly();
與await類似,但不會回應中斷
boolean awaitUntil(Date deadline)
與awaitNanos類似,指定時間的方式不同,如果deadline已經達到則回傳false,否則回傳true
void signal()
喚醒一個等待狀態的線程,線程喚醒後需要再次取得鎖定
void signalAll()
喚醒所有等待狀態的執行緒
Lock
簡介
接口,提供了通用的一些請求/釋放鎖的方法
與synchronized的對比
synchronized的相關機制(例如monitor物件等)是內建的,在語言層面無法訪問,而Lock是語言層面用來替代的方案,故可以獲得一些語言層面的便利(例如知道是否成功獲取到鎖等等)
synchronized的加鎖和解鎖過程是由關鍵字自行完成,而Lock需要明確呼叫加鎖和解鎖方法
Lock可以使用條件變數來更靈活的使用鎖
Lock可以回應中斷,而synchronized不會
主要方法
void lock()
void lockInterruptibly()
請求鎖過程中會回應中斷
boolean tryLock()
嘗試取得鎖並直接傳回結果(並不會阻塞當前執行緒)
boolean tryLock(long time, TimeUnit unit)
具有超時和回應中斷的tryLock
void unlock();
Condition newCondition()
傳回一個與該鎖相關聯的Condition對象
LockSupport
簡介
用來建立鎖定和其他同步類別的基本執行緒阻塞原語(與許可(permit)相關)
底層依賴Unsafe實現
構造方法
私有,不允許建構該類別的對象
成員變數
private static final sun.misc.Unsafe UNSAFE
在static程式碼區塊中構建
sun.misc.Unsafe
private static final long parkBlockerOffset
透過Unsafe取得Thread類別物件中parkBlocker屬性的偏移位置
parkBlocker是線程被阻塞時的記錄,方便監視和診斷工具識別線程被阻塞的原因;線程未被阻塞時為null
private static final long SEED
透過Unsafe取得Thread類別中threadLocalRandomSeed屬性的偏移位置
private static final long PROBE
透過Unsafe取得Thread類別中threadLocalRandomProbe屬性的偏移位置
private static final long SECONDARY
透過Unsafe取得Thread類別中threadLocalRandomSecondarySeed屬性的偏移位置
這三個屬性由ThreadLocalRandom來進行設定
主要方法
public static Object getBlocker(Thread t)
透過Unsafe取得Thread物件中的parkBlocker對象
private static void setBlocker(Thread t, Object arg)
私有方法,透過Unsafe設定Thread中的parkBlocker對象,下面的設定blocker的方法都會呼叫該方法
public static void park()
阻塞線程,等待“許可”(permit)後繼續執行
具體實現(Hotspot):Parker類
具体的实现与平台相关,每个线程都有一个Parker实例,在linux实现中实际上是以mutex和condition最终实现了锁和阻塞的过程(具体看笔记文章,这里不写了)
主要成员变量(linux实现)
private volatile int _counter
表示许可,大于0表示已获取许可,每次park只会将其设置为1,不会递增
故調用兩次unpark後調用兩次park仍會在第二次時阻塞
protected pthread_mutex_t _mutex[1]
互斥变量
在代码中可以通过linux原子指令pthread_mutex_lock、pthread_mutex_trylock、pthread_mutex_unlock对变量进行加锁和解锁操作
protected pthread_cond_t _cond[1]
条件变量
利用线程间共享的全局变量进行同步的一种机制,一般和互斥锁结合使用(避免条件变量的竞争);可以通过pthread_cond_wait、pthread_cond_timedwait等指令进行操作
linux核心中同樣有等待佇列來管理阻塞進程
方法
park
unpark
與wait在使用上的區別
wait需要等待notify才可以喚醒,存在先後過程,而park等待的unpark可以先於park調用,此時park之後的程式可以繼續執行
wait需要取得物件的監視器,而park不需要
注意
park中發生中斷,則park也會回傳但不會拋出中斷異常
public static void park(Object blocker)
由於parkBlocker的作用,因此帶有blocker參數的park方法更建議使用,使用後在透過jstack等診斷工具時可以看到線程阻塞在哪個物件上的提示,例如parking to wait for <0x000000045ed12298> (a xxxx object全限定名)
public static void parkNanos(Object blocker, long nanos)
public static void parkNanos(long nanos)
最多阻塞多少時間後程式繼續執行
public static void parkUntil(Object blocker, long deadline)
public static void parkUntil(long deadline)
public static void unpark(Thread thread)