多线程实现ftp 文件下载

xiaoxiao2025-09-13  26

1 需求:

某个接口的记录存放在 FTP 服务器上,按"类别/日期/时间.xml"的路径结构存放文件;一天可能会产生几百个文件,需要把这些文件下载下来保存。

问题:

1. 如果要同时获取几个类别在某个时间段内的数据,需要下载大量文件,单线程下载会非常慢。

2. FTP 服务器一般会限制同一用户名同时建立的连接数。

3. 在同一个 FTP 连接上循环读取多个文件时,经常出现只有第一个文件能拿到字节流、后面文件的字节流都是 null 的情况;无论是 sun 的 FTP 包,还是 Apache Commons Net 的 FTP 包,都会出现这个问题。

4. 文件读取完成后,多线程之间如何通知,或者如何拿到各个线程的结果。

解决办法:

1. 第一个问题:使用多线程下载。先取出符合条件的文件名,放入文件名集合,再根据文件名集合把文件分配给多个线程去下载。

2. 第二个问题:为每个线程分配一个独立的 FTP 连接。

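3. 第三个问题:每读取完一个文件,就关闭输入流,并调用

completePending 方法(Apache Commons Net 中对应的是 completePendingCommand)。它会读取服务器对本次传输的最终应答,使同一个连接可以继续读取下一个文件;否则读取一个文件后,再读取其他文件会一直得到 null。另外,读取文件之前需要通过 ftp.enablePassiveMode(true); 将连接设置为被动模式,不然有时也会报错。被动模式下,数据连接由客户端主动发起,通常可以避免客户端位于防火墙或 NAT 之后时数据连接无法建立的问题。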

4. 第四个问题:使用 Executors 创建线程池提交任务,通过返回的 Future 获取结果,并调用

future.get(); 方法。这个方法会阻塞当前线程,直到对应的任务执行完毕并返回结果。

具体代码:

package com.my; import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; import org.dom4j.*; import sun.net.ftp.FtpClient; import sun.net.ftp.FtpProtocolException; public class FtpUtil { public static void main(String[] args) { long startTime = System.currentTimeMillis(); List list = history("201810230000","201810231200"); long endTime = System.currentTimeMillis(); System.out.println((endTime - startTime) + "毫秒"); System.out.println(list.size()); } public static FtpClient connectFTP(String url, int port, String username, String password) { //创建ftp FtpClient ftp = null; try { //创建地址 SocketAddress addr = new InetSocketAddress(url, port); //连接 ftp = FtpClient.create(); ftp.connect(addr); //登陆 ftp.login(username, password.toCharArray()); ftp.setBinaryType(); } catch (FtpProtocolException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return ftp; } public static void close (FtpClient ftp) { if(ftp != null) { try { ftp.close(); } catch (IOException e) { e.printStackTrace(); } } } // ======================================== 对 需要 的文件编译的字符串进行 解析成Map ======================================================== //对时间进行转换 转换为美东时间 // 找出对应的日期的文件有哪些 //对四个顶层目录,分别遍历查找对应时间的文件有哪些,将这些文件名放入 对应的四个map中 // 对每个map 中的文件,分别进行解析,并放入map,然后将map 放入list //最后将四个list 放入list //顶层目录有四个 private static String [] parentFile = {"文件夹1","文件夹2","文件夹3", "文件夹4"}; private static final String url = "ftp网址"; private static final int port = 23; private static final String username = "用户名"; private static final String password = "密码"; /** * 时间为UTC-4 美东时间,与北京时间相差8个小时 * @param fromDate * @param toDate */ public static List history(String fromDate,String toDate) { FtpClient ftp = 
FtpUtil.connectFTP(url, port, username, password);//获得ftp 连接 List<List> list = new ArrayList<>(); List<List> nameLists = new ArrayList<>(); if(ftp.isConnected() && ftp.isLoggedIn()) { for (int i = 0; i < parentFile.length; i++) { String filePath = "/"; filePath = filePath + parentFile[i] +"/" + fromDate.substring(0,8); List<String> nameList = getChildFileName(filePath, ftp, fromDate.substring(0,12), toDate.substring(0,12)); if(nameList != null && nameList.size() > 0) { nameLists.add(nameList); } } } close(ftp); if(nameLists != null && nameLists.size() > 0) { for (List nameList: nameLists ) { FtpUtil ftpUtil = new FtpUtil(); List<Future<List<Map<String, String>>>> futureList = ftpUtil.threadSearch(nameList,10); List<Map<String, String>> childList = new ArrayList<>(); list.add(childList); for (Future<List<Map<String, String>>> future : futureList) { try { System.out.println(future.isDone() + "---" + future.toString()); List<Map<String, String>> reList = future.get(); System.out.println(future.toString() + "---单个线程所得的结果---" + reList.size()); if (reList != null) { childList.addAll(reList); } System.out.println(future.isDone() + "---" + future.toString()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } } return list; } /** * 根据父文件名,获得列出其下的文件的字节流 * 读取字节流的同时,根据 开始时间和结束时间判断是否是需要的文件, * 将需要的文件放入list中 * @param ftpFile * @param ftp * @param fromDate * @param toDate * @return */ public static List<String> getChildFileName(String ftpFile,FtpClient ftp,String fromDate,String toDate) { long startTime = Long.valueOf(fromDate); long endTime = Long.valueOf(toDate); InputStream is = null; List<String> list = new ArrayList<>(); try { // 获取ftp上的文件 is = ftp.nameList(ftpFile); if(is != null) { BufferedReader in = new BufferedReader(new InputStreamReader(is,"UTF-8")); while (true) { String line = in.readLine(); if (line == null) { break; } else { long dateNumber = Long.valueOf(line.substring(line.lastIndexOf("/") + 1,line.length()).replace(".xml","")); 
if(startTime <= dateNumber && dateNumber <= endTime ) { list.add(line); } } } in.close(); } } catch (FtpProtocolException | IOException e) { e.printStackTrace(); } finally { try { if(is!=null){ is.close(); } } catch (IOException e) { e.printStackTrace(); } } return list; } public static String parseXmlFile(String ftpFile,FtpClient ftp) { InputStream is = null; StringBuffer sbf = new StringBuffer(); ftp.enablePassiveMode(true);//被动模式 try { // 获取ftp上的文件 is = ftp.getFileStream(ftpFile); if (is != null) { BufferedReader in = new BufferedReader(new InputStreamReader(is, "UTF-8")); while (true) { String line = in.readLine(); if (line == null) { break; } else { sbf.append(line); sbf.append("\r\n"); // valList.add(parseXml(line)); } } in.close(); ftp.completePending(); } } catch (FtpProtocolException | IOException e) { e.printStackTrace(); } return sbf.toString(); } /** * 多线程分配 */ public List<Future<List<Map<String,String>>>> threadSearch(List<String> list,int threadCount) { int len = threadCount;//定义分割多少份线程 if(list.size() < len){ len = list.size(); } List<List<String>> splitList = splitList(list, len);//分割一个线程执行多少个类型 TimerThread tt = null; ExecutorService exec = Executors.newCachedThreadPool();//工头 ArrayList<Future<List<Map<String,String>>>> results = new ArrayList<>();//结果通知 for (int i = 0; i < len; i++) { TimerThread timerThread = new TimerThread(splitList.get(i),connectFTP(url,port,username,password)); results.add(exec.submit(timerThread));//submit返回一个Future,代表了即将要返回的结果 } return results; } /** * 多线程具体类 * @author 12198 * */ public class TimerThread implements Callable<List<Map<String,String>>> { public List<String> li; public FtpClient ftp; public List<Map<String,String>> list; public TimerThread(List<String> li,FtpClient ftp){ this.li=li; this.ftp = ftp; this.list = new ArrayList<>(); } @Override public List<Map<String, String>> call() throws Exception { StringBuffer sbf = new StringBuffer(); if(null != li && li.size() > 0){ for(String filePath : li){ 
System.out.println(Thread.currentThread().getName() + "--" + ftp.toString()); // list.addAll(parseXmlFile(filePath,ftp)); sbf.append(parseXmlFile(filePath,ftp)); } } close(ftp); //最后一起解析字符串 String [] str = sbf.toString().split("\r\n"); for (String xmlString:str) { Map<String,String> map = parseXml(xmlString); if(map != null && map.size() > 0) list.add(map); } return this.list; } } // ========================= 引用的其他类方法 ==================================== public static Map<String,String> parseXml(String str) { Map<String,String> xmlMap = new HashMap<>(); Document doc = null; try { doc = DocumentHelper.parseText(str); } catch (DocumentException e) { e.printStackTrace(); } if(doc != null) { Element rootElement = doc.getRootElement(); List<Attribute> list = rootElement.attributes(); if(list != null && list.size() > 0) { if(list != null && list.size() > 0 ) { for (Attribute attr : list) { xmlMap.put(attr.getName(), attr.getValue()); } } } } return xmlMap; } public static <T> List<List<T>> splitList(List<T> list, int n) { List<List<T>> strList = new ArrayList<>(); if (list == null) return strList; int size = list.size(); int quotient = size / n; // 商数 int remainder = size % n; // 余数 int offset; // 偏移量 int len = quotient > 0 ? n : remainder; // 循环长度 int start = 0; // 起始下标 int end; // 结束下标 List<T> tempList; for (int i = 0; i < len; i++) { if (remainder != 0) { remainder--; offset = 1; } else { offset = 0; } end = start + quotient + offset; tempList = list.subList(start, end); start = end; strList.add(tempList); } return strList; } }

Apache Commons Net FTP 版本:

package com.my; import org.apache.commons.net.ftp.FTPClient; import org.dom4j.*; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; /** * 使用apache-commons-net 包的ftp 连接工具 * * 指定每个线程获得一个FtpClient 连接实例 * */ public class ApacheFtpUtils { public static void main(String[] args) { List list = history("201810170200","201810170300"); System.out.println(list.size()); } public static FTPClient connectFTP(String hostName,int port,String username,String password) { FTPClient ftp = new FTPClient(); boolean isLogin = false; try { ftp.connect(url,port); isLogin = ftp.login(username,password); } catch (IOException e) { e.printStackTrace(); } if(isLogin) return ftp; else return null; } /** * 根据父文件名,获得列出其下的文件的字节流 * 读取字节流的同时,根据 开始时间和结束时间判断是否是需要的文件, * 将需要的文件放入list中 * @param ftpFile * @param ftp * @param fromDate * @param toDate * @return */ public static List<String> getChildFileName(String ftpFile,FTPClient ftp,String fromDate,String toDate) { long startTime = Long.valueOf(fromDate); long endTime = Long.valueOf(toDate); List<String> list = new ArrayList<>(); String [] nameStr = null; ftp.enterLocalPassiveMode();//解决 net 包因中文 年/月,无法匹配,而导致的文件查找为空问题 try { nameStr = ftp.listNames(ftpFile); } catch (IOException e) { e.printStackTrace(); } if(nameStr != null && nameStr.length >0 ) { for (int i = 0; i < nameStr.length; i++) { String line = nameStr[i]; long dateNumber = Long.valueOf(line.substring(line.lastIndexOf("/") + 1,line.length()).replace(".xml","")); if(startTime <= dateNumber && dateNumber <= endTime ) { list.add(line); } } } ftp.enterLocalActiveMode(); return list; } /** * 下载ftp 文件,并隔行解析,放入list * @param ftpFile * @param ftp * @return */ public static List<Map<String,String>> parseXmlFile(String ftpFile,FTPClient ftp) { List<Map<String,String>> valList = new ArrayList<>(); InputStream is 
= null; StringBuffer sbf = new StringBuffer(); try { // 获取ftp上的文件 ftp.enterLocalPassiveMode(); is = ftp.retrieveFileStream(ftpFile); if (is != null) { BufferedReader in = new BufferedReader(new InputStreamReader(is, "UTF-8")); while (true) { String line = in.readLine(); if (line == null) { break; } else { valList.add(parseXml(line)); } } in.close(); ftp.completePendingCommand();//完成等待方法 ,如果一个连接中要连续读写多个文件,需要强行关流,在执行completePendingCommand 方法 } } catch ( IOException e) { e.printStackTrace(); } ftp.enterLocalActiveMode(); return valList; } private static String [] parentFile = {"文件名1","文件名2","文件名3", "文件名4"}; private static final String url = "ftp网址"; private static final int port = 23; private static final String username = "用户名"; private static final String password = "密码"; /** * 时间为UTC-4 美东时间,与北京时间相差8个小时 * @param fromDate * @param toDate */ public static List history(String fromDate,String toDate) { FTPClient ftp = ApacheFtpUtils.connectFTP(url, port, username, password);//获得ftp 连接 List<List> list = new ArrayList<>();// 具体的List<List<Map<String,String>>> 集合 List<List> nameLists = new ArrayList<>();// 文件名列集合 if(ftp != null ) { for (int i = 0; i < parentFile.length; i++) { String filePath = "/"; filePath = filePath + parentFile[i] +"/" + fromDate.substring(0,8);//HUNTER","AGIN","XIN", "YOPLAY List<String> nameList = getChildFileName(filePath, ftp, fromDate.substring(0,12), toDate.substring(0,12)); //nameList 为0 判断 if(nameList != null && nameList.size() > 0) { nameLists.add(nameList); } } } try { ftp.logout(); } catch (IOException e) { e.printStackTrace(); } //将得到的符合查询要求的文件名集合,进行遍历,每个元素,循环处理,每个元素,分多个线程去下载解析 if(nameLists != null && nameLists.size() > 0) { for (List nameList:nameLists) { ApacheFtpUtils ftpUtils = new ApacheFtpUtils(); List<Future<List<Map<String, String>>>> futureList = ftpUtils.threadSearch(nameList); List<Map<String,String>> childList = new ArrayList<>(); list.add(childList); for (Future<List<Map<String, String>>> future : futureList) { try { 
System.out.println(future.isDone() + "---" + future.toString()); List<Map<String, String>> reList = future.get(); System.out.println(future.toString() + "---单个线程所得的结果---" + reList.size()); if (reList != null) { childList.addAll(reList); } System.out.println(future.isDone() + "---" + future.toString()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } } return list; } /** * 多线程分配 * @param list * @return */ public List<Future<List<Map<String,String>>>> threadSearch(List<String> list) { int len = 5;//定义分割多少份线程 if(list.size() < len){ len = list.size(); } List<List<String>> splitList = splitList(list, len);//分割一个线程执行多少个类型 TimerThread tt = null; ExecutorService exec = Executors.newCachedThreadPool();//工头 ArrayList<Future<List<Map<String,String>>>> results = new ArrayList<>();//结果通知 for (int i = 0; i < len; i++) { TimerThread timerThread = new TimerThread(splitList.get(i),connectFTP(url,port,username,password)); results.add(exec.submit(timerThread));//submit返回一个Future,代表了即将要返回的结果 } return results; } /** * 多线程 具体类 */ public class TimerThread implements Callable<List<Map<String,String>>> { public List<String> li; public FTPClient ftp; public List<Map<String,String>> list; public TimerThread(List<String> li,FTPClient ftp){ this.li=li; this.ftp = ftp; this.list = new ArrayList<>(); } @Override public List<Map<String, String>> call() throws Exception { if(null != li && li.size() > 0){ // for(String filePath : li){ for (int i = 0; i < li.size(); i++) { System.out.println(Thread.currentThread().getName() + "--" + ftp.toString()); list.addAll(parseXmlFile(li.get(i),ftp)); } } //执行完毕,关闭连接 ftp.logout(); return this.list; } } //====================================== 引入的其他类的方法 ============================================================ /** * 解析xml 字符串 * @param str * @return */ public static Map<String,String> parseXml(String str) { Map<String,String> xmlMap = new HashMap<>(); Document doc = null; try { doc = DocumentHelper.parseText(str); } catch 
(DocumentException e) { e.printStackTrace(); } if(doc != null) { Element rootElement = doc.getRootElement(); List<Attribute> list = rootElement.attributes(); if(list != null && list.size() > 0) { if(list != null && list.size() > 0 ) { for (Attribute attr : list) { xmlMap.put(attr.getName(), attr.getValue()); } } } } return xmlMap; } public static <T> List<List<T>> splitList(List<T> list, int n) { List<List<T>> strList = new ArrayList<>(); if (list == null) return strList; int size = list.size(); int quotient = size / n; // 商数 int remainder = size % n; // 余数 int offset; // 偏移量 int len = quotient > 0 ? n : remainder; // 循环长度 int start = 0; // 起始下标 int end; // 结束下标 List<T> tempList; for (int i = 0; i < len; i++) { if (remainder != 0) { remainder--; offset = 1; } else { offset = 0; } end = start + quotient + offset; tempList = list.subList(start, end); start = end; strList.add(tempList); } return strList; } }

 

转载请注明原文地址: https://www.6miu.com/read-5036269.html

最新回复(0)