RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
day1javaconcurrent包一些知识点

Concurrent:

创新互联建站主要从事网站建设、网站设计、网页设计、企业做网站、公司建网站等业务。立足成都服务鹤庆,10年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:13518219792

1.  BlockingQueue( 阻塞队列 )

ArrayBlockingQueue( 指定容量,不可变 ),LinkeBlocingQueue (指定容量不可变,也可以不指定容量,默认 Integer.Max_value )

PriorityBlockingQueue ( 根据实现的接口自定义排序,只有在逐个拿取的时候才有序 )

SynchronousQueue (长度以 1 只能为 1 )  

  day1 java concurrent包一些知识点

 

2.  ConcurrentMap

ConcurrentHashMap  

   1.5 分桶加锁,1.8 CAS+红黑树来保证线程安全

ConcurrentNavigableMap (针对有序列表,有 map.headMap map.subMap map.tailMap ,   返回有序的在取出范围的 map )

3.  CountDownLatch   闭锁

CountDownLatch 以一个给定的数量初始化。 countDown() 每被调用一次,这一数 量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。

等待一定数量的线程完成。来执行其后的程序

4.  CyclicBarrier   栅栏

它能够对处理一些算法的线程实现同
步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这 里,然后所有线程才可以继续做其他事情  

5.  Exchanger 交换机  

类表示一种两个线程可以进行互相交换对象的会和点   ,只能两个线程之间交换数据

  day1 java concurrent包一些知识点

 

 

6.  Semaphore   信号量

l acquire()
l release()
计数信号量由一个指定数量的 " 许可 " 初始化。每调用一次 acquire() ,一个许可会 被调用线程取走。每调用一次 release() ,一个许可会被返还给信号量。因此,在没 有任何 release() 调用时,最多有 N 个线程能够通过 acquire() 方法, N 是该信 号量初始化时的许可的指定数量。

线程池

7.  ExecutorService

(executors.newFixedThreadPool ,长任务场景,只有核心线程,没有临时线程,容纳无限多

,newCachedThreadPool (高并发短任务场景,没有核心线程,全部都是临时线程,处理任意多的线程)

,newSingleThreadPool

,newSchedulerThreadPool (有核心线程,有临时线程)

)

提交线程的方法

execute() 提交线程 没有返回值submit(Runnable) 提交线程,返Future 可以通过Future.get()得到该线程的状态但是如果该线程未执行完成,那么该方法阻塞

submit(Callable) 同上面类似,但是线程可以带有返回值

invokeAny(.....),随机选择线程执行一个

invokeAll(),自动执行所有的线程

关闭线程池: ExecutorService.shutdown();该方法不再接受线程池,等待所有线程执行完毕后,线程池结束。

ExecutorService.shutdown();立即关闭,退出任务,正在执行的线程可能会出错。

##Callable只能用线程池提交

Callable runnable :

1.  返回值

2.  异常, runnabel没有容错机制,callable有容错机制,可以将 异常抛给上层处理

3.  Callable只能通过submit方法提交,runnable可以new 也可 以通过线程池提交

8.  ReadWriteLock 读写锁 ,可以是公平,也可以是非公平的。该锁可以跨方法

    读锁可以共享,写锁互斥

    读锁 readLock().lock();

    写锁 writeLock().lock();  




