网络知识 娱乐 Java线程池多线程查询数据库提高查询效率

Java线程池多线程查询数据库提高查询效率

Java线程池多线程查询数据库提高查询效率

  • 需求
  • 问题
  • 思路
  • 代码
  • 解析

需求

公司数据统计报表查询,几张大表关联查询,包含跨库查询,数据联查,数据过滤,数据统计。

问题

主表数据1主表数据2统计数据1统计数据2
数据数据数据数据

报表结构是主表数据+统计数据,只查主表数据速度很快,统计数据需要跨库联查,且表数据量大,拖慢了速度。
一开始的做法是,每次分页请求都先获取主表list,再foreach主表list,根据主表字段去查询统计数据并进行计算,再合并到主表进行返回,查询速度不理想。
分页查响应时间=1次主表查询时间+10次统计数据查询时间

思路

复杂的报表数据统计不应全部由DB层面去解决,而是SQL仅负责数据过滤,返回统计所需的字段,SQL尽量简单高效,service层拿到DB返回的结果集,由代码层面去进行较为复杂的数据合并与统计。

最总给到前端的是一个分页,那么优化的话是基于分页去进行,分页10条,线程池开启10个线程去并行查询,最总汇总返回给前端。
分页查响应时间=1次主表查询时间+1次统计数据查询时间(10条中最慢的一条)

代码

线程池工具类

package com.youxue.weliao.utils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author lhy
 * @Date 2022/5/13
 */
@Slf4j
public class ThreadUtil {

    private volatile static ThreadUtil threadUtil;

    private ThreadPoolExecutor executor;

    /**
     * 单例
     */
    private ThreadUtil() {
    }

    public static ThreadUtil getThreadUtilInstance() {
        if (null == threadUtil) {
            synchronized (ThreadUtil.class) {
                if (null == threadUtil) {
                    threadUtil = new ThreadUtil();
                }
            }
        }
        return threadUtil;
    }

    /**
     * 提交任务
     *
     * @param task
     */
    public Future<?> submit(Runnable task) {
        if (executor == null) {
            // 初始化线程池
            executor = initialize();
        }
        // 执行线程
        return executor.submit(task);
    }

    /**
     * 初始化线程池
     *
     * @return
     */
    private synchronized ThreadPoolExecutor initialize() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder()
                .setNameFormat("task-admin-getlist--%d")
                .build(),
            new ThreadPoolExecutor.CallerRunsPolicy());
        log.info("===================>ThreadUtil线程池初始化");
        return executor;
    }

    /**
     * 关闭线程池
     */
    public void shutdown() {
        if (executor != null) {
            executor.shutdown();
        }
    }

}

service层实际调用,这里我就不把全部业务代码放出来了,简化后如下

    @Override
    public IPage<ManageDto> getManageDto(IPage<ManageDto> page, String str, String end, List<Integer> taskAdminIds) {
    	// 主表查询
        IPage<ManageDto> manageDtos = baseMapper.getManageDto(page, str, end, taskAdminIds);
        if (manageDtos.getTotal() > 0) {
    		// 主表查询结果集
            List<ManageDto> manageDtoList = manageDtos.getRecords();
            // 异步线程Future集合
            List<Future<?>> futures = new ArrayList<>();
            for (ManageDto manageDto : manageDtoList) {
            	// 本页有几条数据便开启几条线程去进行统计数据查询
            	futures.add(ThreadUtil.getThreadUtilInstance().submit(() -> {
	        		// 统计数据查询
	        		List<ManageVo> manageVos = groupCyberArmyService.getManageVos(manageDto.getProcessAdminUserId(), manageDto.getTaskAdminId());
	        		// 业务逻辑运算后合并到主表Dto
            	}))
            }
            this.waitFinish(futures);
            manageDtos.setRecords(manageDtoList);
        	return manageDtos;
        }
        return null;
    }
    
    /**
     * 线程池内线程是否已全部执行结束
     * 
     * @param futures 异步线程Future集合
     */
    @SneakyThrows
    private void waitFinish(List<Future<?>> futures) {
        for (Future<?> future : futures) {
            future.get();
        }
    }

解析

future.get方法

线程池线程是异步提交的,但是返回分页结果是需要同步返回,Future的get是个阻塞方法。只有所有的任务全部完成,我们才能用get按照任务的提交顺序依次返回结果,调用future.get()方法查看线程池内所有方法是否已执行完成,达到线程异步提交,结果集同步返回的效果。

线程池工具类

单例就不多解释了,这里用的是DCL单例,线程池根据自身需求配置核心线程等参数。