代码有点复杂,主要因为ConcurrentMap中不能保存value为null的值,所以得同时处理word不存在和已存在两种情况。
上面的实现每次调用都会涉及Long对象的拆箱和装箱操作,很明显,更好的实现方式是采用AtomicLong,下面是采用AtomicLong后的代码: [java] view plain copy print ? private final ConcurrentMap<String, AtomicLong> wordCounts = new ConcurrentHashMap<>(); public long increase(String word) { AtomicLong number = wordCounts.get(word); if (number == null) { AtomicLong newNumber = new AtomicLong(0); number = wordCounts.putIfAbsent(word, newNumber); if (number == null) { number = newNumber; } } return number.incrementAndGet(); }这个实现仍然有一处需要说明的地方,如果多个线程同时增加一个目前还不存在的词,那么很可能会产生多个newNumber对象,但最终只有一个newNumber有用,其他的都会被扔掉。对于这个应用,这不算问题,创建AtomicLong的成本不高,而且只在添加不存在词是出现。但换个场景,比如缓存,那么这很可能就是问题了,因为缓存中的对象获取成本一般都比较高,而且通常缓存都会经常失效,那么避免重复创建对象就有价值了。下面的代码演示了怎么处理这种情况:
[java] view plain copy print ? private final ConcurrentMap<String, Future<ExpensiveObj>> cache = new ConcurrentHashMap<>(); public ExpensiveObj get(final String key) { Future<ExpensiveObj> future = cache.get(key); if (future == null) { Callable<ExpensiveObj> callable = new Callable<ExpensiveObj>() { @Override public ExpensiveObj call() throws Exception { return new ExpensiveObj(key); } }; FutureTask<ExpensiveObj> task = new FutureTask<>(callable); future = cache.putIfAbsent(key, task); if (future == null) { future = task; task.run(); } } try { return future.get(); } catch (Exception e) { cache.remove(key); throw new RuntimeException(e); } }解决方法其实就是用一个Proxy对象来包装真正的对象,跟常见的lazy load原理类似;使用FutureTask主要是为了保证同步,避免一个Proxy创建多个对象。注意,上面代码里的异常处理是不准确的。
最后再补充一下,如果真要实现前面说的统计单词次数功能,最合适的方法是Guava包中AtomicLongMap;一般使用ConcurrentHashMap,也尽量使用Guava中的MapMaker或cache实现。
AtomicLongMap是Google Guava项目的一个类,它是线程安全、支持并发访问的。 Guava 是一个 Google 的基于java1.6的类库集合的扩展项目,包括 collections, caching, primitives support, concurrency libraries, common annotations, string processing, I/O, 等等
[java] view plain copy print ? public class GuavaTest { //来自于Google的Guava项目 AtomicLongMap<String> map = AtomicLongMap.create(); //线程安全,支持并发 Map<String, Integer> map2 = new HashMap<String, Integer>(); //线程不安全 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //为map2增加并发锁 ConcurrentHashMap<String, Integer> map3 = new ConcurrentHashMap<String, Integer>(); //线程安全,但也要注意使用方式 private int taskCount = 100; CountDownLatch latch = new CountDownLatch(taskCount); //新建倒计时计数器,设置state为taskCount变量值 public static void main(String[] args) { GuavaTest t = new GuavaTest(); t.test(); } private void test(){ //启动线程 for(int i=1; i<=taskCount; i++){ Thread t = new Thread(new MyTask("key", 100)); t.start(); } try { //等待直到state值为0,再继续往下执行 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("##### AtomicLongMap #####"); for(String key : map.asMap().keySet()){ System.out.println(key + ": " + map.get(key)); } System.out.println("##### HashMap #####"); for(String key : map2.keySet()){ System.out.println(key + ": " + map2.get(key)); } System.out.println("##### ConcurrentHashMap #####"); for(String key : map3.keySet()){ System.out.println(key + ": " + map3.get(key)); } } class MyTask implements Runnable{ private String key; private int count = 0; public MyTask(String key, int count){ this.key = key; this.count = count; } @Override public void run() { try { for(int i=0; i<count; i++){ map.incrementAndGet(key); //key值自增1后,返回该key的值 //对map2添加写锁,可以解决线程并发问题 lock.writeLock().lock(); try{ if(map2.containsKey(key)){ map2.put(key, map2.get(key)+1); }else{ map2.put(key, 1); } }catch(Exception ex){ ex.printStackTrace(); }finally{ lock.writeLock().unlock(); } //虽然ConcurrentHashMap是线程安全的,但是以下语句块不是整体同步,导致ConcurrentHashMap的使用存在并发问题 if(map3.containsKey(key)){ map3.put(key, map3.get(key)+1); }else{ map3.put(key, 1); } //TimeUnit.MILLISECONDS.sleep(50); //线程休眠50毫秒 } } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); //state值减1 } } } } 执行结果如下: AtomicLongMap key: 10000 HashMap key: 10000 ConcurrentHashMap key: 9311