package hgs.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Test {
	public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
		//ConcurrentMap
		//BlockingQueue bq = new ArrayBlockingQueue(4);
		
		//add(超过长度会报错) remove(没有元素会报错)
		/*
		bq.add("1");
		bq.add("1");
		bq.add("1");
		bq.add("1");
		bq.remove();
		bq.remove();
		bq.remove();
		bq.remove();
		*/
		
		//offer(如果可以插入返回true 否则 false) poll(如果没有元素返回null)
		
		//bq.offer("1");
		//bq.offer("1");
		//bq.offer("1");
		//bq.offer("1");
		//bq.offer("1");
		//String flag = bq.poll();
		//System.out.println(flag);
		
		//put take 阻塞式
		//bq.put("1");
		//bq.put("1");
		//bq.put("1");
		//bq.put("1");
		//bq.put("1");
		//bq.put("1");
		//bq.take();
		
		//offer(o,timeout,timeunit),poll(timeout,tameunit)  等待timeout时间,然后跳过
		
		//bq.poll(10, TimeUnit.SECONDS);
		
		
		
		//bq.element();//检查是否为空,是的话跑出异常
		//bq.peek();//阻塞
		
/*		ConcurrentNavigableMap m = new ConcurrentSkipListMap() ;
		m.put(1, "1");
		m.put(2, "2");
		m.put(5, "5");
		m.put(4, "4");
		m.put(6, "6");
		m.put(5, "5");
		
		System.out.println("head(\"5\")"+m.headMap(4));
		System.out.println();
		System.out.println();*/
		/*Boy b1 = new Boy("b1",24);
		Boy b2 = new Boy("b2",23);
		Boy b3 = new Boy("b3",26);
		Boy b4 = new Boy("b4",19);
		PriorityBlockingQueue< Boy> pbq = new PriorityBlockingQueue(100);
		pbq.add(b1);
		pbq.add(b2);
		pbq.add(b3);
		pbq.add(b4);
		for(int i =0 ;i<4;i++) {
			System.out.println(pbq.take().toString());
		}		*/
		
		//闭锁
		/*CountDownLatch cdl = new CountDownLatch(4);
		for(int i = 0; i<4; i++) {
			new Thread(new BoyRun(cdl)).start();
		}
		cdl.await();
		System.out.println("全部到达。。。");*/
		
		//栅栏
		/*CyclicBarrier cb = new CyclicBarrier(4);
		for(int i = 0; i<4; i++) {
			new Thread(new GirlRan(cb)).start();
		}
		cb.await();
		System.out.println("all comming.");*/
		
		//exchanger交换器
		
		/*Exchanger ex = new Exchanger();
		ExchangerTest e1 = new ExchangerTest(ex);
		ExchangerTest e2 = new ExchangerTest(ex);
		new Thread(e1).start();
		new Thread(e2).start();	*/
		
		//6.Semaphore 信号量
		
/*		Semaphore s = new Semaphore(5);
		for(int i = 0 ;i<9;i++) {
			new SemaphoreTest(s).start();
		}*/
		
		
		//原始创建线程池,executors.newCacheThreadPool 的底层调用该方法
/*		ExecutorService es =  new 
				ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
						new RejectedExecutionHandler() {
							
							@Override
							public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
							System.out.println("full......");
								
							}
						});
		for(int i = 0 ;i<24;i++) {
			es.execute(new ThreadPoolTest());
		}
		es.shutdown();*/
		
		
		
		//可重入锁 ReentrantLock  可重入读写锁ReentrantReadWriteLock
/*		
		ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
		for (int i = 0 ;i<3;i++) {
			new ReadLockTest( rwLock ).start();
		}*/
		
		
		ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
		for (int i = 0 ;i<3;i++) {
			new WriteLockTest( rwLock ).start();
		}
		 
		
	}
		
		
		
	}
class WriteLockTest extends Thread{
	ReentrantReadWriteLock rwLock ;
	public WriteLockTest(ReentrantReadWriteLock rwLock ) {
		this.rwLock = rwLock;
	}
	@Override
	public void run() {
		rwLock.writeLock().lock();
		System.out.println("reading......");
		try {
			Thread.sleep((long)(Math.random()*2000));
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		System.out.println("done......");
		rwLock.writeLock().unlock();
	}
	
}
class ReadLockTest extends Thread{
	ReentrantReadWriteLock rwLock ;
	public ReadLockTest(ReentrantReadWriteLock rwLock ) {
		this.rwLock = rwLock;
	}
	@Override
	public void run() {
		rwLock.readLock().lock();
		System.out.println("reading......");
		try {
			Thread.sleep((long)(Math.random()*2000));
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		System.out.println("done......");
		rwLock.readLock().unlock();
	}
	
}
	class ThreadPoolTest implements Runnable{
		ThreadPoolTest (){
			
		}
		@Override
		public void run() {
		 System.out.println(Thread.currentThread().getName());
		 
			try {
				Thread.sleep((long)(Math.random()*2000));
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
		}
		
	}
class SemaphoreTest extends Thread{
	Semaphore s = null;
	public SemaphoreTest(Semaphore s) {
		this.s = s;
	}
	@Override
	public void run() {
		try {
			s.acquire();
			System.out.println("aquire......");
			Thread.sleep((long)(Math.random()*3000));
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("release......");
		s.release();
		
		
	}
	
}
class ExchangerTest implements Runnable{
	Exchanger ex = null;
	
	public ExchangerTest(Exchanger ex) {
		this.ex = ex;
	}
	@Override
	public void run() {
		String my = Thread.currentThread().getName();
		String exstr = null;
		try {
			exstr = ex.exchange(my);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(my+"   "+exstr);
		
	}
	
}
class BoyRun implements Runnable{
	CountDownLatch cdl ;
	public BoyRun(CountDownLatch cdl ) {
		this.cdl = cdl;
	}
	@Override
	public void run() {
		System.out.println("cdl"+" +1");
		cdl.countDown();
		System.out.println("cdl"+" +1--");
	}
	
}
class GirlRan implements Runnable {
	CyclicBarrier  cb ;
	public GirlRan(CyclicBarrier  cb ) {
		this.cb = cb;
	}
	@Override
	public void run() {
		System.out.println("cdl"+" +1");
		try {
			Thread.sleep((long)(Math.random()*5000));
			cb.await();
			
		} catch (InterruptedException | BrokenBarrierException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("cdl"+" +1--");
		
	}
	
}
class Boy implements Comparable{
	String name;
	int age;
	public Boy(String name ,int age) {
		this.name = name;
		this.age = age;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	public int compareTo(Boy o) {
		
		return this.age - o.age;
	}
	@Override
	public String toString() {
		return "Boy [name=" + name + ", age=" + age + "]";
	}
	
	
	
}


网站名称:day1javaconcurrent包一些知识点
本文来源:http://scpingwu.com/article/ipchhi.html