实现声明式锁,支持分布式锁自定义锁、SpEL和结合事务_当前时讯
工作中遇到事务一般使用声明式事务,一个注解@Transactional搞定。编程式事务则显得略繁琐。
@Autowired private PlatformTransactionManager transactionManager; public void service() throws Exception { TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); try { Thread.sleep(1000); } catch (Exception e) { transactionManager.rollback(status); throw e; } transactionManager.commit(status); }
但是遇到需要加锁的情况呢?绝大多数情况都是使用编程式锁。例如:
ReentrantLock lock = new ReentrantLock(); public void service() throws Exception { try { lock.lock(); Thread.sleep(1000); } finally { lock.unlock(); } }
但为什么不搞个轮子把编程式锁变为声明式锁呢?本文尝试着写一个,欢迎大家指正。
【资料图】
2.实现众所周知,声明式事务使用注解+AOP,同理声明式锁也应该一样。那么先搞一个注解。
2.1 定义注解@Target({ElementType.TYPE, ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface Locked { /** * 锁名称,如果不传会默认使用同一把锁 * 支持SpEL表达式 * @return */ String key() default ""; /** * 锁类型 * @return */ LockType lockType() default LockType.ReentrantLock; /** * 获取锁的时间,超出时间没有获取到锁返回false * 默认为0 * -1 为永不超时,这种情况下,tryLock()会一直阻塞到获取锁 * @return */ long time() default 0; /** * 获取锁的时间单位 * 默认为秒 * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS;}
再定义一个枚举类型的锁类型
public enum LockType { ReentrantLock, RedissonClient, ELSE}
2.2 定义锁接口然后仿照java.util.concurrent.locks.Lock
定义一个锁的接口。
public interface LockedService { boolean lock(); boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unLock(); /** * 如果是redisson分布式锁,调用方通过此方法将相应的redissonClient对象和key传到具体实现类 * @param redissonClient * @param key * @return */ LockedService setRedissonClient(RedissonClient redissonClient, String key); /** * 如果是ReentrantLock,调用方通过此方法将相应的lock对象传到具体实现类 * @param lock * @return */ LockedService setLock(Lock lock);}
主要是加锁解锁的动作。加锁分为阻塞与非阻塞,有时间参数与非时间参数之分。与jdk的 lock接口完全相似。里面多余的方法分别为:
setRedissonClient()
如果是redisson分布式锁,调用方通过此方法将相应的redissonClient对象和key传到具体实现类setLock()
如果是ReentrantLock,调用方通过此方法将相应的lock对象传到具体实现类如果是自定义锁,在自定义的实现类里面可以忽略这两个方法,自定义获取锁对象。
2.3 锁的实现锁的接口至少需要两个实现类,一个是ReentrantLock
,另一个是RedissonClient
。如果是直接定义两个类,
@Componentpublic class LockedServiceImpl implements LockedService{}@Componentpublic class LockedRedissonServiceImpl implements LockedService{}
然后在切面里直接使用
@Autowired private LockedService lockedService;
启动就会报错:
Field lockedService in com.nyp.test.service.LockAspect required a single bean, but 2 were found:- lockedRedissonServiceImpl: defined in file [\target\classes\com\nyp\test\service\LockedRedissonServiceImpl.class]- lockedServiceImpl: defined in file [\target\classes\com\nyp\test\service\LockedServiceImpl.class]
就算通过@Primary
或者@Qualifier
将这两个实现类都注入了进来,也不好分辨究竟使用哪一个。
这个使用就需要用到SPI了。
2.3.1 什么是SPISPI全称Service Provider Interface
,是Java提供的一套用来被第三方实现或者扩展的API,它可以用来启用框架扩展和替换组件。
有点类似于接口+策略模式+配置文件
组合实现的动态加载机制。
比如spring mvc/springboot里面有个HttpMessageConverters
,HTTP的request和response的转换器。当一个请求完成时,返回一个对象,需要它将对象转换成json串(或其它),然后以流的形式写到客户端。这个的工具有很多,比如jackson,gson等,spring默认采用jackson框架,AbstractJackson2HttpMessageConverter
.
也很多同学实际上使用的是fastjson,FastJsonHttpMessageConverter
。或者其它。
不管使用哪一种框架,它都要去实现HttpMessageConverters
接口。但springboot容器加载的时候怎么知道需要去加载哪些实现类,具体又使用哪个实现类呢。
这里就使用到了SPI机制。
2.3.2 通过SPI实现锁的多个实现类SPI有jdk的实现,有spring boot的实现。这里使用springboot的实现方法。
在resources/META-INF
目录下新装置文件 spring.factories
,内容为锁接口及其实现类的全限定类名。
com.nyp.test.service.LockedService=\com.nyp.test.service.LockedServiceImpl,\com.nyp.test.service.LockedRedissonServiceImpl
同时在类LockedServiceImpl
和LockedRedissonServiceImpl
就不需要加@Component
注解。LockedServiceImpl
类:
import org.redisson.api.RedissonClient;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;/** * @projectName: Test * @package: com.nyp.test.service * @className: com.nyp.test.service.LockedService * @author: nyp * @description: jdk ReentrantLock锁实现 * @date: 2023/4/13 11:45 * @version: 1.0 */@Componentpublic class LockedServiceImpl implements LockedService{ private Lock lock; @Override public boolean lock(){ if (lock != null) { lock.lock(); return true; } return false; } @Override public boolean tryLock() { return lock.tryLock(); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return lock.tryLock(time, unit); } @Override public void unLock(){ if (lock != null) { lock.unlock(); } } @Override public LockedService setRedissonClient(RedissonClient redissonClient, String key) { return this; } @Override public LockedService setLock(Lock lock) { this.lock = lock; return this; }}
LockedRedissonServiceImpl
类:
import org.redisson.api.RLock;import org.redisson.api.RedissonClient;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;/** * @projectName: Test * @package: com.nyp.test.service * @className: com.nyp.test.service.LockedService * @author: nyp * @description: redisson分布式锁实现 * @date: 2023/4/13 11:45 * @version: 1.0 */public class LockedRedissonServiceImpl implements LockedService { private RedissonClient redissonClient; private String key; @Override public boolean lock() { RLock rLock = redissonClient.getLock(key); if (rLock != null) { rLock.lock(); return true; } return false; } @Override public boolean tryLock() { RLock rLock = redissonClient.getLock(key); return rLock.tryLock(); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { RLock rLock = redissonClient.getLock(key); return rLock.tryLock(time, unit); } @Override public void unLock() { RLock rLock = redissonClient.getLock(key); if (rLock != null && rLock.isHeldByCurrentThread()) { rLock.unlock(); } } @Override public LockedService setRedissonClient(RedissonClient redissonClient, String key) { this.redissonClient = redissonClient; this.key = key; return this; } @Override public LockedService setLock(Lock lock) { return this; }}
那么spring容器里面就有两个锁接口实现了,到底使用哪一个呢?模仿HttpMessageConverters
搞一个转换器。在init()
方法里面指定默认使用com.nyp.test.service.LockedServiceImpl
.
public class LockedServiceConverters { private LockedService lockedService; private String lockedServiceImplClass; public LockedServiceConverters(String lockedServiceImplClass) { this.lockedServiceImplClass = lockedServiceImplClass; } public LockedService getLockedService() { return lockedService; } @PostConstruct public void init() throws Exception { if (StringUtils.isBlank(lockedServiceImplClass)) { lockedServiceImplClass = "com.nyp.test.service.LockedServiceImpl"; } List lockedServiceList = SpringFactoriesLoader.loadFactories(LockedService.class, null); for (LockedService lockedService : lockedServiceList){ System.out.println(lockedService.getClass().getName()); if(lockedService.getClass().getName().equals(lockedServiceImplClass)){ this.lockedService = lockedService; } } if (lockedService == null) { throw new Exception("未发现lockedService : " + lockedServiceImplClass); } }}
使用的时候可以通过以下方式指定使用LockedRedissonServiceImpl
,或其它自定义的锁实现类。
@Configurationpublic class WebConfig { @Bean public LockedServiceConverters lockedServiceConverters() { return new LockedServiceConverters("com.nyp.test.service.LockedRedissonServiceImpl"); }}
2.3.3 通过SPI自定义实现锁首先实现一个LockedService
接口。
public class LockedServiceUserDefine implements LockedService{ @Override public boolean lock() { log.info("锁住"); return true; } @Override public boolean tryLock() { return true; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return true; } @Override public void unLock() { log.info("解锁"); } @Override public LockedService setRedissonClient(RedissonClient redissonClient, String key) { return this; } @Override public LockedService setLock(Lock lock) { return this; }}
然后将此类加到spring.factory里面去
com.nyp.test.service.LockedService=\com.nyp.test.service.LockedServiceImpl,\com.nyp.test.service.LockedRedissonServiceImpl,\com.nyp.test.service.LockedServiceUserDefine
再将LockedServiceConverters
改为LockedServiceUserDefine
即可。
@Bean public LockedServiceConverters lockedServiceConverters() { return new LockedServiceConverters("com.nyp.test.service.LockedServiceUserDefine"); }
3.定义切面3.1 切面实现import com.nyp.test.config.LockType;import com.nyp.test.config.Locked;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.annotation.*;import org.aspectj.lang.reflect.MethodSignature;import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;import org.redisson.api.RedissonClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.core.DefaultParameterNameDiscoverer;import org.springframework.expression.EvaluationContext;import org.springframework.expression.Expression;import org.springframework.expression.spel.standard.SpelExpressionParser;import org.springframework.expression.spel.support.StandardEvaluationContext;import org.springframework.stereotype.Component;import org.springframework.transaction.support.TransactionSynchronization;import org.springframework.transaction.support.TransactionSynchronizationManager;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * @projectName: Test * @package: com.nyp.test.service * @className: LockAspect * @author: nyp * @description: TODO * @date: 2023/4/13 10:46 * @version: 1.0 */@Aspect@Component@Slf4jpublic class LockAspect { /** * 用于保存ReentrantLock类的锁 */ private volatile ConcurrentHashMap locks = new ConcurrentHashMap<>(); @Autowired private LockedServiceConverters lockedServiceConverters; @Autowired private RedissonClient redissonClient; /** * 用于SpEL表达式解析. */ private SpelExpressionParser parser = new SpelExpressionParser(); /** * 用于获取方法参数定义名字. */ private DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer(); @Around("@annotation(locked)") public Object around(ProceedingJoinPoint joinPoint, Locked locked) throws Throwable { String lockKey = getKeyBySpEL(locked.key(), joinPoint); if (StringUtils.isBlank(lockKey)) { lockKey = joinPoint.getTarget().toString(); log.info("lockKey = {}", lockKey); } Lock lock = locks.get(lockKey); lock = getLockFromLocks(lock, locked.lockType(), lockKey); boolean isLock; if (locked.time() != 0) { if (locked.time() < 0) { isLock = lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).tryLock(); } else { isLock = lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).tryLock(locked.time(), locked.timeUnit()); } } else { isLock = lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).lock(); } Object obj; try { obj = joinPoint.proceed(); } catch (Throwable throwable) { throw throwable; } finally { if(isLock){ // 如果有事务,保证事务完成(commit or rollback)过后再释放锁 // 这里不管声明式事务,还是编程式事务,只要还没完成就会进入// TODO 这里要考虑到事务的传播,特别是嵌套事务 if(TransactionSynchronizationManager.isActualTransactionActive()){ Lock finalLock = lock; String finalLockKey = lockKey; TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCompletion(int status) { lockedServiceConverters.getLockedService().setLock(finalLock).setRedissonClient(redissonClient, finalLockKey).unLock(); } }); } else{ lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).unLock(); } } } return obj; } private Lock getLockFromLocks(Lock lock, LockType lockType, String lockKey) { // TODO 非Lock类锁可以不调用此方法 if (lock == null) { synchronized (LockAspect.class){ lock = locks.get(lockKey); if (lock == null) { if (lockType == LockType.ReentrantLock) { lock = new ReentrantLock(); locks.put(lockKey, lock); } } } } return lock; } public String getKeyBySpEL(String expressionStr, ProceedingJoinPoint joinPoint) { MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); String[] paramNames = discoverer.getParameterNames(methodSignature.getMethod()); Expression expression = parser.parseExpression(expressionStr); EvaluationContext context = new StandardEvaluationContext(); Object[] args = joinPoint.getArgs(); for (int i = 0; i < args.length; i++) { context.setVariable(paramNames[i], args[i]); } return expression.getValue(context) == null ? "" : expression.getValue(context).toString(); }}
1.其中locks
用于保存ReentrantLock类的锁,每个key一个锁对象,如果key为空,默认以方法名为key,意味着同一个方法共用一个锁对象。
2.lockedServiceConverters
为锁转换器,通过它可以获取容器当中使用的真正锁对象。3.redissonClient
对象在这里注入,通过lockedServiceConverters
获取的锁实现类将此锁对象注入到实现方法内部。4.SpelExpressionParser
和DefaultParameterNameDiscoverer
对象用于通过SpEL表达式动态从目标方法参数中获取锁的key。
5.@Around("@annotation(locked)")
这里使用一个环绕通知,拦截加了locked
注解的方法,进行锁操作。
具体执行流程为,拦截到方法:1.先获取到lockkey,然后根据lockkey获取锁对象。2.根据lockedServiceConverters拿到具体的锁实现类对象,根据锁对象类型以及time等参数,将需要的redissonClient,lock等参数传入,再调用锁方法实现,进行加锁操作。3.调用目标方法。4.目标方法调用完毕,进行解锁操作。这里判断一下是否还在一个事务当中,如果是的话,在事务完成之后再进行解锁。这块在后面再详细说明。
3.2 SpEL表达式获取动态key定义一个测试目标方法。key为#person.name
。
@Transactional(rollbackFor = Exception.class)@Locked(key = "#person.name", lockType = LockType.RedissonClient, time = 10, timeUnit = TimeUnit.MILLISECONDS) public void service(Person person) throws Exception { Thread.sleep(1000); log.info("业务方法"); }
再调用
Person person = new Person();person.setName("张三");lockTest.service(person);
准确获取到了张三
锁与事务结合为什么要单独拿出来讲?有什么问题吗?
具体可以看我的另一篇博文 https://juejin.cn/post/7213636024112234551 当transcational遇上synchronized。不管是synchronized还是Lock。道理是一样的。
简单说,spring使用动态代理加AOP实现事务管理。那么上面的方法实际上至少需要简化成3个步骤:
void begin();@Transactional(rollbackFor = Exception.class)public void service(Person person) throws Exception { try{ lock.lock(); } finally{ lock.unlock(); }}void commit();// void rollback();
如果在service()中向数据库insert/update一条数据,在serive()执行完毕锁释放之后,commit之前,有另一个线程拿到了锁开始执行service()方法,那么这时候它是读不到数据库里最新的记录的。除非事务隔离级别为读未提交。
但实际生产环境中,少有人使用读未提交这种隔离级别,为了避免上述的线程安全问题,就得借助事务同步器TransactionSynchronization
来实现。当线程中存在未完成的事务时,需要在afterCompletion
方法里释放锁。afterCompletion
表示事务完成,包括提交与回滚。
@Bean public LockedServiceConverters lockedServiceConverters() { return new LockedServiceConverters("com.nyp.test.service.LockedServiceImpl"); }
业务方法阻塞,锁获取超时时间为60秒。
@Transactional(rollbackFor = Exception.class) @Locked(key = "#person.name", lockType = LockType.ReentrantLock, time = 60000, timeUnit = TimeUnit.MILLISECONDS) public void service(Person person) throws Exception { Thread.sleep(10000000); log.info("业务方法"); }
先测试张三锁,再测试李四锁,再获取李四锁预期结果是张三获取到锁,李四因为是新的key所以也能获取锁,李四再次获取锁,因为之前李四获取的锁还没释放,所以一直阻塞获取不了锁。
执行日志:
[2023-04-18 19:02:17.639] INFO 53268 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : key = 张三[2023-04-18 19:02:17.640] INFO 53268 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : 张三 尝试获取锁 [2023-04-18 19:02:17.641] INFO 53268 [http-nio-8657-exec-1] [com.nyp.test.service.LockedServiceImpl] : 张三 获取到了锁[2023-04-18 19:02:20.280] INFO 53268 [http-nio-8657-exec-4] [com.nyp.test.service.LockAspect] : key = 李四[2023-04-18 19:02:20.281] INFO 53268 [http-nio-8657-exec-4] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 [2023-04-18 19:02:20.281] INFO 53268 [http-nio-8657-exec-4] [com.nyp.test.service.LockedServiceImpl] : 李四 获取到了锁[2023-04-18 19:02:22.181] INFO 53268 [http-nio-8657-exec-3] [com.nyp.test.service.LockAspect] : key = 李四[2023-04-18 19:02:22.181] INFO 53268 [http-nio-8657-exec-3] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 [2023-04-18 19:03:22.186] INFO 53268 [http-nio-8657-exec-3] [com.nyp.test.service.LockedServiceImpl] : 李四 获取锁超时
执行日志符合预期,李四第2次尝试获取锁,在60秒后超时失败。
4.2 RedissonClient测试@Bean public LockedServiceConverters lockedServiceConverters() { return new LockedServiceConverters("com.nyp.test.service.LockedRedissonServiceImpl"); }
还是上面的测试代码,结果符合预期。
[2023-04-18 19:10:47.895] INFO 117888 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : key = 张三[2023-04-18 19:10:47.896] INFO 117888 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : 张三 尝试获取锁 [2023-04-18 19:10:47.904] INFO 117888 [http-nio-8657-exec-1] [com.nyp.test.service.LockedRedissonServiceImpl] : 张三 获取到了锁[2023-04-18 19:10:50.555] INFO 117888 [http-nio-8657-exec-5] [com.nyp.test.service.LockAspect] : key = 李四[2023-04-18 19:10:50.556] INFO 117888 [http-nio-8657-exec-5] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 [2023-04-18 19:10:50.557] INFO 117888 [http-nio-8657-exec-5] [com.nyp.test.service.LockedRedissonServiceImpl] : 李四 获取到了锁[2023-04-18 19:10:55.882] INFO 117888 [http-nio-8657-exec-8] [com.nyp.test.service.LockAspect] : key = 李四[2023-04-18 19:10:55.883] INFO 117888 [http-nio-8657-exec-8] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 [2023-04-18 19:11:55.896] INFO 117888 [http-nio-8657-exec-8] [com.nyp.test.service.LockedRedissonServiceImpl] : 李四 获取锁超时
4.3 自定义锁测试使用LockedServiceUserDefine
@Bean public LockedServiceConverters lockedServiceConverters() { return new LockedServiceConverters("com.nyp.test.service.LockedServiceUserDefine"); }
@Transactional(rollbackFor = Exception.class) @Locked(key = "#person.name", lockType = LockType.ELSE, time = 60000, timeUnit = TimeUnit.MILLISECONDS) public void service(Person person) throws Exception { log.info("业务方法"); }
日志:
[2023-04-18 19:19:38.897] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : key = 张三[2023-04-18 19:19:38.898] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : 张三 尝试获取锁 [2023-04-18 19:19:38.898] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockedServiceUserDefine] : 锁住[2023-04-18 19:19:38.900] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockTest] : 业务方法[2023-04-18 19:19:38.901] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockedServiceUserDefine] : 解锁
这里只是证明容器执行了自定义的实现类。没有真正实现锁。
5.尾声 5.1 todo list1.将locks里不用的锁定期清除。2.getLockFromLocks()方法非Lock类的锁可不执行。3.重要
:在切面里,在事务同步器afterCompletion
后再释放锁,这里要考虑到事务的传播性,特别是嵌套事务的情况。这种情况下,会把锁的范围扩大。
在这里我们实现了一个声明式的锁,在注解里支持锁的粒度(key和SpEL动态key),支持定义获取锁的超时时间,默认支持ReentrantLock
和RedissonClient
两种锁。
也使用SPI支持自定义锁实现。
支持锁与事务的整合(考虑到事务的传播,或者叫有限支持)。
但是!!没经过严格的测试,特别是并发测试,不建议在生产环境中使用,仅作学习交流之用。希望能够对各位看官有所启发,欢迎留言讨论。