跳至主要內容

Spring线程池多线程并发处理大批量数据,解决IO效率问题

xlc520JavaJava大约 5 分钟约 1629 字

Spring 线程池多线程并发处理大批量数据,解决 IO 效率问题

首先,叙述一下当前面临的问题所在。当前系统通过接口调用其他系统的数据,返回的数据达到 10 万级,然后将这批数据插入到 oracle 数据库。怎样尽可能提高这一过程的效率?

大致从两个时间节点来优化

一个节点是优化接口之间调用的响应速度,可以项目之间使用集群,实现负载均衡。接口拿到数据后可以暂存到 Redis 或 kafka 再者是 MQ 队列中,以提高接口直接的相率。

当然了如果项目团队允许,分布式的 Hbase 也是个不错的选择。当然了这些都不是重点,吹了半天下面才是重点。

今天的主题是大批量数据并发入库的问题,现在主流的项目工程大部分 spring 全家桶占大部分,所以咱们选择使用 spring 的线程池解决这一问题。大家可以思考一下 10 万条数据入库传统的 web 是一个线程运作,把这部分数据拆成 10 份或者 20 份分给多个线程去处理不就提高效率了?

思路有了,接下来,不哔哔了,直接干代码。

两个方案

方案一:新建几个线程,交给线程池管理

1、准备测试数据

public List<BookStatistic> getPsrList(){
  List<BookStatistic> psrList = new ArrayList<BookStatistic>();
  for(int i=0 ; i<20000 ;i++){
   BookStatistic book = new BookStatistic();
   book.setPno("zxl"+i);
   psrList.add(book);
  }
  
  return bookList;
 }

2、线程池配置类

@Configuration
@EnableAsync
public class AsyncConfig {
 
  //接收报文核心线程数
  @Value("${book.core.poolsize}")
  private int bookCorePoolSize;
  //接收报文最大线程数
  @Value("${book.max.poolsize}")
  private int bookMaxPoolSize;
  //接收报文队列容量
  @Value("${book.queue.capacity}")
  private int bookQueueCapacity;
  //接收报文线程活跃时间(秒)
  @Value("${book.keepAlive.seconds}")
  private int bookKeepAliveSeconds;
  //接收报文默认线程名称
  @Value("${book.thread.name.prefix}")
  private String bookThreadNamePrefix;
  
   /**
    * bookTaskExecutor:(接口的线程池). <br/>
    * @return TaskExecutor taskExecutor接口
    * @since JDK 1.8
    */
     @Bean(name="BookTask")
     public ThreadPoolTaskExecutor bookTaskExecutor() {
      //newFixedThreadPool
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         // 设置核心线程数
         executor.setCorePoolSize(bookCorePoolSize);
         // 设置最大线程数
         executor.setMaxPoolSize(bookMaxPoolSize);
         // 设置队列容量
         executor.setQueueCapacity(bookQueueCapacity);
         // 设置线程活跃时间(秒)
         executor.setKeepAliveSeconds(bookKeepAliveSeconds);
         // 设置默认线程名称
         executor.setThreadNamePrefix(bookThreadNamePrefix);
         // 设置拒绝策略
         // rejection-policy:当pool已经达到max size的时候,如何处理新任务  
         // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         // 等待所有任务结束后再关闭线程池
         executor.setWaitForTasksToCompleteOnShutdown(true);
         executor.initialize();
         return executor;
     }
}

3、实现过程函数

public void ReceivePsrJobRun(){
  
  List<BookStatistic> bookList = null;
  
 
  bookList = getBookList();
  //接收集合各段的 执行的返回结果
  List<Future<String>> futureList = new ArrayList<Future<String>>();
  //集合总条数
  int size = bookList.size();
  //将集合切分的段数(2*CPU的核心数)
  int sunSum = 2*Runtime.getRuntime().availableProcessors();
  int listStart,listEnd;
  //当总条数不足sunSum条时 用总条数 当做线程切分值
  if(sunSum > size){
   sunSum = size;
  }
  
  //定义子线程
  /*BookThread bookThread;*/
  
  //将list 切分多份 多线程执行
  for (int i = 0; i < sunSum; i++) {
   //计算切割  开始和结束
   listStart = size / sunSum * i ;
   listEnd = size / sunSum * ( i + 1 );
   //最后一段线程会 出现与其他线程不等的情况
   if(i == sunSum - 1){
    listEnd = size;
   }
   //线程切断**/
   List<BookStatistic> sunList = bookList.subList(listStart,listEnd); 
   //子线程初始化
   bookThread = new BookThread(i,sunList);
   
   //多线程执行
   futureList.add(taskExecutor.submit(bookThread));
  }
  System.out.println("----------1111111111");
  //对各个线程段结果进行解析
  for(Future<String> future : futureList){
   try {
    String str ;
    if(null != future ){
     str = future.get().toString();
     System.out.println("##############current thread id ="+Thread.currentThread().getName()+",result="+str);
    }else{
     System.err.println("失败");
    }
   } catch (InterruptedException | ExecutionException e) {
    
    // TODO Auto-generated catch block
    e.printStackTrace();
    
   }
  }
  System.out.println("----------2222");
 
 }

