由于数据量级较大,对数据进行信息填充需要查询sql以及接口交互,因此采用多线程的方式进行数据二次处理,在进行数据获取时可以通过并行流的方式处理
一. 将数据入库
批处理:数据库处理速度极快,单次吞吐量很大,执行效率高,addBatch()将sql装载在一起,一次性送往数据库执行 // 构建连接 private Connection getConnection() throws Exception { Class.forName(driver); Connection conn = DriverManager.getConnection(url, user, pwd); return conn; } @Test public void importCsvFile() { // 传入文件 String inputFile = "G:\csvImport\file_00.csv"; Map<Integer, Test> testInfoMap = new HashMap<>(); Connection con = null; try { con = getConnection(); String sql = "insert into test(id, name, info) VALUES (?, ?, ?)"; con.setAutoCommit(false); // 事务处理 PreparedStatement ptatm = con.prepareStatement(sql); // 读取文件 BufferedInputStream bis = new BufferedInputStream(new FileInputStream(new File(inputFile))); BufferedReader br = new BufferedReader(new InputStreamReader(bis, StandardCharsets.UTF_8), 10 * 1024 * 1024); while (br.ready()) { String line = in.readLine(); try { String[] split = line.split(","); Test test= new Test(); test.setId(split[0]); // 数据赋值按需求修改 // 这里使用map是为了防止主键冲突 testInfoMap.put(Integer.valueOf(arr[0]), test); } catch (Exception e) { System.out.println("error:" + e + ",line:" + line); } } in.close(); int i = 0; // 用于控制条数 try { //将内存读取数据,批量写入数据库 for (Map.Entry<Integer, Test> testEntry : testInfoMap.entrySet()) { Test test= testEntry .getValue(); ptatm.setInt(1, test.getId()); ptatm.setString(2, test.getName().trim()); ptatm.setString(3, test.getInfo()); ptatm.addBatch(); //批量记录到容器里 if (i == 100000) { //当数据读取到10w条则把这部分数据先写入数据库 i = 0; //重置计数器 ptatm.executeBatch(); //执行批量SQL语句 ptatm.clearBatch(); //清除容器中已写入的数据,预备下次存入数据使用 } i++; } if (i < 100000) { //清空剩余数据 ptatm.executeBatch(); ptatm.clearBatch(); } ptatm.close(); con.commit(); } catch (Exception e) { ptatm.close(); con.commit(); e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } }
二. 对数据进行过滤处理
public void runAndParse() { System.out.println(new Date()); // 起5个线程进行数据处理 // 控制线程结束 CountDownLatch latch = new CountDownLatch(5); // 获取执行顺序(原子性) AtomicInteger queryCnt = new AtomicInteger(0); for (int i = 0; i < 5; i++) { new Thread(() -> { // 默认从1开始处理(page默认为1),当前线程也为1 int curCnt = queryCnt.addAndGet(1); System.out.println("begin:" + curCnt); Pager<Test> testPager = testService.queryList(null, curCnt , 10000); List<Test> testList = testPager.getList(); // 进行数据解析 parseData(list); System.out.println("done:" + curCnt); // 线程处理结束标记 latch.countDown(); }).start(); } // 控制所有线程结束时重新执行递归该方法,直至数据处理完毕(结束标记根据需求设定) try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date()); System.out.println("线程全部执行结束"); runThread(); } private void parseData(List<Test> list) { // 使用并行流的方式进行数据获取处理 List<Test> collect = list.parallelStream().peek(this::judgeTest).collect(Collectors.toList()); Connection con = null; try { con = getConnection(); // 后续代码跟导入时类似,进行sql处理与批量更新操作
三. 速率对比分析
数据量
操作
单线程
多线程
并行流
耗时
1w
查询、更新
是
否
否
10min
1w
查询、批量更新
是
否
否
8min
1w * 5
查询、批量更新
否
是
否
13min
1w * 5
查询、批量更新
否
是
是
6min
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算