concurrent复合操作问题

xiaoxiao2021-02-28  111

ConcurrentHashMap通常只被看做并发效率更高的Map,用来替换其他线程安全的Map容器,比如Hashtable和Collections.synchronizedMap。实际上,线程安全的容器,特别是Map,应用场景没有想象中的多,很多情况下一个业务会涉及容器的多个操作,即复合操作,并发执行时,线程安全的容器只能保证自身的数据不被破坏,但无法保证业务的行为是否正确。 举个例子:统计文本中单词出现的次数,把单词出现的次数记录到一个Map中,代码如下: [java]  view plain  copy  print ? private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();      public long increase(String word) {       Long oldValue = wordCounts.get(word);       Long newValue = (oldValue == null) ? 1L : oldValue + 1;       wordCounts.put(word, newValue);       return newValue;   }   如果多个线程并发调用这个increase()方法,increase()的实现就是错误的,因为多个线程用相同的word调用时,很可能会覆盖相互的结果,造成记录的次数比实际出现的次数少。 除了用锁解决这个问题,另外一个选择是使用ConcurrentMap接口定义的方法: [java]  view plain  copy  print ? public interface ConcurrentMap<K, V> extends Map<K, V> {       V putIfAbsent(K key, V value);       boolean remove(Object key, Object value);       boolean replace(K key, V oldValue, V newValue);       V replace(K key, V value);   }   这是个被很多人忽略的接口,也经常见有人错误地使用这个接口。ConcurrentMap接口定义了几个基于 CAS(Compare and Set)操作,很简单,但非常有用,下面的代码用ConcurrentMap解决上面问题:

[java]  view plain  copy  print ? private final ConcurrentMap<String, Long> wordCounts = new ConcurrentHashMap<>();      public long increase(String word) {       Long oldValue, newValue;       while (true) {           oldValue = wordCounts.get(word);           if (oldValue == null) {               // Add the word firstly, initial the value as 1               newValue = 1L;               if (wordCounts.putIfAbsent(word, newValue) == null) {                   break;               }           } else {               newValue = oldValue + 1;               if (wordCounts.replace(word, oldValue, newValue)) {                   break;               }           }       }       return newValue;   }  

代码有点复杂,主要因为ConcurrentMap中不能保存value为null的值,所以得同时处理word不存在和已存在两种情况。

上面的实现每次调用都会涉及Long对象的拆箱和装箱操作,很明显,更好的实现方式是采用AtomicLong,下面是采用AtomicLong后的代码:

[java]  view plain  copy  print ? private final ConcurrentMap<String, AtomicLong> wordCounts = new ConcurrentHashMap<>();      public long increase(String word) {       AtomicLong number = wordCounts.get(word);       if (number == null) {           AtomicLong newNumber = new AtomicLong(0);           number = wordCounts.putIfAbsent(word, newNumber);           if (number == null) {               number = newNumber;           }       }       return number.incrementAndGet();   }  

这个实现仍然有一处需要说明的地方,如果多个线程同时增加一个目前还不存在的词,那么很可能会产生多个newNumber对象,但最终只有一个newNumber有用,其他的都会被扔掉。对于这个应用,这不算问题,创建AtomicLong的成本不高,而且只在添加不存在词是出现。但换个场景,比如缓存,那么这很可能就是问题了,因为缓存中的对象获取成本一般都比较高,而且通常缓存都会经常失效,那么避免重复创建对象就有价值了。下面的代码演示了怎么处理这种情况:

[java]  view plain  copy  print ? private final ConcurrentMap<String, Future<ExpensiveObj>> cache = new ConcurrentHashMap<>();      public ExpensiveObj get(final String key) {       Future<ExpensiveObj> future = cache.get(key);       if (future == null) {           Callable<ExpensiveObj> callable = new Callable<ExpensiveObj>() {               @Override               public ExpensiveObj call() throws Exception {                   return new ExpensiveObj(key);               }           };           FutureTask<ExpensiveObj> task = new FutureTask<>(callable);              future = cache.putIfAbsent(key, task);           if (future == null) {               future = task;               task.run();           }       }          try {           return future.get();       } catch (Exception e) {           cache.remove(key);           throw new RuntimeException(e);       }   }  

解决方法其实就是用一个Proxy对象来包装真正的对象,跟常见的lazy load原理类似;使用FutureTask主要是为了保证同步,避免一个Proxy创建多个对象。注意,上面代码里的异常处理是不准确的。

最后再补充一下,如果真要实现前面说的统计单词次数功能,最合适的方法是Guava包中AtomicLongMap;一般使用ConcurrentHashMap,也尽量使用Guava中的MapMaker或cache实现。

AtomicLongMap是Google Guava项目的一个类,它是线程安全、支持并发访问的。 Guava 是一个 Google 的基于java1.6的类库集合的扩展项目,包括 collections, caching, primitives support, concurrency libraries, common annotations, string processing, I/O, 等等

[java]  view plain  copy  print ? public class GuavaTest {       //来自于Google的Guava项目       AtomicLongMap<String> map = AtomicLongMap.create(); //线程安全,支持并发          Map<String, Integer> map2 = new HashMap<String, Integer>(); //线程不安全       ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //为map2增加并发锁          ConcurrentHashMap<String, Integer> map3 = new ConcurrentHashMap<String, Integer>(); //线程安全,但也要注意使用方式          private int taskCount = 100;       CountDownLatch latch = new CountDownLatch(taskCount); //新建倒计时计数器,设置state为taskCount变量值          public static void main(String[] args) {           GuavaTest t = new GuavaTest();           t.test();       }          private void test(){           //启动线程           for(int i=1; i<=taskCount; i++){               Thread t = new Thread(new MyTask("key"100));               t.start();           }              try {               //等待直到state值为0,再继续往下执行               latch.await();           } catch (InterruptedException e) {               e.printStackTrace();           }              System.out.println("##### AtomicLongMap #####");           for(String key : map.asMap().keySet()){               System.out.println(key + ": " + map.get(key));           }              System.out.println("##### HashMap #####");           for(String key : map2.keySet()){               System.out.println(key + ": " + map2.get(key));           }              System.out.println("##### ConcurrentHashMap #####");           for(String key : map3.keySet()){               System.out.println(key + ": " + map3.get(key));           }       }          class MyTask implements Runnable{           private String key;           private int count = 0;              public MyTask(String key, int count){               this.key = key;               this.count = count;           }              @Override           public void run() {               try {                   for(int i=0; i<count; i++){                       map.incrementAndGet(key); //key值自增1后,返回该key的值                          //对map2添加写锁,可以解决线程并发问题                       lock.writeLock().lock();                       try{                           if(map2.containsKey(key)){                               map2.put(key, map2.get(key)+1);                           }else{                               map2.put(key, 1);                           }                       }catch(Exception ex){                           ex.printStackTrace();                       }finally{                           lock.writeLock().unlock();                       }                          //虽然ConcurrentHashMap是线程安全的,但是以下语句块不是整体同步,导致ConcurrentHashMap的使用存在并发问题                       if(map3.containsKey(key)){                           map3.put(key, map3.get(key)+1);                       }else{                           map3.put(key, 1);                       }                          //TimeUnit.MILLISECONDS.sleep(50); //线程休眠50毫秒                   }                  } catch (Exception e) {                   e.printStackTrace();               } finally {                   latch.countDown(); //state值减1               }           }       }      }   执行结果如下: AtomicLongMap key: 10000 HashMap key: 10000 ConcurrentHashMap key: 9311
转载请注明原文地址: https://www.6miu.com/read-24988.html

最新回复(0)