java.util.concurrent和工具類
三、Semaphor信號量
信號量可以干什么呢?根據一些閥值做訪問控制。我們這里模擬一個當多個線程并發一段代碼的時候,如何控制其訪問速度
import java.util.Random; ? import java.util.concurrent.Semaphore; ?
??public class SemaphoreTest { ? ??????private final static Semaphore MAX_SEMA_PHORE = new Semaphore(10); ? ??????public static void main(String []args) { ? ???????????for(int i = 0 ; i < 100 ; i++) { ? ????????????????final int num = i; ? ????????????????final Random radom = new Random(); ? ????????????????new Thread() { ? ?????????????????????public void run() { ? ?????????????????????????boolean acquired = false; ? ?????????????????????????try { ? ??????????????????????????????MAX_SEMA_PHORE.acquire(); ? ??????????????????????????????acquired = true; ? ??????????????????????????????System.out.println("我是線程:" + num + " 我獲得了使用權!" + DateTimeUtil.getDateTime()); ? ??????????????????????????????long time = 1000 * Math.max(1, Math.abs(radom.nextInt() % 10)); ? ??????????????????????????????Thread.sleep(time); ? ??????????????????????????????System.out.println("我是線程:" + num + " 我執行完了!" + DateTimeUtil.getDateTime()); ? ?????????????????????????}catch(Exception e) { ? ??????????????????????????????e.printStackTrace(); ? ?????????????????????????}finally { ? ??????????????????????????????if(acquired) { ? ?????????????????????????????????MAX_SEMA_PHORE.release(); ? ??????????????????????????????} ? ?????????????????????????} ? ??????????????????????} ? ????????????????}.start(); ? ???????????} ? ??????} ? ??} |
上述是簡單模擬并發100個線程去訪問一段程序,此時要控制最多同時運行的是10個,用到了這個信號量,運行程序用了一個線程睡眠一個隨機的時間來代替,你可以看到后面有線程說自己釋放了,就有線程獲得了,沒釋放是獲取不到的
四、Exchanger線程交互
用于線程之間交互數據,且在并發時候使用,兩兩交換,交換中不會因為線程多而混亂,發送出去沒接收到會一直等,由交互器完成交互過程
import java.util.concurrent.Exchanger; ?
public class ExchangerTest { ?
????public static void main(String []args) { ? ????????final Exchanger <Integer>exchanger = new Exchanger<Integer>(); ? ????????for(int i = 0 ; i < 10 ; i++) { ? ????????????final Integer num = i; ? ????????????new Thread() { ? ????????????????public void run() { ? ????????????????????System.out.println("我是線程:Thread_" + this.getName() + "我的數據是:" + num); ? ????????????????????try { ? ????????????????????????Integer exchangeNum = exchanger.exchange(num); ? ????????????????????????Thread.sleep(1000); ? ????????????????????????System.out.println("我是線程:Thread_" + this.getName() + "我原先的數據為:" + num + " , 交換后的數據為:" + exchangeNum); ? ????????????????????} catch (InterruptedException e) { ? ????????????????????????e.printStackTrace(); ? ????????????????????} ? ????????????????} ? ????????????}.start(); ? ????????} ? ????} ? } ? |
這里運行你可以看到,如果某個線程和另一個線程傳送了數據,它接受到的數據必然是另一個線程傳遞給他的,中間步驟由Exchanger去控制
五、CyclicBarrier關卡模式
當你在很多環節需要卡住,要多個線程同時在這里都達到后,再向下走,很有用
假如,團隊出去旅行,大家一起先達到酒店住宿,然后一起達到游樂的地方游玩,然后一起坐車回家,每次需要點名后確認相關人員均達到,然后LZ一聲令下,觸發,大伙就瘋子般的出發了
import java.util.concurrent.BrokenBarrierException; ? import java.util.concurrent.CyclicBarrier; ?
public class BarrierTest { ?
????private static final int THREAD_COUNT = 10; ?
????private final static CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREAD_COUNT ?, ? ????????new Runnable() { ? ????????????public void run() { ? ????????????????System.out.println("======>我是導游,本次點名結束,準備走下一個環節!"); ? ????????????} ? ????????} ? ????); ?
????public static void main(String []args) ?? ????????????throws InterruptedException, BrokenBarrierException { ? ????????for(int i = 0 ; i < 10 ; i++) { ? ????????????new Thread(String.valueOf(i)) { ? ????????????????public void run() { ? ????????????????????try { ? ????????????????????????System.out.println("我是線程:" + this.getName() + " 我們達到旅游地點!"); ? ????????????????????????CYCLIC_BARRIER.await(); ? ????????????????????????System.out.println("我是線程:" + this.getName() + " 我開始騎車!"); ? ????????????????????????CYCLIC_BARRIER.await(); ? ????????????????????????System.out.println("我是線程:" + this.getName() + " 我們開始爬山!"); ? ????????????????????????CYCLIC_BARRIER.await(); ? ????????????????????????System.out.println("我是線程:" + this.getName() + " 我們回賓館休息!"); ? ????????????????????????CYCLIC_BARRIER.await(); ? ????????????????????????System.out.println("我是線程:" + this.getName() + " 我們開始乘車回家!"); ? ????????????????????????CYCLIC_BARRIER.await(); ? ????????????????????????System.out.println("我是線程:" + this.getName() + " 我們到家了!"); ? ????????????????????} catch (InterruptedException e) { ? ????????????????????????e.printStackTrace(); ? ????????????????????} catch (BrokenBarrierException e) { ? ????????????????????????e.printStackTrace(); ? ????????????????????} ? ????????????????} ? ????????????}.start(); ? ????????} ? ????} ? } |
測試結果中可以發現,大家一起走到某個步驟后,導游說:“我是導游,本次點名結束,準備走下一個環節!”,然后才會進入下一個步驟,OK,這個有點意思吧,其實賽馬也是這個道理,只是賽馬通常只有一個步驟,所以我們還有一個方式是:
六、CountDownLatch計數器
CountDownLatch的方式來完成賽馬操作,CountDownLatch是用計數器來做的,所以它不可以被復用,如果要多次使用,就要從新new一個出來才可以。我們下面的代碼中,用兩組賽馬,每組5個參與者來,做一個簡單測試
import java.util.concurrent.CountDownLatch; ?
public class CountDownLatchTest { ?
????private final static int GROUP_SIZE = 5; ?
????public static void main(String []args) { ? ????????processOneGroup("分組1"); ? ????????processOneGroup("分組2"); ? ????} ?
????private static void processOneGroup(final String groupName) { ? ????????final CountDownLatch start_count_down = new CountDownLatch(1); ? ????????final CountDownLatch end_count_down = new CountDownLatch(GROUP_SIZE); ? ????????System.out.println("==========================>\n分組:" + groupName + "比賽開始:"); ? ????????for(int i = 0 ; i < GROUP_SIZE ; i++) { ? ????????????new Thread(String.valueOf(i)) { ? ????????????????public void run() { ? ????????????????????System.out.println("我是線程組:【" + groupName + "】,第:" + this.getName() + " 號線程,我已經準備就緒!"); ? ????????????????????try { ? ????????????????????????start_count_down.await();//等待開始指令發出即:start_count_down.countDown(); ? ????????????????????} catch (InterruptedException e) { ? ????????????????????????e.printStackTrace(); ? ????????????????????} ? ????????????????????System.out.println("我是線程組:【" + groupName + "】,第:" + this.getName() + " 號線程,我已執行完成!"); ? ????????????????????end_count_down.countDown(); ? ????????????????} ? ????????????}.start(); ? ????????} ? ????????try { ? ????????????Thread.sleep(1000); ? ????????} catch (InterruptedException e) { ? ????????????e.printStackTrace(); ? ????????} ? ????????System.out.println("各就各位,預備!"); ? ????????start_count_down.countDown();//開始賽跑 ? ????????try { ? ????????????end_count_down.await();//等待多個賽跑者逐個結束 ? ????????} catch (InterruptedException e) { ? ????????????e.printStackTrace(); ? ????????} ? ????????System.out.println("分組:" + groupName + "比賽結束!"); ? ????} ? } |
本教程由尚硅谷教育大數據研究院出品,如需轉載請注明來源,歡迎大家關注尚硅谷公眾號(atguigu)了解更多。