用 [TOC]来生成目录:
# Java实现mongoDB的数据导入TIDB

## 目录

- 一、mongoDB的优缺点
- 二、TiDB的核心特性
- 程序实现两个数据库的数据的转移
- 代码块

优点:

- 面向文档存储(类JSON数据模式,简单而强大)
- 动态查询
- 全索引支持,扩展到内部对象和内嵌数据
- MapReduce支持复杂聚合
- 高效存储二进制大对象(比如照片和视频)
- 查询记录分析
- 内置GridFS,支持大容量的存储
- 复制和故障切换支持
- Auto-Sharding自动分片,支持云级扩展性

缺点:

- MongoDB没有如MySQL那样成熟的维护工具
- 不支持事务
- MongoDB占用空间过大
- 无法进行关联表查询,不适用于关系多的数据
- 复杂聚合操作通过MapReduce创建,速度慢
- 模式自由,自由灵活的文件存储格式容易带来数据错误
该程序将 MongoDB 数据库中一个集合(表)的数据按 `rtime` 字段逐天读取,并通过 MySQL 协议批量写入 TiDB。
DBConnet类-用来实现mongodb的连接,例如:
import com.mongodb.MongoClient; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import org.bson.Document; import java.sql.SQLException; import java.text.ParseException; import static com.mongodb.client.model.Filters.*; public class DBConnet { //获取今天开始的毫秒数 public long starttime = 1496246400; //获取今天结束的毫秒数 public long endtime= starttime + 86400; //查询结束的时间 public long end =1498838400; public void Connet() throws SQLException, ParseException { //mongoDB的相关连接操作 MongoClient mongoClient = new MongoClient("localhost", 27017); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> track = database.getCollection("test"); //mysql的连接 DBHelper db = new DBHelper(); //获取系统当前的毫秒数 while(endtime <= end) { MongoCursor<Document> mongoCursor = find(track); while (mongoCursor.hasNext()) { Document t = mongoCursor.next(); try { db.mosaic(t); } catch (SQLException e) { e.printStackTrace(); } } } db.close(); } public MongoCursor<Document> find(MongoCollection<Document> table) { //FindIterable<Document> t = table.find().skip(skip).limit(limit).batchSize(30); Document sortKey = new Document("rtime", 1); FindIterable<Document> t = table.find(and(gte("rtime", starttime), lte("rtime", endtime))).sort(sortKey); MongoCursor<Document> mongoCursor = t.iterator(); starttime = starttime + 86400; endtime = endtime + 86400; return mongoCursor; } }DBHelper类 –实现mysql的连接,例如:
import org.bson.Document; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; public class DBHelper { public static final String url = "jdbc:mysql://127.0.0.1:3306/test"; public static final String name = "com.mysql.jdbc.Driver"; public static final String user = "root"; public static final String password = ""; public Connection conn = null; public PreparedStatement pst = null; public int COUNT =1; public String prefix ="insert into test (Id,field1,field2,field3,field4,field5,field6,field7,field8,field9,field10,field11,field12) values "; public StringBuilder suffix = new StringBuilder(); public String deletesql ="delete from wifinew where rtime < 1498838400 limit 10000"; public DBHelper() { try { Class.forName(name);//指定连接类型 conn = DriverManager.getConnection(url, user, password);//获取连接 conn.setAutoCommit(false); pst = conn.prepareStatement("");//准备执行语句 } catch (Exception e) { e.printStackTrace(); } } //批量添加的功能 public void mosaic(Document p) throws SQLException { if ( COUNT <= 100000 ) { suffix.append("('").append(p.getString("Id")).append("'," ).append(String.valueOf(p.getDouble("field1")) ).append("," ).append(String.valueOf(p.getDouble("field2")) ).append("," ).append("'"+p.getString("field3")+"'," ).append(p.getInteger("field4") ).append("," ).append(p.getInteger("field5") ).append("," ).append(p.getInteger("field6") ).append("," ).append("'" ).append(p.getString("field7") ).append("'," ).append("'" ).append(p.getString("field8")).append("'," ).append(p.getLong("field9") ).append("," ).append(p.getLong("field10") ).append("," ).append(p.getLong("field11") ).append("," ).append(p.getInteger("field12") ).append("),"); if (COUNT % 10000 == 0) { String sql = prefix + suffix.substring(0, suffix.length() - 1); pst.addBatch(sql); pst.executeBatch(); conn.commit(); suffix = new StringBuilder(); } COUNT++; }else { COUNT = 0; } } //批量删除的功能 public void delete() throws SQLException { 
pst.addBatch(deletesql); int[] result =pst.executeBatch(); conn.commit(); if (result.length >= 0){ delete(); }else{ close(); return; } } public void close() { try { this.conn.close(); this.pst.close(); } catch (SQLException e) { e.printStackTrace(); } } }test类 – 程序的主入口,例如:
import java.sql.SQLException; import java.text.ParseException; public class test { public static void main(String[] args) throws SQLException { //批量删除数据库 delete(); //add(); } public static void add(){ //连接到Db DBConnet DBConnet = new DBConnet(); try { DBConnet.Connet(); } catch (SQLException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } } public static void delete() throws SQLException { DBHelper db = new DBHelper(); try { db.delete(); } catch (SQLException e) { e.printStackTrace(); } } }pom.xml的配置文档,例如:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the MongoDB-to-TiDB import tool. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>carl</groupId>
    <artifactId>xlx2_2biaoqian</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <mysql.version>5.1.24</mysql.version>
        <!-- Compile sources as UTF-8 on every platform instead of the machine's default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Pin the language level; without it older Maven defaults to 1.5, which current JDKs reject. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Logging -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- MongoDB client (source side) -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.2.0</version>
        </dependency>
        <!-- MySQL JDBC driver (target side) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    </dependencies>
</project>