Java并发编程 – CopyOnWrite容器类

Java
249
0
0
2023-06-18
标签   Java并发

前言

当我们对List进行遍历的时候,如果list被修改了会抛出java.util.ConcurrentModificationException错误。那么有没有办法在遍历一个list的时候,还向list中添加元素呢?办法是有的,我们可以使用java.util.concurrent包中的CopyOnWriteArrayList。

CopyOnWrite容器

CopyOnWrite容器即写时复制的容器。通俗的理解是 当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。 这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单等场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。

CopyOnWrite的缺点

CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。

1.内存占用问题 。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。

针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。

2.数据一致性问题 。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。 【当执行add或remove操作没完成时,get获取的仍然是旧数组的元素】

CopyOnWriteArrayList

CopyOnWriteArrayList是ArrayList的线程安全版本,从他的名字可以推测,CopyOnWriteArrayList是在有写操作的时候会copy一份数据,然后写完再设置成新的数据。CopyOnWriteArrayList适用于读多写少的并发场景,CopyOnWriteArraySet是线程安全版本的Set实现,它的内部通过一个CopyOnWriteArrayList来代理读写等操作,使得CopyOnWriteArraySet表现出了和CopyOnWriteArrayList一致的并发行为,他们的区别在于数据结构模型的不同,set不允许多个相同的元素插入容器中,具体的细节将在下文中分析。

CopyOnWriteArrayList类图

上面的图片展示你了CopyOnWriteArrayList的类图,可以看到它实现了List接口,如果去看ArrayList的类图的话,可以发现也是实现了List接口,也就得出一句废话,ArrayList提供的api,CopyOnWriteArrayList也提供,下文中来分析CopyOnWriteArrayList是如何来做到线程安全的实现读写数据的,而且也会顺便对比ArrayList的等效实现为什么不支持线程安全的。下面首先展示了CopyOnWriteArrayList中比较重要的成员:

 /** The lock protecting all mutators */    final transient ReentrantLock lock = new ReentrantLock();
 /** The array, accessed only via getArray/setArray. */    private transient volatile Object[] array; 

可以看到,CopyOnWriteArrayList使用了ReentrantLock来支持并发操作,array就是实际存放数据的数组对象。ReentrantLock是一种支持重入的独占锁,任意时刻只允许一个线程获得锁,所以可以安全的并发去写数组,关于java中锁的细节,可以参考文章Java可重入锁详解()。接下来看一下CopyOnWriteArrayList是如何使用这个lock来实现并发写的,下面首先展示了add方法的代码:

 public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); //上锁,只允许一个线程进入
    try {
        Object[] elements = getArray(); // 获得当前数组对象
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len +);//拷贝到一个新的数组中
        newElements[len] = e;//插入数据元素
        setArray(newElements);//将新的数组对象设置回去
        return true;
    } finally {
        lock.unlock();//释放锁
    }
} 

为了对比ArrayList,下面展示了ArrayList中的add方法的细节:

 public boolean add(E e) {
    ensureCapacityInternal(size +);  // Increments modCount!!
    elementData[size++] = e;
    return true;
}
 private void ensureCapacityInternal(int minCapacity) {
    if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
        minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
    }
     ensureExplicitCapacity(minCapacity);
}

private void ensureExplicitCapacity(int minCapacity) {
    modCount++;
     // overflow-conscious code
    if (minCapacity - elementData.length >)
        grow(minCapacity);
}

private void grow(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = elementData.length;
    int newCapacity = oldCapacity + (oldCapacity >>);
    if (newCapacity - minCapacity <)
        newCapacity = minCapacity;
    if (newCapacity - MAX_ARRAY_SIZE >)
        newCapacity = hugeCapacity(minCapacity);
    // minCapacity is usually close to size, so this is a win:
    elementData = Arrays.copyOf(elementData, newCapacity);
}         

相比CopyOnWriteArrayList,ArrayList的add方法实现就显得啰嗦的多,而且ArrayList并不支持线程安全,至于为什么不支持线程安全,看代码就知道了,这几个调用的方法中都没有类似锁(与锁等效语义的组件)出现。下面再来看另一个版本的add方法:

 public void add(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        if (index > len || index <)
            throw new IndexOutOfBoundsException("Index: "+index+
                                                ", Size: "+len);
        Object[] newElements;
        int numMoved = len - index;
        if (numMoved ==)
            newElements = Arrays.copyOf(elements, len +);
        else {
            newElements = new Object[len +];
            System.arraycopy(elements,, newElements, 0, index);
            System.arraycopy(elements, index, newElements, index +,
                             numMoved);
        }
        newElements[index] = element;
        setArray(newElements);
    } finally {
        lock.unlock();
    }
} 

在操作之前都是先lock住的,这里面有一个有意思的地方,因为该方法可以指定index来插入value,如果这个index位置上已经有旧值,那么该方法的作用类似replace,如果该index为当前数组的长度,那么该方法和上面分析的add方法等效,现在分析一下index位置上已经有值的情况,会分为两段copy,然后在中间设置新值。现在来分析一下读操作,下面是get方法的细节:

 public E get(int index) {
    return get(getArray(), index);
}
 private E get(Object[] a, int index) {
    return (E) a[index];
} 

