Java线程池多线程存储数据
1. java多线程有几种实现方法线程之间如何同步
一、为什么要线程同步
因为当我们有多个线程要同时访问一个变量或对象时,如果这些线程中既有读又有写操作时,就会导致变量值或对象的状态出现混乱,从而导致程序异常。举个例子,如果一个银行账户同时被两个线程操作,一个取100块,一个存钱100块。假设账户原本有0块,如果取钱线程和存钱线程同时发生,会出现什么结果呢?取钱不成功,账户余额是100.取钱成功了,账户余额是0.那到底是哪个呢?很难说清楚。因此多线程同步就是要解决这个问题。
二、不同步时的代码
Bank.Java
packagethreadTest;
/**
*@authorww
*
*/
publicclassBank{
privateintcount=0;//账户余额
//存钱
publicvoidaddMoney(intmoney){
count+=money;
System.out.println(System.currentTimeMillis()+"存进:"+money);
}
//取钱
publicvoidsubMoney(intmoney){
if(count-money<0){
System.out.println("余额不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}
//查询
publicvoidlookMoney(){
System.out.println("账户余额:"+count);
}
}
SyncThreadTest.java
packagethreadTest;
publicclassSyncThreadTest{
publicstaticvoidmain(Stringargs[]){
finalBankbank=newBank();
Threadtadd=newThread(newRunnable(){
@Override
publicvoidrun(){
//TODOAuto-generatedmethodstub
while(true){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
bank.addMoney(100);
bank.lookMoney();
System.out.println(" ");
}
}
});
Threadtsub=newThread(newRunnable(){
@Override
publicvoidrun(){
//TODOAuto-generatedmethodstub
while(true){
bank.subMoney(100);
bank.lookMoney();
System.out.println(" ");
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
}
}
});
tsub.start();
tadd.start();
}
}
余额不足
账户余额:0
余额不足
账户余额:100
1441790503354存进:100
账户余额:100
1441790504354存进:100
账户余额:100
1441790504354取出:100
账户余额:100
1441790505355存进:100
账户余额:100
1441790505355取出:100
账户余额:100
三、使用同步时的代码
(1)同步方法:
即有synchronized关键字修饰的方法。由于java的每个对象都有一个内置锁,当用此关键字修饰方法时,内置锁会保护整个方法。在调用该方法前,需要获得内置锁,否则就处于阻塞状态。
修改后的Bank.java
packagethreadTest;
/**
*@authorww
*
*/
publicclassBank{
privateintcount=0;//账户余额
//存钱
(intmoney){
count+=money;
System.out.println(System.currentTimeMillis()+"存进:"+money);
}
//取钱
(intmoney){
if(count-money<0){
System.out.println("余额不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}
//查询
publicvoidlookMoney(){
System.out.println("账户余额:"+count);
}
}
再看看运行结果:
余额不足
账户余额:0
余额不足
账户余额:0
1441790837380存进:100
账户余额:100
1441790838380取出:100
账户余额:0
1441790838380存进:100
账户余额:100
1441790839381取出:100
账户余额:0
瞬间感觉可以理解了吧。
注: synchronized关键字也可以修饰静态方法,此时如果调用该静态方法,将会锁住整个类
(2)同步代码块
即有synchronized关键字修饰的语句块。被该关键字修饰的语句块会自动被加上内置锁,从而实现同步
Bank.java代码如下:
packagethreadTest;
/**
*@authorww
*
*/
publicclassBank{
privateintcount=0;//账户余额
//存钱
publicvoidaddMoney(intmoney){
synchronized(this){
count+=money;
}
System.out.println(System.currentTimeMillis()+"存进:"+money);
}
//取钱
publicvoidsubMoney(intmoney){
synchronized(this){
if(count-money<0){
System.out.println("余额不足");
return;
}
count-=money;
}
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}
//查询
publicvoidlookMoney(){
System.out.println("账户余额:"+count);
}
}
运行结果如下:
余额不足
账户余额:0
1441791806699存进:100
账户余额:100
1441791806700取出:100
账户余额:0
1441791807699存进:100
账户余额:100
效果和方法一差不多。
注:同步是一种高开销的操作,因此应该尽量减少同步的内容。通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。
(3)使用特殊域变量(volatile)实现线程同步
a.volatile关键字为域变量的访问提供了一种免锁机制
b.使用volatile修饰域相当于告诉虚拟机该域可能会被其他线程更新
c.因此每次使用该域就要重新计算,而不是使用寄存器中的值
d.volatile不会提供任何原子操作,它也不能用来修饰final类型的变量
Bank.java代码如下:
packagethreadTest;
/**
*@authorww
*
*/
publicclassBank{
privatevolatileintcount=0;//账户余额
//存钱
publicvoidaddMoney(intmoney){
count+=money;
System.out.println(System.currentTimeMillis()+"存进:"+money);
}
//取钱
publicvoidsubMoney(intmoney){
if(count-money<0){
System.out.println("余额不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}
//查询
publicvoidlookMoney(){
System.out.println("账户余额:"+count);
}
}
运行效果怎样呢?
余额不足
账户余额:0
余额不足
账户余额:100
1441792010959存进:100
账户余额:100
1441792011960取出:100
账户余额:0
1441792011961存进:100
账户余额:100
是不是又看不懂了,又乱了。这是为什么呢?就是因为volatile不能保证原子操作导致的,因此volatile不能代替synchronized。此外volatile会组织编译器对代码优化,因此能不使用它就不适用它吧。它的原理是每次要线程要访问volatile修饰的变量时都是从内存中读取,而不是存缓存当中读取,因此每个线程访问到的变量值都是一样的。这样就保证了同步。
(4)使用重入锁实现线程同步
在JavaSE5.0中新增了一个java.util.concurrent包来支持同步。ReentrantLock类是可重入、互斥、实现了Lock接口的锁,它与使用synchronized方法和快具有相同的基本行为和语义,并且扩展了其能力。
ReenreantLock类的常用方法有:
ReentrantLock() : 创建一个ReentrantLock实例
lock() : 获得锁
unlock() : 释放锁
注:ReentrantLock()还有一个可以创建公平锁的构造方法,但由于能大幅度降低程序运行效率,不推荐使用
Bank.java代码修改如下:
packagethreadTest;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
/**
*@authorww
*
*/
publicclassBank{
privateintcount=0;//账户余额
//需要声明这个锁
privateLocklock=newReentrantLock();
//存钱
publicvoidaddMoney(intmoney){
lock.lock();//上锁
try{
count+=money;
System.out.println(System.currentTimeMillis()+"存进:"+money);
}finally{
lock.unlock();//解锁
}
}
//取钱
publicvoidsubMoney(intmoney){
lock.lock();
try{
if(count-money<0){
System.out.println("余额不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}finally{
lock.unlock();
}
}
//查询
publicvoidlookMoney(){
System.out.println("账户余额:"+count);
}
}
运行效果怎么样呢?
余额不足
账户余额:0
余额不足
账户余额:0
1441792891934存进:100
账户余额:100
1441792892935存进:100
账户余额:200
1441792892954取出:100
账户余额:100
效果和前两种方法差不多。
如果synchronized关键字能满足用户的需求,就用synchronized,因为它能简化代码 。如果需要更高级的功能,就用ReentrantLock类,此时要注意及时释放锁,否则会出现死锁,通常在finally代码释放锁
(5)使用局部变量实现线程同步
Bank.java代码如下:
packagethreadTest;
/**
*@authorww
*
*/
publicclassBank{
privatestaticThreadLocal<Integer>count=newThreadLocal<Integer>(){
@Override
protectedIntegerinitialValue(){
//TODOAuto-generatedmethodstub
return0;
}
};
//存钱
publicvoidaddMoney(intmoney){
count.set(count.get()+money);
System.out.println(System.currentTimeMillis()+"存进:"+money);
}
//取钱
publicvoidsubMoney(intmoney){
if(count.get()-money<0){
System.out.println("余额不足");
return;
}
count.set(count.get()-money);
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}
//查询
publicvoidlookMoney(){
System.out.println("账户余额:"+count.get());
}
}
运行效果:
余额不足
账户余额:0
余额不足
账户余额:0
1441794247939存进:100
账户余额:100
余额不足
1441794248940存进:100
账户余额:0
账户余额:200
余额不足
账户余额:0
1441794249941存进:100
账户余额:300
看了运行效果,一开始一头雾水,怎么只让存,不让取啊?看看ThreadLocal的原理:
如果使用ThreadLocal管理变量,则每一个使用该变量的线程都获得该变量的副本,副本之间相互独立,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。现在明白了吧,原来每个线程运行的都是一个副本,也就是说存钱和取钱是两个账户,知识名字相同而已。所以就会发生上面的效果。
ThreadLocal与同步机制
a.ThreadLocal与同步机制都是为了解决多线程中相同变量的访问冲突问题
b.前者采用以"空间换时间"的方法,后者采用以"时间换空间"的方式
2. java多线程访问数据库怎么优化啊,并发很大
个人觉得高写入并发的话先用缓存缓冲一下,可以合并的写入合并成批量写入可以管一些用但终归写入量很大的话还是要在数据库端优化了,把并发写均衡到多台服务器上,应该没有别的办法了。如果瓶颈不再数据库那就是应用服务器处理能力不足,升级应用服务器。
3. Java实现通用线程池
线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码
//ThreadTask java
package polarman threadpool;
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run();
}
//PooledThread java
package polarman threadpool;
import java util Collection; import java util Vector;
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector();
protected boolean running = false;
protected boolean stopped = false;
protected boolean paused = false;
protected boolean killed = false;
private ThreadPool pool;
public PooledThread(ThreadPool pool) { this pool = pool;
}
public void putTask(ThreadTask task) { tasks add(task);
}
public void putTasks(ThreadTask[] tasks) { for(int i= ; i<tasks length; i++) this tasks add(tasks[i]);
}
public void putTasks(Collection tasks) { this tasks addAll(tasks);
}
protected ThreadTask popTask() { if(tasks size() > ) return (ThreadTask)tasks remove( );
else
return null;
}
public boolean isRunning() {
return running;
}
public void stopTasks() {
stopped = true;
}
public void stopTasksSync() {
stopTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void pauseTasks() {
paused = true;
}
public void pauseTasksSync() {
pauseTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void kill() { if(!running)
interrupt();
else
killed = true;
}
public void killSync() {
kill();
while(isAlive()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks() {
running = true;
this notify();
}
public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空闲 ); this wait(); }else {
ThreadTask task;
while((task = popTask()) != null) { task run(); if(stopped) {
stopped = false;
if(tasks size() > ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );
break;
}
}
if(paused) {
paused = false;
if(tasks size() > ) { System out println(Thread currentThread() getId() + : Tasks are paused );
break;
}
}
}
running = false;
}
if(killed) {
killed = false;
break;
}
}
}catch(InterruptedException e) {
return;
}
//System out println(Thread currentThread() getId() + : Killed );
}
}
//ThreadPool java
package polarman threadpool;
import java util Collection; import java util Iterator; import java util Vector;
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize;
protected int initPoolSize;
protected Vector threads = new Vector();
protected boolean initialized = false;
protected boolean hasIdleThread = false;
public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;
}
public void init() {
initialized = true;
for(int i= ; i<initPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize);
}
public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize); this maxPoolSize = maxPoolSize;
if(maxPoolSize < getPoolSize())
setPoolSize(maxPoolSize);
}
/** *//**
*重设当前线程数
* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size) { if(!initialized) {
initPoolSize = size;
return;
}else if(size > getPoolSize()) { for(int i=getPoolSize(); i<size && i<maxPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
}else if(size < getPoolSize()) { while(getPoolSize() > size) { PooledThread th = (PooledThread)threads remove( ); th kill();
}
}
//System out println( 重设线程数 线程数= + threads size());
}
public int getPoolSize() { return threads size();
}
protected void notifyForIdleThread() {
hasIdleThread = true;
}
protected boolean waitForIdleThread() {
hasIdleThread = false;
while(!hasIdleThread && getPoolSize() >= maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {
return false;
}
}
return true;
}
public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())
return th;
}
if(getPoolSize() < maxPoolSize) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
return thread;
}
//System out println( 线程池已满 等待 );
if(waitForIdleThread() == false)
return null;
}
}
public void processTask(ThreadTask task) {
PooledThread th = getIdleThread();
if(th != null) { th putTask(task); th startTasks();
}
}
public void processTasksInSingleThread(ThreadTask[] tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
public void processTasksInSingleThread(Collection tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
}
下面是线程池的测试程序
//ThreadPoolTest java
import java io BufferedReader; import java io IOException; import java io InputStreamReader;
import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;
public class ThreadPoolTest {
public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 启动任务A 时长为 秒 ); System out println( size 设置当前线程池大小为 ); System out println( max 设置线程池最大线程数为 ); System out println();
final ThreadPool pool = new ThreadPool( ); pool init();
Thread cmdThread = new Thread() { public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System in));
while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) && words length >= ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( max ) && words length >= ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( task ) && words length >= ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {
}
}
} catch (IOException e) { e printStackTrace();
}
}
}
};
cmdThread start();
/**//*
for(int i= ; i< ; i++){
SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);
}*/
}
}
class SimpleTask implements ThreadTask {
private String taskName;
private int timeLen;
public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;
}
public void run() { System out println(Thread currentThread() getId() +
: START TASK + taskName + );
try { Thread sleep(timeLen); } catch (InterruptedException e) {
}
System out println(Thread currentThread() getId() +
: END TASK + taskName + );
}
}
使用此线程池相当简单 下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( ); pool init();
要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()
两行代码即可调用
lishixin/Article/program/Java/hx/201311/27203
4. javase线程怎么存储到容器
Java 并发重要知识点
java 线程池
ThreadPoolExecutor 类分析
ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
下面这些对创建非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。
ThreadPoolExecutor 3 个最重要的参数:
corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor其他常见参数 :
keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
unit : keepAliveTime 参数的时间单位。
threadFactory :executor 创建新线程的时候会用到。
handler :饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nzbqGRz9-1654600571133)(https://javaguide.cn/assets/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%90%84%E4%B8%AA%E5%8F%82%E6%95%B0%E4%B9%8B%E9%97%B4%E7%9A%84%E5%85%B3%E7%B3%BB.d65f3309.png)]
ThreadPoolExecutor 饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:
ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。
ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。
ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。
举个例子:
Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)
推荐使用 ThreadPoolExecutor 构造函数创建线程池
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下(后文会详细介绍到):
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
CachedThreadPool 和 ScheledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
方式一:通过ThreadPoolExecutor构造函数实现(推荐)通过构造方法实现
方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:
FixedThreadPool
SingleThreadExecutor
CachedThreadPool
对应 Executors 工具类中的方法如图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YGd4ygZu-1654600571136)(https://javaguide.cn/assets/Executors%E5%B7%A5%E5%85%B7%E7%B1%BB.4b0cbd16.png)]
正确配置线程池参数
说到如何给线程池配置参数,美团的骚操作至今让我难忘(后面会提到)!
我们先来看一下各种书籍和博客上一般推荐的配置线程池参数的方式,可以作为参考!
常规操作
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
美团的骚操作
美团技术团队在《Java线程池实现原理及其在美团业务中的实践》open in new window这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。
美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:
corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
为什么是这三个参数?
我在这篇《新手也能看懂的线程池学习总结》open in new window 中就说过这三个参数是 ThreadPoolExecutor 最重要的参数,它们基本决定了线程池对于任务的处理策略。
如何支持参数动态配置? 且看 ThreadPoolExecutor 提供的下面这些方法。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Sm89qdJZ-1654600571137)(https://javaguide.cn/assets/b6fd95a7-4c9d-4fc6-ad26-890adb3f6c4c.5ff332dc.png)]
格外需要注意的是corePoolSize, 程序运行期间的时候,我们调用 setCorePoolSize() 这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize,如果大于的话就会回收工作线程。
另外,你也看到了上面并没有动态指定队列长度的方法,美团的方式是自定义了一个叫做 的队列(主要就是把LinkedBlockingQueue的capacity 字段的final关键字修饰给去掉了,让它变为可变的)。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cmNN5yAL-1654600571138)(https://javaguide.cn/assets/19a0255a-6ef3-4835-98d1-a839d1983332.b334d1e9.png)]
还没看够?推荐 why神的[《如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。》open in new window](如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。 (qq.com))这篇文章,深度剖析,很不错哦!
Java 常见并发容器
JDK 提供的这些容器大部分在 java.util.concurrent 包中。
ConcurrentHashMap : 线程安全的 HashMap
CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector。
ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列。
BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。
ConcurrentHashMap
我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装我们的 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。
所以就有了 HashMap 的线程安全版本—— ConcurrentHashMap 的诞生。
在 ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。
CopyOnWriteArrayList
CopyOnWriteArrayList 简介
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable
在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。
这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?
CopyOnWriteArrayList 是如何做到的?
CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。
从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。
CopyOnWriteArrayList 读取和写入源码简单分析
CopyOnWriteArrayList 读取操作的实现
读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
final Object[] getArray() {
return array;
}
CopyOnWriteArrayList 写入操作的实现
CopyOnWriteArrayList 写入操作 add()方法在添加集合的时候加了锁,保证了同步,避免了多线程写的时候会 出多个副本出来。
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.Of(elements, len + 1);//拷贝新数组
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//释放锁
}
}
ConcurrentLinkedQueue
Java 提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。
从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构.ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。
ConcurrentLinkedQueue 内部代码我们就不分析了,大家知道 ConcurrentLinkedQueue 主要使用 CAS 非阻塞算法来实现线程安全就好了。
ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替代。
BlockingQueue
BlockingQueue 简介
上面我们己经提到了 ConcurrentLinkedQueue 作为高性能的非阻塞队列。下面我们要讲到的是阻塞队列——BlockingQueue。阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。下面是 BlockingQueue 的相关实现类:
BlockingQueue 的实现类
下面主要介绍一下 3 个常见的 BlockingQueue 的实现类:ArrayBlockingQueue、LinkedBlockingQueue 、PriorityBlockingQueue 。
ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。
public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable{}
ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue,可采用如下代码:
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
1
1
LinkedBlockingQueue
LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE 。
相关构造方法:
/**
*某种意义上的无界队列
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
*有界队列
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。
PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
推荐文章: 《解读 Java 并发队列 BlockingQueue》open in new window
ConcurrentSkipListMap
下面这部分内容参考了极客时间专栏《数据结构与算法之美》open in new window以及《实战 Java 高并发程序设计》。
为了引出 ConcurrentSkipListMap,先带着大家简单理解一下跳表。
对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。
跳表的本质是同时维护了多个链表,并且链表是分层的,
2级索引跳表
最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。
跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。
在跳表中查找元素18
查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。
从上面很容易看出,跳表是一种利用空间换时间的算法。
使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap。
5. Java多线程为什么使用线程池
1:提高效率 创建好一定数量的线程放在池中,等需要使用的时候就从池中拿一个,这要比需要的时候创建一个线程对象要快的多。
2:方便管理 可以编写线程池管理代码对池中的线程统一进行管理,比如说系统启动时由该程序创建100个线程,每当有请求的时候,就分配一个线程去工作, 如果刚好并发有101个请求,那多出的这一个请求可以排队等候,避免因无休止的创建线程导致系统崩溃
6. java中线程池如何管理多个线程
ExecutorService threadPoll = Executors.newCachedThreadPool(); /陵冲/创建线程池
threadPoll.execute(线程1);//执行线程一
线程池根据程序需求创建新线程乱简的,需求多时,创建的就多,需求少时,JVM自己会慢慢的释放掉多余的线哗汪裤程
不需求程序员去做什么,JVM自己会处理,程序员调用就行了..
7. Java 中几种常用的线程池
一:newCachedThreadPool
(1)缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse,如果没有,就建立一个新的线程加入池中;
(2)缓存型池子,通常用于执行一些生存周期很短的异步型任务;因此一些面向连接的daemon型server中用得不多;
(3)能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
(4)注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止
二:newFixedThreadPool
(1)newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
(2)其独特之处胡清贺:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
(3)和正歼cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很裤派稳定很固定的正规并发线程,多用于服务器
(4)从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
三:ScheledThreadPool
(1)调度型线程池
(2)这个池子里的线程可以按schele依次delay执行,或周期执行
四:SingleThreadExecutor
(1)单例线程,任意时间池中只能有一个线程
(2)用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
8. java线程池newCachedThreadPool怎么使用
你设定了10个线程,就可以跑10个线程,多于10个的时候会启用默认策略
9. 【Java基础】线程池的原理是什么
什么是线程池?
总归为:池化技术 ---》数据库连接池 缓存架构 缓存池 线程池 内存池,连接池,这种思想演变成缓存架构技术---> JDK设计思想有千丝万缕的联系
首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。
Java 中的 ThreadPoolExecutor 类
java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类,因此如果要透彻地了解Java 中的线程池,必须先了解这个类。下面我们来看一下 ThreadPoolExecutor 类的具体实现源码。
在 ThreadPoolExecutor 类中提供了四个构造方法:
10. java 多线程并发请求数据,只要有一条线程获得数据,则其他线程终止运行,并打印出获得的数据
这边我写了一个例子,两个线程同时获取随机数,当获取的值为68的时候则停止所有进程。