吃透并发编程之—-Atomic原子类学习思考

Java
229
0
0
2024-01-21
标签   Java并发

什么是原子类?有何用

不可分割

一个操作是不可中断的,即使并发下

juc.atomic包下,都是原子特性的

原子类和锁都是为了保证并发下的线程安全,不过原子类相比于锁,有一定的优势:

***粒度更细,将竞争的粒度缩小到变量级别。

***效率高:在竞争不是很高的情况下:原子类的效率往往比锁的效率高。

第二个优势怎么说,。为何原子类在高度竞争的时候,效率会降低

因为原子操作利用了自旋锁和CAS算法,当并发高的时候

发生冲突的情况会大大增加 (也就是存在大量更新时去比较预期的值发生了变化,导致此次更新失效的情况),因此效率会大大降低

6类原子类

image-20230905201927512

Atomic*基本原子类

有AtomicInteger AtomicLong AtomicBoolean

以AtomicInteger为例

以CAS技术保障原子性

public final int get()//获取当前的值

public final int getAndSet(int newValue)//获取当前的值,并设置新的值

public final int getAndIncrement()//获取当前的值,并自增

public final int getAndDecrement()//获取当前的值,并自减

public fianl int getAndAdd(int delta)//获取当前的值,并加上预期的值

boolean compareAndSet(int expect,int update)//如果当前的值等于预期值,则以原子的方式设置为输入的的update值,典型的CAS技术

演示

/**
 * @Author:Joseph
 * 演示AtomicInteger的基本用法 对比非原子类的线程安全问题
 * 使用了原子类之后不需要加锁,也能保障线程安全
 */
public class AtomicIntegerDemo1 implements Runnable{
    private static final AtomicInteger atomicInteger = new AtomicInteger();
    public void incrementAtomic(){
//        atomicInteger.getAndDecrement();
//        atomicInteger.getAndIncrement()
        atomicInteger.getAndAdd(2);
    }
    private static volatile int basicCount = 0;
    public void incremeBasic(){
        basicCount++;
    }
​
    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerDemo1 r = new AtomicIntegerDemo1();
        Thread thread1 = new Thread(r);
        Thread thread2 = new Thread(r);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("原子类的结果:"+atomicInteger.getAcquire());
        System.out.println("普通变量的结果:"+basicCount);
​
    }
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            incremeBasic();
            incrementAtomic();
        }
    }
}

Atomic*Array数组类型原子类

场景:比如做账系统,财务管理,并发修改,数字特别多,就可以通过AtomicArray来保障

例子

/**
 * @Author:Joseph
 * 演示原子数组的使用方法
 */
public class AtomicArrayDemo {
​
    public static void main(String[] args) throws InterruptedException {
​
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
        Incrementer incrementer = new Incrementer(atomicIntegerArray);
        Decrementer decrementer = new Decrementer(atomicIntegerArray);
        Thread[] threadsIncrementer = new Thread[100];
        Thread[] threadsDecrementer = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threadsDecrementer[i] = new Thread(decrementer);
            threadsIncrementer[i] = new Thread(incrementer);
            threadsDecrementer[i].start();
            threadsIncrementer[i].start();
        }
        for (int i = 0; i < 100; i++) {
            threadsDecrementer[i].join();
            threadsIncrementer[i].join();
        }
​
        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            if(atomicIntegerArray.get(i)!=0){
                System.out.println("发现了错误:"+i);
            }
        }
        System.out.println("运行结束");
    }
}
class Decrementer implements Runnable{
​
    private  AtomicIntegerArray array;
​
    public Decrementer(AtomicIntegerArray array) {
        this.array = array;
    }
​
    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            array.getAndDecrement(i);
        }
    }
}
class Incrementer implements Runnable{
​
    private  AtomicIntegerArray array;
​
    public Incrementer(AtomicIntegerArray array) {
        this.array = array;
    }
​
    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            array.getAndIncrement(i);
        }
    }
}

Atomic*Reference引用类型原子类

比如我在锁那篇文章讲自旋锁的时候,就用到了引用类型原子类AtomicReference

public class SpinLock {
​
    private AtomicReference<Thread> sign = new AtomicReference<>();
​
    public void lock(){
        //获取当前线程的引用
        Thread current = Thread.currentThread();
        //通过compareAndSet命令,只有当线程为null,才设置当前线程,达到加锁目的
        while (!sign.compareAndSet(null,current)){
            System.out.println("自旋获取失败,再次尝试");
        }
    }
​
    public void unlock(){
        Thread current = Thread.currentThread();
        //解锁的时候,会判断有线程,才会设置为null,从而实现解锁 
        sign.compareAndSet(current,null);
    }
​
    public static void main(String[] args) throws InterruptedException {
        SpinLock spinLock = new SpinLock();
        Runnable runnable = ()->{
            System.out.println(Thread.currentThread().getName()+"尝试获取自旋锁");
            spinLock.lock();
            System.out.println(Thread.currentThread().getName()+"获取到了自旋锁");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                spinLock.unlock();
                System.out.println(Thread.currentThread().getName()+"释放自旋锁");
            }
        };
        Thread thread1 = new Thread(runnable, "线程1");
        Thread thread2 = new Thread(runnable, "线程2");
        thread1.start();
        thread2.start();
​
    }
​
}

看下它内部的逻辑

image-20230905210434970

