Skip to content

优雅解决线程池内部异常以及拒绝策略

上一篇介绍了如何在 Spring boot 中自定义线程池, 线程池内部的异常处理和任务拒绝策略是一件对于保证系统的稳定性和可靠性非常的重要,这篇文章主要介绍如何自定义线程池内部异常以及线程池的拒绝策略

线程池内部异常处理

问题描述

在使用线程池执行任务时,如果任务抛出了未捕获的异常,默认情况下这些异常会被吞掉,不会显示在控制台或日志中。这会导致我们在调试时很难发现和定位问题。

解决方案

为了捕获和记录任务中的未捕获异常,我们可以在创建线程时设置未捕获异常处理器(UncaughtExceptionHandler)。这样,当任务抛出异常时,我们可以记录这些异常,以便进行后续的调试和处理。

任务拒绝策略

问题描述

当线程池的队列已满且没有空闲线程时,新的任务会被拒绝。RejectedExecutionHandler 有是一个接口,有以下几种策略

  • AbortPolicy: 默认策略,在需要拒绝任务时抛出 RejectedExecutionException
  • DiscardPolicy: 直接丢弃任务
  • CallerRunsPolicy: 直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;
  • DiscardOldestPolicy: 丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃。

解决方案

我们希望可以记录被拒绝的任务信息,将任务放入备用队列中重新尝试, 触发报警,入库方便进行任务补偿等功能,这样我们就需要自定义实现 RejectedExecutionHandler 进行实现我们自己的策略。

参考代码

java
@Slf4j
public class BestPracticeThreadPool {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60l, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setUncaughtExceptionHandler((t, e) -> {
                            log.error("Uncaught exception in thread {}: {}", t.getName(), e.getMessage(), e);
                        });
                        return thread;
                    }
                }, (r, executor1) -> {
            Runnable originalTask = extractOriginalTask(r);
            //todo 报警, 入库等操作
            if (originalTask instanceof MyTask) {
                MyTask task = (MyTask) originalTask;
                log.error("Task {} is rejected.", task.getTaskName());
            } else {
                log.error("Task is rejected.");
            }
        });
        executor.execute(new MyTask("MyTask-export"));
        executor.execute(new MyTask("MyTask-export"));
        executor.execute(new MyTask("MyTask-export"));
        executor.execute(new MyTask("MyTask-export"));
        executor.execute(new MyTask("import"));
        executor.execute(new MyTask("import"));
        Thread.sleep(Integer.MAX_VALUE);

    }


    public static class MyTask implements Runnable {

        private String taskName;

        public MyTask(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public void run() {
            if (taskName.startsWith("MyTask")) {
                try {
                    Thread.sleep(500);
                    log.info("{} is running", taskName);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else {
                throw new RuntimeException("not support");
            }
        }

        public String getTaskName() {
            return taskName;
        }

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    }

    private static Runnable extractOriginalTask(Runnable r) {
        if (r instanceof FutureTask) {
            try {
                Field syncField = FutureTask.class.getDeclaredField("sync");
                syncField.setAccessible(true);
                Object sync = syncField.get(r);
                Field runnerField = sync.getClass().getDeclaredField("runner");
                runnerField.setAccessible(true);
                Object runner = runnerField.get(sync);
                if (runner instanceof Runnable) {
                    return (Runnable) runner;
                }
            } catch (NoSuchFieldException | IllegalAccessException e) {
                log.error("Failed to extract original task from FutureTask: {}", e.getMessage(), e);
            }
        }
        return r;
    }
}

输出如下:

text
11:20:11.457 [main] ERROR com.ssn.kit.thread.pool.BestPracticeThreadPool - Task MyTask-export is rejected.
11:20:11.459 [main] ERROR com.ssn.kit.thread.pool.BestPracticeThreadPool - Task import is rejected.
11:20:11.459 [main] ERROR com.ssn.kit.thread.pool.BestPracticeThreadPool - Task import is rejected.
11:20:11.958 [Thread-0] INFO com.ssn.kit.thread.pool.BestPracticeThreadPool - MyTask-export is running
11:20:12.458 [Thread-0] INFO com.ssn.kit.thread.pool.BestPracticeThreadPool - MyTask-export is running
11:20:12.959 [Thread-0] INFO com.ssn.kit.thread.pool.BestPracticeThreadPool - MyTask-export is running

总结

通过上述方法,我们可以优雅地解决线程池内部异常处理和任务拒绝策略的问题。设置未捕获异常处理器可以确保任务中的异常被捕获并记录,而自定义拒绝策略则可以灵活地处理被拒绝的任务。

Released under the MIT License.