多线程读取文本并解析插入到数据库

xiaoxiao2021-02-28  65

package com.dragonsoft.extract.support; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /**  *  * @author:Yul  * @date:2017年8月30日  */ public class ThreadPool {     private static int DEFAULT_POOL_NUMBER = 8;     private static ThreadPool instance = new ThreadPool();     private ExecutorService insertDataPool;     private ExecutorService readDataPool;     private ThreadPool() {        // TODO Auto-generated constructor stub        insertDataPool = Executors.newFixedThreadPool(DEFAULT_POOL_NUMBER);        readDataPool = Executors.newFixedThreadPool(DEFAULT_POOL_NUMBER);     }     public static ThreadPool getInstance(){        return instance;     }     public void dispatchInsertDataThread(Runnable command){        insertDataPool.execute(command);     }     public void dispatchReadDataThread(Runnable command){        readDataPool.execute(command);     } } package com.dragonsoft.extract.support; import java.util.concurrent.ConcurrentLinkedQueue; public class DataContainer {     private boolean ending = false;     private int currCount = 0;     private int _queueCapacity;     private ConcurrentLinkedQueue<Object> _queue = new ConcurrentLinkedQueue<Object>();     public DataContainer() {        this(20);//默认大小     }     public DataContainer(int queueCapacity) {        this._queueCapacity = queueCapacity;     }     public synchronized void insertData(Object data) throws InterruptedException{        while (currCount > _queueCapacity) {            wait();        }        currCount++;        _queue.add(data);        notifyAll();     }     public synchronized Object getData() throws InterruptedException{        while (currCount <= 0) {            if(!ending){                wait();            }else{                break;            }        }        currCount--;        Object results = _queue.poll();        notifyAll();        if (results == null || results instanceof Integer) {            this.ending = true;        }        return results;     } } package com.dragonsoft.extract.support; public abstract class EtlConstants {     public final static Integer EXTRACT_END_FLAG = -1; } package com.dragonsoft.extract.service; import java.io.File; import java.util.concurrent.CountDownLatch; import com.dragonsoft.extract.support.DataContainer; /**  *  * @author:Yul  * @date:2017年8月30日  */ public class ReadDataService extends AbstractReadDataRunnable {     public ReadDataService(File srcFile, DataContainer dataContainer, CountDownLatch countDownLatch) {         super(srcFile, dataContainer, countDownLatch);         // TODO Auto-generated constructor stub     } } package com.dragonsoft.extract.service; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.commons.lang.StringUtils; import com.dragonsoft.extract.Startup; import com.dragonsoft.extract.support.DataContainer; /**  *  * @author:Yul  * @date:2017年8月30日  */ public class InsertDataService extends AbstractInsertDataRunnable {     public InsertDataService(String batchInsertSQL, DataContainer dataContainer,             CountDownLatch countDownLatch) {         super(batchInsertSQL, dataContainer, countDownLatch);         // TODO Auto-generated constructor stub     }     public void setValueToPstmt(PreparedStatement pstmt, List<String> rows) throws SQLException{         for (String row : rows) {             int index = 1;             String[] fields = StringUtils.split(row, " ");             int fieldIndex = 0;             if(fields.length!=4){                 Startup.errorLog.add(row);             }else{                 pstmt.setString(index++, fields[fieldIndex++]);                 pstmt.setString(index++, fields[fieldIndex++]);                 pstmt.setString(index++, fields[fieldIndex++]);                 pstmt.setString(index++, fields[fieldIndex++]);                 pstmt.addBatch();             }         }     } } package com.dragonsoft.extract.service; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import com.dragonsoft.extract.support.DataContainer; import com.dragonsoft.extract.support.EtlConstants; /**  *  * @author:Yul  * @date:2017年8月30日  */ public abstract class AbstractReadDataRunnable implements Runnable {     private CountDownLatch countDownLatch;     private DataContainer dataContainer;     private File srcFile;     private static int BLOCK_SIZE = 5000;     public AbstractReadDataRunnable(File srcFile, DataContainer dataContainer, CountDownLatch countDownLatch) {         this.srcFile = srcFile;         this.dataContainer = dataContainer;         this.countDownLatch = countDownLatch;     }     @Override     public void run() {         FileReader fr = null;         BufferedReader bf = null;         try {             fr  = new FileReader(this.srcFile);             bf = new BufferedReader(fr);             String line = null;             int rowNum = 0;             List rows = new ArrayList(BLOCK_SIZE);             while ((line = bf.readLine()) != null) {                 rowNum++;                 rows.add(line);                 if (rowNum % BLOCK_SIZE == 0) {                     dataContainer.insertData(rows);                     rows = new ArrayList(BLOCK_SIZE);                 }             }             if (rows.size() > 0) {                 dataContainer.insertData(rows);             }         }  catch (Exception e) {             e.printStackTrace();         } finally {             insertEndFlag(this.dataContainer);             if (countDownLatch != null) {                 countDownLatch.countDown();             }             if (bf != null) {                 try {                     bf.close();                 } catch (IOException e) {                     e.printStackTrace();                 }             }             if (fr != null) {                 try {                     fr.close();                 } catch (IOException e) {                     e.printStackTrace();                 }             }         }     }     private void insertEndFlag(DataContainer data){         try {             data.insertData(EtlConstants.EXTRACT_END_FLAG);         } catch (InterruptedException e) {             e.printStackTrace();         }     } } package com.dragonsoft.extract.service; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; import java.util.concurrent.CountDownLatch; import com.dragonsoft.extract.support.DataContainer; import com.dragonsoft.extract.util.ConnectionPool; /**  *  * @author:Yul  * @date:2017年8月30日  */ public abstract class AbstractInsertDataRunnable implements Runnable {     private CountDownLatch countDownLatch;     private DataContainer dataContainer;     private Connection connection;     private String batchInsertSQL;     public AbstractInsertDataRunnable(String batchInsertSQL, DataContainer dataContainer, CountDownLatch countDownLatch) {         this.countDownLatch = countDownLatch;         this.connection = ConnectionPool.getInstance().getConnection();         this.batchInsertSQL = batchInsertSQL;         this.dataContainer = dataContainer;     }     @Override     public void run() {         try {             while (true) {                 Object rows = this.dataContainer.getData();                 if (rows == null || rows instanceof Integer) {                     break;                 } else {                     batchInsertData(connection, (List)rows);                 }             }         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             if (countDownLatch != null) {                 countDownLatch.countDown();             }             if (connection != null) {                 try {                     connection.close();                 } catch (SQLException e) {                     e.printStackTrace();                 }             }         }     }     private void batchInsertData(Connection connection, List rows) {         PreparedStatement pstmt = null;         try {             connection.setAutoCommit(false);             pstmt = connection.prepareStatement(batchInsertSQL);             setValueToPstmt(pstmt, rows);             pstmt.executeBatch();             connection.commit();         } catch (SQLException ex) {             ex.printStackTrace();         } finally {             if (pstmt != null) {                 try {                     pstmt.close();                 } catch (SQLException e) {                     e.printStackTrace();                 }             }         }     }     protected abstract void setValueToPstmt(PreparedStatement pstmt, List<String> rows) throws SQLException; } package com.dragonsoft.extract.util; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public abstract class ConnectionHelper {     public static Connection getTargetConnection() throws ClassNotFoundException, SQLException {         Class.forName("com.mysql.jdbc.Driver");         return DriverManager.getConnection("jdbc:mysql://192.168.1.181:3306/db1?useUnicode=true&characterEncoding=GBK", "root", "dragon");     } } package com.dragonsoft.extract.util; import java.beans.PropertyVetoException; import java.sql.Connection; import java.sql.SQLException; import javax.sql.DataSource; import com.mchange.v2.c3p0.ComboPooledDataSource; public class ConnectionPool {     private static final String URL = "jdbc:mysql://192.168.1.181:3306/db1?useUnicode=true&characterEncoding=GBK";     private static final String USER_NAME = "root";     private static final String PASSWORD = "dragon";     private DataSource dataSource;     private static ConnectionPool pool = new ConnectionPool();     private ConnectionPool() {         try {             dataSource = createDataSource(URL, USER_NAME, PASSWORD);         } catch (Exception e) {             // TODO: handle exception             e.printStackTrace();         }     }     public static final ConnectionPool getInstance() {         return pool;     }     public synchronized final Connection getConnection() {         try {             return dataSource.getConnection();         } catch (SQLException e) {             e.printStackTrace();         }         return null;     }     private DataSource createDataSource(String jdbcUrl, String user, String password) throws PropertyVetoException{         ComboPooledDataSource dataSource = new ComboPooledDataSource();         dataSource.setJdbcUrl(jdbcUrl);         dataSource.setUser(user);         dataSource.setPassword(password);         dataSource.setDriverClass("com.mysql.jdbc.Driver");         dataSource.setMaxPoolSize(100);         dataSource.setMinPoolSize(10);                dataSource.setInitialPoolSize(10);         dataSource.setAcquireIncrement(5);                return dataSource;     } } package com.dragonsoft.extract.util; import java.util.ArrayList; import java.util.List; public abstract class StringHelper {     public static String joinString(String... args){         StringBuilder sb = new StringBuilder();         for (int i = 0; i < args.length; i++) {             sb.append(args[i]);         }         return sb.toString();     }     public static String stringRightFil(String strTemp, int iLength, String strFil) {         StringBuilder sbCommand = new StringBuilder();         if (strTemp == null) {             strTemp = "";         }         strTemp = strTemp.trim();         if (strTemp.length() < iLength) {             for (int i = 0; i < strTemp.length(); i++) {                 sbCommand.append(strTemp.charAt(i));             }             for (int j = 0; j < iLength - strTemp.length(); j++) {                 sbCommand.append(strFil);             }         } else if (strTemp.length() > iLength) {             int iPont = strTemp.length() - iLength;             sbCommand.append(strTemp.substring(iPont, strTemp.length()));         } else if (strTemp.length() == iLength) {             sbCommand.append(strTemp);         }         return sbCommand.toString();     }     /**      *      * @param minuend 被减数字符数组      * @param subtractor 减数字符数组      * @return minuend - subtractor 的结果集      */     public static String[] stringDiffer(String[] minuend,String[] subtractor){         List<String> results = new ArrayList<String>();         List<String> subLists = new ArrayList<String>();//减数         List<String> minLists = new ArrayList<String>();//被减数         for(String s:subtractor){             subLists.add(s);         }         for(String s:minuend){             minLists.add(s);         }         for(String s:minLists){             if(!subLists.contains(s)){                 results.add(s);             }         }         return results.toArray(new String[results.size()]);     } } package com.dragonsoft.extract; import java.io.File; import java.io.FileFilter; import java.io.FileWriter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.commons.lang.StringUtils; import com.dragonsoft.extract.service.InsertDataService; import com.dragonsoft.extract.service.ReadDataService; import com.dragonsoft.extract.support.DataContainer; import com.dragonsoft.extract.support.ThreadPool; public class Startup {     private static final String DATA_DIR = "E:/temp/log";     public static List errorLog = new ArrayList();     public static void main(String[] args) {         try {             String dirName = DATA_DIR;             File dir = new File(dirName);                        File[] dataFiles = dir.listFiles(new FileFilter() {                 @Override                 public boolean accept(File pathname) {                     if (StringUtils.indexOf(pathname.getName(), ".txt") != -1) {                         return true;                     }                     return false;                 }             });             for (File file : dataFiles) {                 //1.启动请求日志和结果日志数据ETL线程                 System.out.println("startEtl starting....");                 startEtl(file);                 System.out.println("startEtl end....");                 //2.写错误日志到txt                 writeErrorLogs("E:/temp/errorLog/"+file.getName()+"_error.txt");             }         } catch (Exception e) {             e.printStackTrace();         }     }     private static void startEtl(File log){         try {             DataContainer dataContainer = new DataContainer();             final CountDownLatch countDownLatch = new CountDownLatch(5);             ThreadPool.getInstance().dispatchReadDataThread(new ReadDataService(log, dataContainer, countDownLatch));             ThreadPool.getInstance().dispatchInsertDataThread(new InsertDataService(getBatchInsertSQL(), dataContainer, countDownLatch));             ThreadPool.getInstance().dispatchInsertDataThread(new InsertDataService(getBatchInsertSQL(), dataContainer, countDownLatch));             ThreadPool.getInstance().dispatchInsertDataThread(new InsertDataService(getBatchInsertSQL(), dataContainer, countDownLatch));             ThreadPool.getInstance().dispatchInsertDataThread(new InsertDataService(getBatchInsertSQL(), dataContainer, countDownLatch));             countDownLatch.await();         } catch (Exception e) {             e.printStackTrace();         }     }     private static String getBatchInsertSQL(){         return " insert into  t_log_requester1 (ID, SENDER_ID, RECEIVE_ID, RECEIVE_TIME)  values (?,?,?,?) ";     }     private static void writeErrorLogs(String path){         for (Object log : errorLog) {             appendErrorLogsToTXT(path, (String)log);         }         errorLog.clear();     }     private static void appendErrorLogsToTXT(String path,String content){         try {             FileWriter output = new FileWriter(path,true);             output.write("\n");             output.write(content);             output.close();         } catch (Exception e) {             System.out.println("write to file error");         }     } }
转载请注明原文地址: https://www.6miu.com/read-76040.html

最新回复(0)