|
| 1 | +# java异步编程规范 |
| 2 | + |
| 3 | +## async使用规范 |
| 4 | + |
| 5 | +在java中推荐的异步编程方式是使用Spring的Async注解,该方式的优点是简单易用,当方法标注了Async注解以后,将在异步线程中执行该方法,但有以下限制: |
| 6 | + |
| 7 | +- Async方法必须是类的第一个被调用的方法; |
| 8 | + |
| 9 | +- Async必须是实例方法,且该方法的对象必须是Spring注入的。 |
| 10 | + |
| 11 | +使用时除以上限制需要注意之外,我们还需要解决一些其他问题,如下: |
| 12 | + |
| 13 | +### 1. 日志记录 |
| 14 | + |
| 15 | +由于Sync方法并非通过Controller进入,绕开了我们的通用异常拦截层,即CommonExceptionHandler,所以对于异步方法发生的异常没有进行日志记录, |
| 16 | +为解决此问题,我们编写了 @AsyncExceptionLogger注解,在所有@Async方法同时加上此注解,即可保证异常得到记录。 |
| 17 | + |
| 18 | +例如: |
| 19 | + |
| 20 | +```java |
| 21 | +@Async |
| 22 | +@AsyncExceptionLogger |
| 23 | +public void asyncMethod() { |
| 24 | + //... |
| 25 | +} |
| 26 | +``` |
| 27 | + |
| 28 | +### 2. 与JPA整合 |
| 29 | + |
| 30 | +由于Async方法在新的异步线程中执行,JPA的OpenSessionInView无效,导致执行上下文中并不存在对应的EnitityManager,这会产生一系列问题,典型场景就是导致lazyLoading无效。为解决此问题,我们编写了@OpenJpaSession注解,在需要访问数据库的@Async方法中加此注解,即可实现与OpenSessionInView类似的效果。 |
| 31 | + |
| 32 | +示例代码: |
| 33 | + |
| 34 | +```java |
| 35 | +@Async |
| 36 | +@OpenJpaSession |
| 37 | +@AsyncExceptionLogger |
| 38 | +public void asyncMethod() { |
| 39 | + //... |
| 40 | +} |
| 41 | +``` |
| 42 | + |
| 43 | +### 3. 使用事务 |
| 44 | + |
| 45 | +我们当前使用事务的方式是使用@Transactional注解,但由于@Transational只能用在对象被调用的的第一个公有方法上,使用起来多有不便(为了事务而创建一个类实在不能接受),而且在很多情况下,我们都需要更加灵活地进行事务范围的控制。为此,我们编写了一个辅助类**TransactionHelper**,使用方式如下: |
| 46 | + |
| 47 | +```java |
| 48 | +//事务辅助对象,需要注入 |
| 49 | +private TransactionHelper transactionHelper; |
| 50 | + |
| 51 | +@Async |
| 52 | +@OpenJpaSession |
| 53 | +@AsyncExceptionLogger |
| 54 | +public void createTicketConfirmTaskIfRequired(long orderId) { |
| 55 | + |
| 56 | + //创建出票任务,在事务中执行 |
| 57 | + Long newTaskId = transactionHelper.withTransaction(() -> |
| 58 | + doCreateTicketConfirmTaskIfRequired(orderId) |
| 59 | + ); |
| 60 | + |
| 61 | + if (newTaskId != null) { |
| 62 | + LOG.info("出票任务已创建,开始异步调用出票请求"); |
| 63 | + ticketingDomainService.autoTicketingAsync(newTaskId); |
| 64 | + } |
| 65 | +} |
| 66 | + |
| 67 | +//需要在事务中执行 |
| 68 | +private Long doCreateTicketConfirmTaskIfRequired(long orderId) { |
| 69 | + //todo |
| 70 | +} |
| 71 | +``` |
| 72 | + |
| 73 | +我们只需要将事务中的代码以lamda调用的方式包裹在一个withTransaction调用中即可。 |
| 74 | + |
| 75 | +### 4. 线程同步 |
| 76 | + |
| 77 | +有些场景中,我们需要在主线程中的事务尚未完成的情况下发起一个异步方法调用,在异步方法执行时,又希望在主线程的事务成功提交以后再开始。 |
| 78 | + |
| 79 | +例如, 我们需要在机票的所有子订单都出票成功的情况下,才开始发送短信通知,代码如下: |
| 80 | + |
| 81 | +```java |
| 82 | +//事务辅助对象,需要注入 |
| 83 | +private TransactionHelper transactionHelper; |
| 84 | +//短信通知服务 |
| 85 | +private SmsNotifyService notifyService; |
| 86 | +//订单仓储 |
| 87 | +private FlightOrderRepository orderRepo; |
| 88 | + |
| 89 | +//主线程的出票确认操作 |
| 90 | +public void ticketConfirm(FlightOrder order, FlightTask ticketConfirmTask) { |
| 91 | + |
| 92 | + //定义一个异步通知事件,用来在主事务提交完成以后通知异步线程开始 |
| 93 | + AsyncEvent asyncEvent = new AsyncEvent(); |
| 94 | + |
| 95 | + try { |
| 96 | + transactionHelper.withTransaction(() -> |
| 97 | + //对每个子订单进行出票操作 |
| 98 | + order.getOrderTickets().foreach(ticket -> { |
| 99 | + //对子订单作出票处理 |
| 100 | + ticket.ticketSuccess(); |
| 101 | + |
| 102 | + //异步发送通知 |
| 103 | + notifyService.sendNotifyAsync(order, ticket, asyncEvent); |
| 104 | + }); |
| 105 | + |
| 106 | + //其他操作:更新任务已完成 |
| 107 | + ticketConfirmTask.updateToCompleted(); |
| 108 | + orderRepo.save(order); |
| 109 | + ); |
| 110 | + |
| 111 | + //事务执行成功,通知异步线程开始 |
| 112 | + asyncEvent.setAsCompleted(true); |
| 113 | + |
| 114 | + } catch (Excepton ex) { |
| 115 | + //事务执行失败,也通知异步线程 |
| 116 | + asyncEvent.setAsCompleted(false); |
| 117 | + thrown ex; |
| 118 | + } |
| 119 | +} |
| 120 | + |
| 121 | +//SmsNotifyService类 |
| 122 | +@Async |
| 123 | +@OpenJpaSession |
| 124 | +@AsyncExceptionLogger |
| 125 | +private void sendNotifyAsync(long orderId, long ticketId, AsyncEvent asyncEvent) { |
| 126 | + |
| 127 | + //等待主线程执行成功再开始 |
| 128 | + if (asyncEvent.await() && asyncEvent.isSuccessful()) { |
| 129 | + //发送短信通知 |
| 130 | + //todo |
| 131 | + } |
| 132 | +} |
| 133 | +``` |
| 134 | + |
| 135 | +为实现线程间的同步,我们设计了一个AsyncEvent类,该类内部包含一个CountDownLatch对象,初值为1。异步线程等待这个CountDownLatch对象,当主线程事务完成以后,调用CountDownLatch的countDown()方法,从而触发异步线程的执行。 |
| 136 | + |
| 137 | +```java |
| 138 | +public class AsyncEvent { |
| 139 | + |
| 140 | + private boolean successful; |
| 141 | + private CountDownLatch countDownLatch; |
| 142 | + |
| 143 | + public AsyncEvent() { |
| 144 | + successful = false; |
| 145 | + countDownLatch = new CountDownLatch(1); |
| 146 | + } |
| 147 | + |
| 148 | + /** |
| 149 | + * 异步线程调用此方法以等待主线程执行完成 |
| 150 | + * @param timeout 等待时长 |
| 151 | + * @param unit |
| 152 | + * @return 如果主线程在等待时间内执行完成为true, 如果等待超时则为false |
| 153 | + */ |
| 154 | + public boolean await(long timeout, TimeUnit unit) { |
| 155 | + try { |
| 156 | + return countDownLatch.await(timeout, unit); |
| 157 | + |
| 158 | + } catch (InterruptedException ex) { |
| 159 | + throw new BusinessException(String.format("AsyncEvent await exception happens, message: %s", ex.getMessage()), ex); |
| 160 | + } |
| 161 | + } |
| 162 | + |
| 163 | + /** |
| 164 | + * 设置主线程执行已完成,调用此方法将触发异步线程开始执行。此方法只能调用一次。 |
| 165 | + * @param successful 执行成功为true, 否则为false |
| 166 | + */ |
| 167 | + public void setAsCompleted(boolean successful) { |
| 168 | + if (this.countDownLatch.getCount() > 0) { |
| 169 | + this.successful = successful; |
| 170 | + this.countDownLatch.countDown(); |
| 171 | + |
| 172 | + } else { |
| 173 | + throw new BusinessException("AsyncEvent.setAsCompleted invoke allowed only once"); |
| 174 | + } |
| 175 | + } |
| 176 | + |
| 177 | + public boolean isSuccessful() { |
| 178 | + return successful; |
| 179 | + } |
| 180 | +} |
| 181 | +``` |
0 commit comments