可以发现是非常简单的,而且读是允许多个线程进入的。下面来分析一下CopyOnWriteArrayList提供的迭代器。下面是两个重要的变量:

 /** Snapshot of the array */        private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next.  */        private int cursor; 

遍历的时候 首先会获得当前数组对象的一个拷贝,称为快照 ,然后遍历的操作会在该快照上进行,那如果获取了迭代器之后再对CopyOnWriteArrayList进行写操作会怎么样?迭代器能感知到这种变化吗?下面实际实验一下:

 CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
 copyOnWriteArrayList.add("first");
copyOnWriteArrayList.add("second");
 Iterator<String> iterator = copyOnWriteArrayList.iterator();
 copyOnWriteArrayList.add("third");
 while (iterator.hasNext()) {
    System.out.println(iterator.next());
}

//output:      
first
second 

结果是不能感知,也就是说,这个快照并不会和外界有任何联系,某个线程在获取迭代器的时候就会拷贝一份,或者说,每一个线程都将获得当前时刻的一个快照,所以不需要加锁就可以安全的实现遍历,下面的代码也证实了上面的说法:

 public Iterator<E> iterator() {
    return new COWIterator<E>(getArray(),);
} 

CopyOnWriteArraySet

CopyOnWriteArraySet使用一个CopyOnWriteArrayList来做代理,它的所有api都是依赖于CopyOnWriteArrayList来实现的,下面的代码也展示了这种代理的事实:

 private final CopyOnWriteArrayList<E> al;
 /**
 * Creates an empty set.
 */    public CopyOnWriteArraySet() {
    al = new CopyOnWriteArrayList<E>();
} 

下面来分析一下CopyOnWriteArraySet的写操作实现,比如add方法:

 public boolean add(E e) {
    return al.addIfAbsent(e);
}

public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray();
    return indexOf(e, snapshot,, snapshot.length) >= 0 ? false :
        addIfAbsent(e, snapshot);
}

private boolean addIfAbsent(E e, Object[] snapshot) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] current = getArray();
        int len = current.length;
        if (snapshot != current) {
            // Optimize for lost race to another addXXX operation
            int common = Math.min(snapshot.length, len);
            for (int i =; i < common; i++)
                if (current[i] != snapshot[i] && eq(e, current[i]))
                    return false;
            if (indexOf(e, current, common, len) >=)
                    return false;
        }
        Object[] newElements = Arrays.copyOf(current, len +);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}     

set是一种不允许有重复元素的简单数据结构,所以和CopyOnWriteArrayList不同,CopyOnWriteArraySet需要add在插入新元素的时候多做一些判断,而CopyOnWriteArraySet在实现上使用了CopyOnWriteArrayList的addIfAbsent方法,这个方法的意思就是如果存在就不再插入,如果不存在再进行插入。

使用案例

例子1(多线程操作CopyOnWriteArrayList):

 import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListTest { public static void main(String[] args) throws InterruptedException {
    final CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<String>();
    Thread t = new Thread(new Runnable() {
        int count =;
         @Override
        public void run() {
            while (true) {
                try {
                    Thread.currentThread().sleep();
                }catch (InterruptedException e){
                 }
                copyOnWriteArrayList.add(count++ + "");
            }
        }
    },"thread-write");
     t.start();
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            while(true){
                try {
                    Thread.currentThread().sleep();
                } catch (InterruptedException e) {
                }
                Iterator<String> iterator = copyOnWriteArrayList.iterator();
                List<String> temp = new ArrayList<>();
                while (iterator.hasNext()){
                    temp.add(iterator.next());
                }
                System.out.println(Thread.currentThread().getName() + ", list的内容:" + temp);
            }
        }
    },"thread-read");
    t.start();
  }
} 

输出结果:

 thread-read, list的内容:[, 1, 2, 3]
thread-read, list的内容:[, 1, 2, 3, 4, 5, 6, 7, 8]
thread-read, list的内容:[, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
thread-read, list的内容:[, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
thread-read, list的内容:[, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23]
thread-read, list的内容:[, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28] 

例子2(创建CopyOnWriteMap类和用它支持黑名单服务):

import java.util.Collection;
import java.util.Map;
import java.util.Set;
 
public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable { private volatile Map<K, V> internalMap;
    public CopyOnWriteMap() 
    {
        internalMap = new HashMap<K, V>();
    }
    public V put(K key, V value) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            V val = newMap.put(key, value);
            internalMap = newMap;
            return val;
        }
    }
    public V get(Object key) {
        return internalMap.get(key);
    }
    public void putAll(Map<? extends K, ? extends V> newData) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            newMap.putAll(newData);
            internalMap = newMap;
        }
    }
}
    
public class BlackListServiceImpl {   
    private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>();
    public static boolean isBlackList(String id) {
        return blackListMap.get(id) == null ? false : true;
    }
    public static void addBlackList(String id) {
        blackListMap.put(id, Boolean.TRUE);
    }
    /**
    * 批量添加黑名单
    *
    * @param ids
    */    
    public static void addBlackList(Map<String,Boolean> ids) {
        blackListMap.putAll(ids);
    }

} 

———————————————————-