开始:
一直以来mongodb的同步驱动mongo-java-driver,为了提高性能,WriteConcern设置成了NORMAL模式(3.0以后更名为UNACKNOWLEGE),这意味着驱动把数据写入socket就立马返回成功,mongodb端有任何异常是捕获不到的,也有人称之为“fire and forget”。那么问题来了:数据可靠性为0,鬼知道数据有没有存储成功!
为了提高数据可靠性,其实也很简单:调高WriteConcern的级别就行。具体什么级别,根据业务确定。下面给出WriteConcern的定义:
WriteConcern的定义:
Ø WriteConcern.NORMAL:仅抛出网络错误异常,没有服务器错误异常
Ø WriteConcern.SAFE:抛出网络错误异常、服务器错误异常;并等待服务器完成写操作。
Ø WriteConcern.MAJORITY: 抛出网络错误异常、服务器错误异常;并等待集群中一半以上服务器完成写操作。
Ø WriteConcern.FSYNC_SAFE:抛出网络错误异常、服务器错误异常;写操作等待服务器将数据刷新到磁盘。
Ø WriteConcern.JOURNAL_SAFE:抛出网络错误异常、服务器错误异常;写操作等待服务器提交到磁盘的日志文件。
Ø WriteConcern.REPLICAS_SAFE:抛出网络错误异常、服务器错误异常;等待至少2台服务器完成写操作。
于是:
我将WriteConcern级别调成SAFE模式:
该模式下,数据会写到内存并返回,每隔100ms将数据刷到journal日志,宕机重启时会优先从journal日志恢复数据。理论上,SAFE模式下最多会丢失100ms的数据。
问题来了:
改完WriteConcern,一测性能,下降了1/3左右,这怎么可以忍!!!然后就开始解决性能问题,出发点是,通过其它方法提高性能,一通查资料,就发现了mongodb的异步驱动(心里美滋滋,浪里个浪~~~)。心想异步坑定快啊(允许丢失极少数据,但是需要知道丢了哪个。而之前的NORMAL模式做不到),就算出现异常,我可以稍后捕获。于是搞了一个新组合:WriteConcern.SAFE+mongodb-async-driver。理论上,不管从哪个角度考虑,新组合都优于原有的组合:WriteConcern.NORMAL+mongo-java-driver。
但是:
对,‘但是’来了。测试中发现新组合确实快,但是数据丢了,扔给我一个找不到答案的异常,,,
描述: 单线程循环插入1000条数据,结果只有600左右插入成功,异常如下: com.mongodb.MongoWaitQueueFullException: Too many operations are already waiting for a server. Max number of operations (maxWaitQueueSize) of 500 has been exceeded. 猜测: 每条插入命令,都会占用一个线程,导致等待获取connection的线程超过了限制(默认500)。 疑问: 为什么有那么多线程?如果是每个插入操作都开启一个线程,那根本没法实际应用,难道没有一种管控策略吗?
测试代码:
public static ExecutorService executorService = Executors.newFixedThreadPool(10); public static CountDownLatch gloabLatch = new CountDownLatch(1); public static volatile long id = 0; public static void main(String[] args) { // To directly connect to the default server localhost on port 27017 MongoClient mongoClient = null; try { mongoClient = MongoClients.create(); //获取数据库 MongoDatabase database = mongoClient.getDatabase("mydb"); MongoCollection<Document> col = database.getCollection("mycollection"); TestAsyncDriver test = new TestAsyncDriver(); test.testInsert(col); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 控制线程 * @param col */ private void testInsert(MongoCollection<Document> col) { int insertNum = 1000; Runnable runner = new InsertThread(col, insertNum); long startTime = System.currentTimeMillis(); System.out.println("开始时间:" + (startTime) + "ms"); executorService.execute(runner); while(true) { if (id>=(insertNum-1)) { executorService.shutdownNow(); break; } } System.out.println("id=" + id); long endTime = System.currentTimeMillis(); System.out.println("所有回调完成耗时:" + (endTime - startTime) + "ms"); } /** * 插入线程 * @author huqi * */ class InsertThread implements Runnable { private final MongoCollection<Document> col; private int insertNum; public InsertThread(MongoCollection<Document> col, int insertNum) { this.col = col; this.insertNum = insertNum; } private synchronized long addId() { return id++; } public void run() { try { long startTime = System.currentTimeMillis(); for (int i=0; i< insertNum; i++) { //插入数据 Document doc = new Document("name", "MongoDB") .append("type", "database") .append("id", i) .append("info", new Document("x", 203).append("y", 102)); col.insertOne(doc, new SingleResultCallback<Void>(){ @Override public void onResult(final Void result, final Throwable t) { addId(); if (t != null) { System.out.println("t=" + t); } } }); } long endTime = System.currentTimeMillis(); System.out.println("所有插入命令调用完成,耗时:" + (endTime - startTime) + "ms"); } catch (Exception e) { e.printStackTrace(); } } }