可以看到没有像AtomicInteger那么多的API,主要就是这个compareAndSet方法

image-20230905210557294

这里的注释,大致意思就是如果预期值和旧的值一样的话,就更新新的值,标准的CAS操作,

既然讲到了CAS操作,我的并发专栏里也专门讲到了这个,本质是通过cpu的原子指令,来完成比较再set的操作,

这里我通过synchronized来完成目的,这个synchronized是模拟原子性的,cpu执行这个的时候,指令是原子性,不可分割

public class EqualCAS {
    private int value;
    
    public synchronized  boolean compareAndSet(int expectedValue,int newValue){
        if(value == expectedValue){
            value = newValue;
            return true;
        }
        return false;
    }
}

普通变量升级具有原子功能

场景是这样的,如果一个类被定义好了,但是普通变量不具备原子性的,且类已经被定义好,不方便再进行改变,

就可通过AtomicIntegerFiledUpdater

比如一个业务。很少需要原子的get-set操作,就可以这样升级,而不用每个类都嵌入一个原子类型。减少资源占用,

用法,代码演示:

public class AtomicIntegerFieldUpdaterDemo implements Runnable {
​
    static Candidate tom;
    static Candidate peter;
​
    public static AtomicIntegerFieldUpdater<Candidate> socreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
​
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            peter.score++;
            socreUpdater.getAndIncrement(tom);
        }
    }
​
    public static class Candidate{
        volatile  int score;
    }
​
    public static void main(String[] args) throws InterruptedException {
        tom = new Candidate();
        peter = new Candidate();
        AtomicIntegerFieldUpdaterDemo r = new AtomicIntegerFieldUpdaterDemo();
        Thread thread = new Thread(r);
        thread.start();
        Thread thread3 = new Thread(r);
        thread3.start();
        thread.join();
        thread3.join();
        System.out.println("普通变量:"+peter.score);
        System.out.println("升级变量:"+tom.score);
    }
}

注意点

修饰的变量必须是可见的,也就是不能用private修饰

也不能是static修饰,因为static在初始化的时候已经定义到方法区了

这个类使用的时候需要使用类名,filed,这个和反射很相似,对,他背后也是通过反射来做的,一定要注意这两点

非常重要的Adder累加器

java8引入的,相对较新的一个类,

高并发下,LongAdder比AtmoicLong效率高,不过本质是空间换时间

竞争激烈的时候,Long 改,降低了冲突的概率,是多段锁的理念,提高了并发性

这里演示一个例子,目的为了展示Atomic在多线程下,性能会有瓶颈,每一次加法,都要flush和refresh

**
 * @Author:Joseph
 *  演示高并发场景下,LongAdder比AtomicLong性能好
 */
public class AtomicLongDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicLong counter = new AtomicLong(0);
        long start = System.currentTimeMillis();
        ExecutorService service = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()){
        }
        long end = System.currentTimeMillis();
        System.out.println(counter.get());
        System.out.println("AtomicLong耗时"+(end - start));
    }
​
    private static class Task implements Runnable{
        private AtomicLong counter;
​
        public Task(AtomicLong counter) {
            this.counter = counter;
        }
​
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.incrementAndGet();
            }
        }
    }
}
/**
 * @Author:Joseph
 *  演示高并发场景下,LongAdder比AtomicLong性能好
 */
public class LongAdderDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder counter = new LongAdder();
        long start = System.currentTimeMillis();
        ExecutorService service = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()){
        }
        long end = System.currentTimeMillis();
        System.out.println(counter.sum());
        System.out.println("LongAdder耗时"+(end - start));
    }
​
    private static class Task implements Runnable{
        private LongAdder counter;
​
        public Task(LongAdder counter) {
            this.counter = counter;
        }
​
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        }
    }
}
​

自己运行一下

看到了吧,差这么多,就是因为AtomicLong每次加法都要flush和refresh

什么是flush和refresh???

这个我在JMM那个文章讲过

Atomic在每次加的时候要通过flush到主存,然后其他线程refresh到工作内存,这就浪费了性能

而LongAdder采用了分段锁的思想,并发量高的时候,每个线程在自己的cell中加数据,最后再汇总

它是这样搞的:

LongAdder有两个变量,base变量作为正常的累加值,并发量不高的时候直接在这里加

还有一个cell【】数组,并发搞的时候会把线程分散累加到自己的槽cell【i】中,通过hash值,为每个线程分配一个cell,用空间换时间的思想

sum源码分析

image-20230907192933896

as为null,就直接return sum。不为null。就遍历数组加到sum再返回

这个过程是不加锁的,所有就会有安全问题,累加的时候,可能被更改,造成不准确的问题,

这样是LongAdder的 一个不好的地方。

场景

image-20230907193128497

Accumlator累加器

Accumlator与Adder类似,就是一个更通用版本的Adder

public class LongAccumlatorDemo {
    public static void main(String[] args) {
        //x+y可以换成x*y,max(x,y)等等,就很方便
        LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 100);
        ExecutorService service = Executors.newFixedThreadPool(8);
        IntStream.range(1,10).forEach(i->accumulator.accumulate(i));
        service.shutdown();
        while (!service.isTerminated()){}
​
        System.out.println(accumulator.getThenReset());
    }
}

适合场景:

适合大量的计算,需要通过并行方式提高效率的计算

但是要注意线程的先后不影响原先逻辑的情况下