4、线程类

@Component
public class BookThread implements Callable<Boolean>{
 private static final Logger LOG = LoggerFactory.getLogger(BookThread.class);
 //当前是属于第几段线程
 private int pageIndex;
 //此段数据的集合
 private List<BookStatistic> bookList;
 
 public BookThread(int pageIndex,List<BookStatistic> list){
  this.pageIndex = pageIndex;
  this.psrList = list;
 }
 @Override
 public Boolean call() throws Exception {
  
   System.err.println(String.format("此批数据的段数为:%s 此段数据的数据条数为:%s",pageIndex,psrList.size()));
   Boolean result = Boolean.TRUE;
  
   if(null != bookList&& bookList.size() >0){
    for(BookStatistic book: bookList){
   
     try {
       //数据入库函数
     } catch (Exception e) {
      
      result = Boolean.FALSE;
      continue;
     }
    }
   }
   return result;
  }
 
 
}

方案二:只定义线程的数量,线程的新建管理都交给线程池

准备测试数据和线程池的配置和方案一一样,不再赘述。

方案二不再新建线程类了,这个过程交给 spring 线程池去处理,取而代之的是 spring 下的一个异步注解@Async

@Component
public class SyncBookHandler {
 
 
 private static final Logger LOG = LoggerFactory.getLogger(SyncBookHandler.class);
 /**
  * syncMargePsr:(多线程同步处理数据方法). <br/>
  * @author LW
  * @param bookList 一段数据集合
  * @param pageIndex 段数
  * @return Future<String> future对象
  * @since JDK 1.8
  */
 @Async
 public Future<String> syncMargePsr(List<BookStatistic> bookList,int pageIndex){
  
 
    LOG.info(String.format("此批数据的段数为:%s 此段数据的数据条数为:%s",pageIndex,psrList.size()));
   //声明future对象
    Future<String> result = new AsyncResult<String>("");
    //循环遍历该段旅客集合
   if(null != bookList && bookList.size() >0){
    for(BookStatistic book: bookList){
     try {
      //数据入库操作
      
     } catch (Exception e) {
      
      //记录出现异常的时间,线程name
      result = new AsyncResult<String>("fail,time="+System.currentTimeMillis()+",thread id="+Thread.currentThread().getName()+",pageIndex="+pageIndex);
      continue;
     }
    }
   }
   return result;
  }

实现过程函数

 @Autowired
 private SyncBookHandler syncBookHandler;
 
 //核心线程数
 @Value("${book.core.poolsize}")
 private int threadSum;
 
public void receiveBookJobRun(){
  List<BookStatistic> bookList = null;
  
 
  bookList  = getPsrList();
  //入库开始时间
  Long inserOrUpdateBegin = System.currentTimeMillis();
  LOG.info("数据更新开始时间:"+inserOrUpdateBegin);
  //接收集合各段的 执行的返回结果
  List<Future<String>> futureList = new ArrayList<Future<String>>();
  //集合总条数
  if(psrList != null){
   int listSize = bookList.size();
   
   int listStart,listEnd;
   //当总条数不足threadSum条时 用总条数 当做线程切分值
   if(threadSum > listSize){
    threadSum = listSize;
   }
  
   //将list 切分多份 多线程执行
   for (int i = 0; i < threadSum; i++) {
    //计算切割  开始和结束
    listStart = listSize / threadSum * i ;
    listEnd = listSize / threadSum * ( i + 1 );
    //最后一段线程会 出现与其他线程不等的情况
    if(i == threadSum - 1){
     listEnd = listSize;
    }
    //数据切断
    List<BookStatistic> sunList = psrList.subList(listStart,listEnd); 
 
    //每段数据集合并行入库
    futureList.add(syncPassengerHandler.syncMargePsr(sunList,i));
 
   }
   
   //对各个线程段结果进行解析
   for(Future<String> future : futureList){
    
     String str ;
     if(null != future ){
      try {
       str = future.get().toString();
       LOG.info("current thread id ="+Thread.currentThread().getName()+",result="+str);
  
      } catch (InterruptedException | ExecutionException e) {
       
       LOG.info("线程运行异常!");
      }
      
     }else{
      LOG.info("线程运行异常!");
     }
    
   }
  }
  
  
  
  Long inserOrUpdateEnd = System.currentTimeMillis();
  LOG.info("数据更新结束时间:"+inserOrUpdateEnd+"。此次更新数据花费时间为:"+(inserOrUpdateEnd-inserOrUpdateBegin));
 
 }

以上思路和代码为简单的实现过程,鄙人能力有限,欢迎各位大神提出建议!!