其他
图文并茂理解 Java 多线程!
点击上方 Java后端,选择 设为星标
优质文章,及时送达
线程
NEW:线程刚创建 RUNNABLE: 在JVM中正在运行的线程,其中运行状态可以有运行中RUNNING和READY两种状态,由系统调度进行状态改变。 BLOCKED:线程处于阻塞状态,等待监视锁,可以重新进行同步代码块中执行 WAITING : 等待状态 TIMED_WAITING: 调用sleep() join() wait()方法可能导致线程处于等待状态 TERMINATED: 线程执行完毕,已经退出
它们最大本质的区别是,Sleep()不释放同步锁,Wait()释放同步锁。 还有用法的上的不同是:Sleep(milliseconds)可以用时间指定来使他自动醒过来,如果时间不到你只能调用Interreput()来强行打断;Wait()可以用Notify()直接唤起。 这两个方法来自不同的类分别是Thread和Object 最主要是Sleep方法没有释放锁,而Wait方法释放了锁,使得其他线程可以使用同步控制块或者方法。
相同 :Sleep()和yield()都会释放CPU。 不同:Sleep()使当前线程进入停滞状态,所以执行Sleep()的线程在指定的时间内肯定不会执行;yield()只是使当前线程重新回到可执行状态,所以执行yield()的线程有可能在进入到可执行状态后马上又被执行。Sleep()可使优先级低的线程得到执行的机会,当然也可以让同优先级和高优先级的线程有执行的机会;yield()只能使同优先级的线程有执行的机会。
互斥条件:顾名思义,线程对资源的访问是排他性,当该线程释放资源后下一线程才可进行占用。 请求和保持:简单来说就是自己拿的不放手又等待新的资源到手。线程T1至少已经保持了一个资源R1占用,但又提出对另一个资源R2请求,而此时,资源R2被其他线程T2占用,于是该线程T1也必须等待,但又对自己保持的资源R1不释放。 不可剥夺:在没有使用完资源时,其他线性不能进行剥夺。 循环等待:一直等待对方线程释放资源。
原子性:Atomic包、CAS算法、Synchronized、Lock。 可见性:Synchronized、Volatile(不能保证原子性)。 有序性:Happens-before规则。
互斥同步:Synchronized、Lock。 非阻塞同步:CAS。 无需同步的方案:如果一个方法本来就不涉及共享数据,那它自然就无需任何同步操作去保证正确性。
Synchronized关键字 Lock CAS、原子变量 ThreadLocl:简单来说就是让每个线程,对同一个变量,都有自己的独有副本,每个线程实际访问的对象都是自己的,自然也就不存在线程安全问题了。 Volatile CopyOnWrite写时复制
public static void main(String[] args) {
new MyThread().start();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
}
}
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
new Thread(runnable).start();
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
}
}
public static void main(String[] args) throws Exception {
// 将Callable包装成FutureTask,FutureTask也是一种Runnable
MyCallable callable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
// get方法会阻塞调用的线程
Integer sum = futureTask.get();
System.out.println(Thread.currentThread().getName() + Thread.currentThread().getId() + "=" + sum);
}
}
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId() + "\t" + new Date() + " \tstarting...");
int sum = 0;
for (int i = 0; i <= 100000; i++) {
sum += i;
}
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId() + "\t" + new Date() + " \tover...");
return sum;
}
}
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行。 newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public abstract class AbstractExecutorService implements ExecutorService
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:线程池的核心线程数,即便线程池里没有任何任务,也会有corePoolSize个线程在候着等任务。 maximumPoolSize:最大线程数,不管提交多少任务,线程池里最多工作线程数就是maximumPoolSize。 keepAliveTime:线程的存活时间。当线程池里的线程数大于corePoolSize时,如果等了keepAliveTime时长还没有任务可执行,则线程退出。 Unit:这个用来指定keepAliveTime的单位,比如秒:TimeUnit.SECONDS。 BlockingQueue:一个阻塞队列,提交的任务将会被放到这个队列里。 threadFactory:线程工厂,用来创建线程,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。 handler:拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用。
import java.util.concurrent.*;
public class MyTestMap {
// 定义阻塞队列大小
private static final int maxSize = 5;
public static void main(String[] args){
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(maxSize);
new Thread(new Productor(queue)).start();
new Thread(new Customer(queue)).start();
}
}
class Customer implements Runnable {
private BlockingQueue<Integer> queue;
Customer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
this.cusume();
}
private void cusume() {
while (true) {
try {
int count = (int) queue.take();
System.out.println("customer正在消费第" + count + "个商品===");
// 只是为了方便观察输出结果
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Productor implements Runnable {
private BlockingQueue<Integer> queue;
private int count = 1;
Productor(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
this.product();
}
private void product() {
while (true) {
try {
queue.put(count);
System.out.println("生产者正在生产第" + count + "个商品");
count++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//输出如下
/**
生产者正在生产第1个商品
生产者正在生产第2个商品
生产者正在生产第3个商品
生产者正在生产第4个商品
生产者正在生产第5个商品
customer正在消费第1个商品===
*/
ArrayBlockingQueue的初始化必须传入队列大小,LinkedBlockingQueue则可以不传入。 ArrayBlockingQueue用一把锁控制并发,LinkedBlockingQueue俩把锁控制并发,锁的细粒度更细。即前者生产者消费者进出都是一把锁,后者生产者生产进入是一把锁,消费者消费是另一把锁。 ArrayBlockingQueue采用数组的方式存取,LinkedBlockingQueue用Node链表方式存取。
AbortPolicy:不处理,直接抛出异常。 CallerRunsPolicy:只用调用者所在线程来运行任务,即提交任务的线程。 DiscardOldestPolicy:LRU策略,丢弃队列里最近最久不使用的一个任务,并执行当前任务。 DiscardPolicy:不处理,丢弃掉,不抛出异常。
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING:在这个状态的线程池能判断接受新提交的任务,并且也能处理阻塞队列中的任务。
SHUTDOWN:处于关闭的状态,该线程池不能接受新提交的任务,但是可以处理阻塞队列中已经保存的任务,在线程处于RUNNING状态,调用shutdown()方法能切换为该状态。
STOP:线程池处于该状态时既不能接受新的任务也不能处理阻塞队列中的任务,并且能中断现在线程中的任务。当线程处于RUNNING和SHUTDOWN状态,调用shutdownNow()方法就可以使线程变为该状态。
TIDYING:在SHUTDOWN状态下阻塞队列为空,且线程中的工作线程数量为0就会进入该状态,当在STOP状态下时,只要线程中的工作线程数量为0就会进入该状态。
TERMINATED:在TIDYING状态下调用terminated()方法就会进入该状态。可以认为该状态是最终的终止状态。
判断核心线程是否已满,是进入队列,否:创建线程 判断等待队列是否已满,是:查看线程池是否已满,否:进入等待队列 查看线程池是否已满,是:拒绝,否创建线程
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断当前活跃线程数是否小于corePoolSize,如果小于,则调用addWorker创建线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果不小于corePoolSize,则将任务添加到workQueue队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果放入workQueue失败,则创建线程执行任务,如果这时创建线程失败(当前线程数不小于maximumPoolSize时),就会调用reject(内部调用handler)拒绝接受任务。
else if (!addWorker(command, false))
reject(command);
}
创建Worker对象,同时也会实例化一个Thread对象。在创建Worker时会调用threadFactory来创建一个线程。 然后启动这个线程。
private static int ctlOf(int rs, int wc) { return rs | wc; }
runState:即rs 表明当前线程池的状态,是否处于Running,Shutdown,Stop,Tidying。 workerCount:即wc表明当前有效的线程数。
|||
31~29位
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
shutdown会把线程池的状态改为SHUTDOWN,而shutdownNow把当前线程池状态改为STOP。 shutdown只会中断所有空闲的线程,而shutdownNow会中断所有的线程。 shutdown返回方法为空,会将当前任务队列中的所有任务执行完毕;而shutdownNow把任务队列中的所有任务都取出来返回。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
countDownLatch是一个计数器,线程完成一个记录一个,计数器递减,只能只用一次。
CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递增,提供Reset功能,可以多次使用。
/**
*
* @author hxz
* @description 多线程测试类
* @version 1.0
* @data 2020年2月15日 上午9:10:09
*/
public class MyThreadTest {
public static void main(String[] args) throws Exception {
notifyThreadWithVolatile();
}
/**
* 定义一个测试
*/
private static volatile boolean flag = false;
/**
* 计算I++,当I==5时,通知线程B
* @throws Exception
*/
private static void notifyThreadWithVolatile() throws Exception {
Thread thc = new Thread("线程A"){
@Override
public void run() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
flag = true;
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
break;
}
System.out.println(Thread.currentThread().getName() + "====" + i);
}
}
};
Thread thd = new Thread("线程B") {
@Override
public void run() {
while (true) {
// 防止伪唤醒 所以使用了while
while (flag) {
System.out.println(Thread.currentThread().getName() + "收到通知");
System.out.println("do something");
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return ;
}
}
}
};
thd.start();
Thread.sleep(1000L);
thc.start();
}
}
-END-