什么是原子类?有何用
不可分割
一个操作是不可中断的,即使并发下
juc.atomic包下,都是原子特性的
原子类和锁都是为了保证并发下的线程安全,不过原子类相比于锁,有一定的优势:
***粒度更细,将竞争的粒度缩小到变量级别。
***效率高:在竞争不是很高的情况下:原子类的效率往往比锁的效率高。
第二个优势怎么说,。为何原子类在高度竞争的时候,效率会降低
因为原子操作利用了自旋锁和CAS算法,当并发高的时候
发生冲突的情况会大大增加 (也就是存在大量更新时去比较预期的值发生了变化,导致此次更新失效的情况),因此效率会大大降低
6类原子类
Atomic*基本原子类
有AtomicInteger AtomicLong AtomicBoolean
以AtomicInteger为例
以CAS技术保障原子性
public final int get()//获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement()//获取当前的值,并自减
public fianl int getAndAdd(int delta)//获取当前的值,并加上预期的值
boolean compareAndSet(int expect,int update)//如果当前的值等于预期值,则以原子的方式设置为输入的的update值,典型的CAS技术
演示
/** | |
* @Author:Joseph | |
* 演示AtomicInteger的基本用法 对比非原子类的线程安全问题 | |
* 使用了原子类之后不需要加锁,也能保障线程安全 | |
*/ | |
public class AtomicIntegerDemo1 implements Runnable{ | |
private static final AtomicInteger atomicInteger = new AtomicInteger(); | |
public void incrementAtomic(){ | |
// atomicInteger.getAndDecrement(); | |
// atomicInteger.getAndIncrement() | |
atomicInteger.getAndAdd(2); | |
} | |
private static volatile int basicCount = 0; | |
public void incremeBasic(){ | |
basicCount++; | |
} | |
| |
public static void main(String[] args) throws InterruptedException { | |
AtomicIntegerDemo1 r = new AtomicIntegerDemo1(); | |
Thread thread1 = new Thread(r); | |
Thread thread2 = new Thread(r); | |
thread1.start(); | |
thread2.start(); | |
thread1.join(); | |
thread2.join(); | |
System.out.println("原子类的结果:"+atomicInteger.getAcquire()); | |
System.out.println("普通变量的结果:"+basicCount); | |
| |
} | |
public void run() { | |
for (int i = 0; i < 10000; i++) { | |
incremeBasic(); | |
incrementAtomic(); | |
} | |
} | |
} |
Atomic*Array数组类型原子类
场景:比如做账系统,财务管理,并发修改,数字特别多,就可以通过AtomicArray来保障
例子
/** | |
* @Author:Joseph | |
* 演示原子数组的使用方法 | |
*/ | |
public class AtomicArrayDemo { | |
| |
public static void main(String[] args) throws InterruptedException { | |
| |
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000); | |
Incrementer incrementer = new Incrementer(atomicIntegerArray); | |
Decrementer decrementer = new Decrementer(atomicIntegerArray); | |
Thread[] threadsIncrementer = new Thread[100]; | |
Thread[] threadsDecrementer = new Thread[100]; | |
for (int i = 0; i < 100; i++) { | |
threadsDecrementer[i] = new Thread(decrementer); | |
threadsIncrementer[i] = new Thread(incrementer); | |
threadsDecrementer[i].start(); | |
threadsIncrementer[i].start(); | |
} | |
for (int i = 0; i < 100; i++) { | |
threadsDecrementer[i].join(); | |
threadsIncrementer[i].join(); | |
} | |
| |
for (int i = 0; i < atomicIntegerArray.length(); i++) { | |
if(atomicIntegerArray.get(i)!=0){ | |
System.out.println("发现了错误:"+i); | |
} | |
} | |
System.out.println("运行结束"); | |
} | |
} | |
class Decrementer implements Runnable{ | |
| |
private AtomicIntegerArray array; | |
| |
public Decrementer(AtomicIntegerArray array) { | |
this.array = array; | |
} | |
| |
public void run() { | |
for (int i = 0; i < array.length(); i++) { | |
array.getAndDecrement(i); | |
} | |
} | |
} | |
class Incrementer implements Runnable{ | |
| |
private AtomicIntegerArray array; | |
| |
public Incrementer(AtomicIntegerArray array) { | |
this.array = array; | |
} | |
| |
public void run() { | |
for (int i = 0; i < array.length(); i++) { | |
array.getAndIncrement(i); | |
} | |
} | |
} |
Atomic*Reference引用类型原子类
比如我在锁那篇文章讲自旋锁的时候,就用到了引用类型原子类AtomicReference
public class SpinLock { | |
| |
private AtomicReference<Thread> sign = new AtomicReference<>(); | |
| |
public void lock(){ | |
//获取当前线程的引用 | |
Thread current = Thread.currentThread(); | |
//通过compareAndSet命令,只有当线程为null,才设置当前线程,达到加锁目的 | |
while (!sign.compareAndSet(null,current)){ | |
System.out.println("自旋获取失败,再次尝试"); | |
} | |
} | |
| |
public void unlock(){ | |
Thread current = Thread.currentThread(); | |
//解锁的时候,会判断有线程,才会设置为null,从而实现解锁 | |
sign.compareAndSet(current,null); | |
} | |
| |
public static void main(String[] args) throws InterruptedException { | |
SpinLock spinLock = new SpinLock(); | |
Runnable runnable = ()->{ | |
System.out.println(Thread.currentThread().getName()+"尝试获取自旋锁"); | |
spinLock.lock(); | |
System.out.println(Thread.currentThread().getName()+"获取到了自旋锁"); | |
try { | |
Thread.sleep(300); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
}finally { | |
spinLock.unlock(); | |
System.out.println(Thread.currentThread().getName()+"释放自旋锁"); | |
} | |
}; | |
Thread thread1 = new Thread(runnable, "线程1"); | |
Thread thread2 = new Thread(runnable, "线程2"); | |
thread1.start(); | |
thread2.start(); | |
| |
} | |
| |
} |
看下它内部的逻辑
可以看到没有像AtomicInteger那么多的API,主要就是这个compareAndSet方法
这里的注释,大致意思就是如果预期值和旧的值一样的话,就更新新的值,标准的CAS操作,
既然讲到了CAS操作,我的并发专栏里也专门讲到了这个,本质是通过cpu的原子指令,来完成比较再set的操作,
这里我通过synchronized来完成目的,这个synchronized是模拟原子性的,cpu执行这个的时候,指令是原子性,不可分割
public class EqualCAS { | |
private int value; | |
public synchronized boolean compareAndSet(int expectedValue,int newValue){ | |
if(value == expectedValue){ | |
value = newValue; | |
return true; | |
} | |
return false; | |
} | |
} |
普通变量升级具有原子功能
场景是这样的,如果一个类被定义好了,但是普通变量不具备原子性的,且类已经被定义好,不方便再进行改变,
就可通过AtomicIntegerFiledUpdater
比如一个业务。很少需要原子的get-set操作,就可以这样升级,而不用每个类都嵌入一个原子类型。减少资源占用,
用法,代码演示:
public class AtomicIntegerFieldUpdaterDemo implements Runnable { | |
| |
static Candidate tom; | |
static Candidate peter; | |
| |
public static AtomicIntegerFieldUpdater<Candidate> socreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score"); | |
| |
@Override | |
public void run() { | |
for (int i = 0; i < 10000; i++) { | |
peter.score++; | |
socreUpdater.getAndIncrement(tom); | |
} | |
} | |
| |
public static class Candidate{ | |
volatile int score; | |
} | |
| |
public static void main(String[] args) throws InterruptedException { | |
tom = new Candidate(); | |
peter = new Candidate(); | |
AtomicIntegerFieldUpdaterDemo r = new AtomicIntegerFieldUpdaterDemo(); | |
Thread thread = new Thread(r); | |
thread.start(); | |
Thread thread3 = new Thread(r); | |
thread3.start(); | |
thread.join(); | |
thread3.join(); | |
System.out.println("普通变量:"+peter.score); | |
System.out.println("升级变量:"+tom.score); | |
} | |
} |
注意点
修饰的变量必须是可见的,也就是不能用private修饰
也不能是static修饰,因为static在初始化的时候已经定义到方法区了
这个类使用的时候需要使用类名,filed,这个和反射很相似,对,他背后也是通过反射来做的,一定要注意这两点
非常重要的Adder累加器
java8引入的,相对较新的一个类,
高并发下,LongAdder比AtmoicLong效率高,不过本质是空间换时间
竞争激烈的时候,Long 改,降低了冲突的概率,是多段锁的理念,提高了并发性
这里演示一个例子,目的为了展示Atomic在多线程下,性能会有瓶颈,每一次加法,都要flush和refresh
** | |
* | :Joseph|
* 演示高并发场景下,LongAdder比AtomicLong性能好 | |
*/ | |
public class AtomicLongDemo { | |
public static void main(String[] args) throws InterruptedException { | |
AtomicLong counter = new AtomicLong(0); | |
long start = System.currentTimeMillis(); | |
ExecutorService service = Executors.newFixedThreadPool(20); | |
for (int i = 0; i < 10000; i++) { | |
service.submit(new Task(counter)); | |
} | |
service.shutdown(); | |
while (!service.isTerminated()){ | |
} | |
long end = System.currentTimeMillis(); | |
System.out.println(counter.get()); | |
System.out.println("AtomicLong耗时"+(end - start)); | |
} | |
| |
private static class Task implements Runnable{ | |
private AtomicLong counter; | |
| |
public Task(AtomicLong counter) { | |
this.counter = counter; | |
} | |
| |
public void run() { | |
for (int i = 0; i < 10000; i++) { | |
counter.incrementAndGet(); | |
} | |
} | |
} | |
} | |
/** | |
* @Author:Joseph | |
* 演示高并发场景下,LongAdder比AtomicLong性能好 | |
*/ | |
public class LongAdderDemo { | |
public static void main(String[] args) throws InterruptedException { | |
LongAdder counter = new LongAdder(); | |
long start = System.currentTimeMillis(); | |
ExecutorService service = Executors.newFixedThreadPool(20); | |
for (int i = 0; i < 10000; i++) { | |
service.submit(new Task(counter)); | |
} | |
service.shutdown(); | |
while (!service.isTerminated()){ | |
} | |
long end = System.currentTimeMillis(); | |
System.out.println(counter.sum()); | |
System.out.println("LongAdder耗时"+(end - start)); | |
} | |
| |
private static class Task implements Runnable{ | |
private LongAdder counter; | |
| |
public Task(LongAdder counter) { | |
this.counter = counter; | |
} | |
| |
public void run() { | |
for (int i = 0; i < 10000; i++) { | |
counter.increment(); | |
} | |
} | |
} | |
} | |
|
自己运行一下
看到了吧,差这么多,就是因为AtomicLong每次加法都要flush和refresh
什么是flush和refresh???
这个我在JMM那个文章讲过
Atomic在每次加的时候要通过flush到主存,然后其他线程refresh到工作内存,这就浪费了性能
而LongAdder采用了分段锁的思想,并发量高的时候,每个线程在自己的cell中加数据,最后再汇总
它是这样搞的:
LongAdder有两个变量,base变量作为正常的累加值,并发量不高的时候直接在这里加
还有一个cell【】数组,并发搞的时候会把线程分散累加到自己的槽cell【i】中,通过hash值,为每个线程分配一个cell,用空间换时间的思想
sum源码分析
as为null,就直接return sum。不为null。就遍历数组加到sum再返回
这个过程是不加锁的,所有就会有安全问题,累加的时候,可能被更改,造成不准确的问题,
这样是LongAdder的 一个不好的地方。
场景
Accumlator累加器
Accumlator与Adder类似,就是一个更通用版本的Adder
public class LongAccumlatorDemo { | |
public static void main(String[] args) { | |
//x+y可以换成x*y,max(x,y)等等,就很方便 | |
LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 100); | |
ExecutorService service = Executors.newFixedThreadPool(8); | |
IntStream.range(1,10).forEach(i->accumulator.accumulate(i)); | |
service.shutdown(); | |
while (!service.isTerminated()){} | |
| |
System.out.println(accumulator.getThenReset()); | |
} | |
} |
适合场景:
适合大量的计算,需要通过并行方式提高效率的计算
但是要注意线程的先后不影响原先逻辑的情况下