分布式锁实战

什么是锁

在单线程的应用程序中,如果存在多个线程可以同时更改一个变量A时,当一个进程更改后立马获取A值,A值是不可靠的,因为当你在你获取后的时候,还没进行相应的处理,这时候可能A值已经被其他进程更改了,(如超卖的问题,一个进程查找数据库中还有库存,正准备给用户订单写数据时,这时候另外一个进程已经将库存耗完了,这时候就会出现超卖现象),于是我们需要对资源加锁,保证一个时刻只能有一个进程访问该资源。

几种锁

1、线程锁: 该方式能够保证单个JVM中,同一时刻只有一个线程访问资源,主要通过在方法或者代码块上加锁,使用关键字Synchronized、ReentrantLcok,基本原理都是依靠线程之间共享内存实现的
2、进程锁: 控制同一个操作系统中多个进程访问一个共享资源,由于程序的独立性,只是因为程序的独立性,各个进程是无法控制其他进程对资源的访问的,但是可以使用本地系统的信号量控制(操作系统基本知识)
3、分布式锁: 当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

分布式锁特点

1、互斥性:任意时刻,只能有一个客户端获取锁,不能同时有两个客户端获取到锁。
2、安全性:锁只能被持有该锁的客户端删除,不能由其它客户端删除。
3、死锁:获取锁的客户端因为某些原因(脱机等)而未能释放锁,其它客户端再也无法获取到该锁。
4、容错:当部分节点脱机时,客户端仍然能够获取锁和释放锁。

分布式锁的解决方案

1. 数据库实现方式

1.1 数据库乐观锁

乐观锁:假设不会发生并发冲突,只在提交操作时检查是否违反数据完整性。字面意思就是乐观的认为数据不会被修改,在提交的时候会判断一下在此期间有没有进程更新了数据。适用于多读少写入的应用场景。

一般有两种方式:使用数据库版本(Version)记录机制实现、使用时间戳。
(1)、使用数据版本(Version)记录机制实现,这是乐观锁最常用的一种实现方式。何谓数据版本?即为数据增加一个版本标识,一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则认为是过期数据。
(2)、使用时间戳(timestamp)。乐观锁定的第二种实现方式和第一种差不多,同样是在需要乐观锁控制的table中增加一个字段,名称无所谓,字段类型使用时间戳(timestamp), 和上面的version类似,也是在更新提交的时候检查当前数据库中数据的时间戳和自己更新前取到的时间戳进行对比,如果一致则OK,否则就是版本冲突。

1.2 数据库悲观锁

悲观锁的悲观在于他认为本次操作会发生并发冲突,所以一开始就对商品加上锁(SELECT … FOR UPDATE),然后就可以安心的做判断和更新,因为这时候不会有别人更新这条商品库存。

2. Redis分布式锁实现方式

2.1 redisson实现

redisson实现的锁有很多种如:可重入锁、公平锁、联锁(MultiLock)、红锁(RedLock)
常见的就是可重入锁、公平锁

下面说一下就具体说说分布式锁的案例

场景:模拟用户秒杀商品,用户提交订单后,商品库存会减一,订单表会添加一条数据,如果增加的订单的数量与商品库存量总和与未秒杀之前商品总数量一致,则是我们期望的结果,反之就是超卖的现象(需要我们解决的)

2.1.1 模拟单服务下不使用任何锁会出现的超卖现象

代码结构

Model层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Entity
@Data
@Table(name = "sysorder")
public class Order {

@Id
@GeneratedValue
private int id;
private String name;
private int ordercount;
}

@Entity
@Table(name = "sysproduct")
@Data
public class Product {

@Id
@GeneratedValue
private int id;
private String name;
private int count;
}

Dao层(使用jpa)

1
2
3
4
5
6
7
8
9
@Repository
public interface IOrderDao extends JpaRepository<Order,Integer>{

}

@Repository
public interface IProductDao extends JpaRepository<Product, Integer> {

}

Service层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
@Transactional
public class OrderServiceImpl implements IOrderService{

@Autowired
private IOrderDao orderDao;

@Override
public int getAllOrderNum() {
return (int) orderDao.count();
}

@Override
public void saveOrder(Order order) {
orderDao.save(order);
}
}


@Service
@Transactional
public class ProductServiceImpl implements IProductService{

@Autowired
private IProductDao productDao;

@Override
public boolean consume() {
Product product = productDao.getOne(1);
if(product.getCount()-1 >= 0){
product.setCount(product.getCount()-1);
productDao.save(product);
return true;
}else {
return false;
}
}

@Override
public int getProductNum() {
return productDao.getOne(1).getCount();
}
}

Control层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@RestController
@RequestMapping(value = "/test")
public class TestControl {


@Autowired
private IProductService productService;

@Autowired
private IOrderService orderService;

@RequestMapping(value = "/consume")
public Object consume() {
if(productService.consume()) {
Order order = new Order();
order.setOrdercount(1);
order.setName("A - " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " - 购买的");
orderService.saveOrder(order);
return "购买成功!";
}else {
return "商品卖完了!";
}
}

@RequestMapping(value = "/info")
public Object info() {
int allOrderNum = orderService.getAllOrderNum();
int productNum = productService.getProductNum();
return "库存剩余:" + productNum + "成功下单数量:" + allOrderNum;
}
}

接下来我们使用Apache24中的abs模拟并发请求,启动项目后,数据库中添加一条数据(初始化sysproduct商品表 ;id:1,count:100,name:冰箱)
进入Apache24>bin下执行,ab -n 100 -c 10 http://127.0.0.1:8081/test/consume/ (每次并发数为10个,循环100次)

并发请求执行之前 Alt text

并发请求执行之后 Alt text

显然出现了超卖的现象,那么怎么解决这个问题呢,单服务下很简单,保证每次只有一个线程能够访问这个方法就行了,在control层的consume方法上加上synchronized

2.1.2 解决单服务下出现的超卖现象

修改control层的consume方法

1
2
3
4
5
6
7
8
9
10
11
12
@RequestMapping(value = "/consume")
public synchronized Object consume() {
if(productService.consume()) {
Order order = new Order();
order.setOrdercount(1);
order.setName("A - " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " - 购买的");
orderService.saveOrder(order);
return "购买成功!";
}else {
return "商品卖完了!";
}
}

初始化sysproduct商品表 (id:1,count:100,name:冰箱 update sysproduct set count=100; delete from sysorder;)
进入Apache24>bin下执行, ab -n 100 -c 10 http://127.0.0.1:8081/test/consume/(每次并发数为10个,循环100次)

并发请求执行之前 Alt text

并发请求执行之后 Alt text

2.1.3 集群下超卖现象

集群环境下 仅仅靠synchronized锁是无法保证不出现超卖现象的
我们拷贝一份上面的项目(修改端口号为8082),启动两个项目后,我们需要配置负载均衡,以apache为例(apache端口为80)

1
2
3
4
5
6
7
8
ProxyRequests Off

<Proxy balancer://mycluster/>
BalancerMember "http://127.0.0.1:8081/"
BalancerMember "http://127.0.0.1:8082/"
</Proxy>
ProxyPass "/" "balancer://mycluster/"
ProxyPassReverse "/" "balancer://mycluster/"

配置好后,通过访问地址 http://localhost/test/info 可以随机访问到A项目/B项目
接下来进行测试,初始化sysproduct商品表 (id:1,count:100,name:冰箱 update sysproduct set count=100; delete from sysorder;)
进入Apache24>bin下执行, ab -n 100 -c 10 http://127.0.0.1/test/consume/(每次并发数为10个,循环100次)

并发请求执行之前 Alt text

并发请求执行之后 Alt text

显然出现了超卖的现象,那么怎么解决这个问题呢,集群下我们需要分布式锁来解决这个问题,这里介绍一种方案(redisson)

2.1.4 解决集群下超卖现象 - redisson

项目添加pom依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.9.1</version>
</dependency>

定义一个分布式锁的接口 Locker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

import java.util.concurrent.TimeUnit;

public interface Locker {

/**
* 获取锁,如果锁不可用,则当前线程处于休眠状态,直到获得锁为止。
*
* @param lockKey
*/
void lock(String lockKey);

/**
* 释放锁
*
* @param lockKey
*/
void unlock(String lockKey);

/**
* 获取锁,如果锁不可用,则当前线程处于休眠状态,直到获得锁为止。如果获取到锁后,执行结束后解锁或达到超时时间后会自动释放锁
*
* @param lockKey
* @param timeout
*/
void lock(String lockKey, int timeout);

/**
* 获取锁,如果锁不可用,则当前线程处于休眠状态,直到获得锁为止。如果获取到锁后,执行结束后解锁或达到超时时间后会自动释放锁
*
* @param lockKey
* @param unit
* @param timeout
*/
void lock(String lockKey, TimeUnit unit, int timeout);

/**
* 尝试获取锁,获取到立即返回true,未获取到立即返回false
*
* @param lockKey
* @return
*/
boolean tryLock(String lockKey);

/**
* 尝试获取锁,在等待时间内获取到锁则返回true,否则返回false,如果获取到锁,则要么执行完后程序释放锁,
* 要么在给定的超时时间leaseTime后释放锁
*
* @param lockKey
* @param waitTime
* @param leaseTime
* @param unit
* @return
*/
boolean tryLock(String lockKey, long waitTime, long leaseTime, TimeUnit unit)
throws InterruptedException;

/**
* 锁是否被任意一个线程锁持有
*
* @param lockKey
* @return
*/
boolean isLocked(String lockKey);
}

锁的实现 RedissonLocker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

import java.util.concurrent.TimeUnit;

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

import com.example.demo.utils.lock.tools.Locker;

public class RedissonLocker implements Locker{

private RedissonClient redissonClient;

public RedissonLocker(RedissonClient redissonClient) {
super();
this.redissonClient = redissonClient;
}

@Override
public void lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
}

@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}

@Override
public void lock(String lockKey, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(leaseTime, TimeUnit.SECONDS);
}

@Override
public void lock(String lockKey, TimeUnit unit, int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
}

public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}

@Override
public boolean tryLock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock();
}

@Override
public boolean tryLock(String lockKey, long waitTime, long leaseTime,
TimeUnit unit) throws InterruptedException{
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock(waitTime, leaseTime, unit);
}

@Override
public boolean isLocked(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
return lock.isLocked();
}
}

工具类 LockUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

import java.util.concurrent.TimeUnit;

import com.example.demo.utils.lock.tools.Locker;

public class LockUtil {

private static Locker locker;

/**
* 设置工具类使用的locker
* @param locker
*/
public static void setLocker(Locker locker) {
LockUtil.locker = locker;
}

/**
* 获取锁
* @param lockKey
*/
public static void lock(String lockKey) {
locker.lock(lockKey);
}

/**
* 释放锁
* @param lockKey
*/
public static void unlock(String lockKey) {
locker.unlock(lockKey);
}

/**
* 获取锁,超时释放
* @param lockKey
* @param timeout
*/
public static void lock(String lockKey, int timeout) {
locker.lock(lockKey, timeout);
}

/**
* 获取锁,超时释放,指定时间单位
* @param lockKey
* @param unit
* @param timeout
*/
public static void lock(String lockKey, TimeUnit unit, int timeout) {
locker.lock(lockKey, unit, timeout);
}

/**
* 尝试获取锁,获取到立即返回true,获取失败立即返回false
* @param lockKey
* @return
*/
public static boolean tryLock(String lockKey) {
return locker.tryLock(lockKey);
}

/**
* 尝试获取锁,在给定的waitTime时间内尝试,获取到返回true,获取失败返回false,获取到后再给定的leaseTime时间超时释放
* @param lockKey
* @param waitTime
* @param leaseTime
* @param unit
* @return
* @throws InterruptedException
*/
public static boolean tryLock(String lockKey, long waitTime, long leaseTime,
TimeUnit unit) throws InterruptedException {
return locker.tryLock(lockKey, waitTime, leaseTime, unit);
}

/**
* 锁释放被任意一个线程持有
* @param lockKey
* @return
*/
public static boolean isLocked(String lockKey) {
return locker.isLocked(lockKey);
}
}

配置项 RedissonConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.example.demo.utils.lock.LockUtil;
import com.example.demo.utils.lock.tools.impl.RedissonLocker;

@Configuration
public class RedissonConfig {

@Autowired
private RedissonClient redissonClient;

@Bean
public RedissonLocker redissonLocker(){
RedissonLocker locker = new RedissonLocker(redissonClient);
//设置LockUtil的锁处理对象
LockUtil.setLocker(locker);
return locker;
}
}

yml配置文件 application.yml

1
2
3
4
spring:
redis:
host: 127.0.0.1
port: 6379

一切配置好后,通过访问地址 http://localhost/test/info 可以随机访问到A项目/B项目
接下来进行测试,初始化sysproduct商品表 (id:1,count:100,name:冰箱 update sysproduct set count=100; delete from sysorder;)
进入Apache24>bin下执行, ab -n 100 -c 10 http://127.0.0.1/test/consume/(每次并发数为10个,循环100次)

并发请求执行之前 Alt text

并发请求执行之后 Alt text

从数量上看达到了我们的期望,没有出现超卖的现象,分布式锁生效了。

2.1.5 解决集群下超卖现象 - redisson(升级版 - 封装分布式锁为注解方式)

新建自定义注解类 SelfLock.java

1
2
3
4
5
6
7
8
9
10
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SelfLock {

}

新建注解解析类 LockAspect.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import com.example.demo.utils.lock.LockUtil;
import lombok.extern.slf4j.Slf4j;

@Aspect
@Component
@Slf4j
public class LockAspect {

private final static String localkey = "locakkey";

@Pointcut("@annotation(com.example.demo.anno.SelfLock)")
private void cut() {
}

@Around("cut()")
public void advice(ProceedingJoinPoint joinPoint) throws Throwable {
LockUtil.lock(localkey);
try{
joinPoint.proceed();
}catch(Exception e){
log.error(e.getMessage(),e);
}finally{
LockUtil.unlock(localkey);
}
}
}

将 Control中需要加锁的代码改写成如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 @RequestMapping(value = "/consume")
@SelfLock
public synchronized Object consume() {
Object obj = null;
if(productService.consume()) {
Order order = new Order();
order.setOrdercount(1);
order.setName("B - " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " - 购买的");
orderService.saveOrder(order);
obj = "购买成功!";
}else {
obj = "商品卖完了!";
}
return obj;
}

一切配置好后,通过访问地址 http://localhost/test/info 可以随机访问到A项目/B项目
接下来进行测试,初始化sysproduct商品表 (id:1,count:100,name:冰箱 update sysproduct set count=100; delete from sysorder;)
进入Apache24>bin下执行, ab -n 100 -c 10 http://127.0.0.1/test/consume/(每次并发数为10个,循环100次)

并发请求执行之后 Alt text

从数量上看达到了我们的期望,没有出现超卖的现象,自定义注解方式的分布式锁生效了。

这里需要注意的是,如果将consume()中的代码块使用TestControl的另外一个A方法去实现,然后在A方法上加上自定义注解@SelfLock,在consume()方法中调用A方法,注解是不生效的,如:

1
2
3
4
5
6
7
8
9
10
11
12

@RequestMapping(value = "/consume")
public void consume(){
A()
}

@SelfLock
public void A(){
...
一系列需要加同步锁的代码块
...
}

上面的这种代码是注解是不生效的,同步锁失效,主要是因为spring 在扫描bean的时候会扫描方法上是否包含@SelfLock注解,如果包含,spring会为这个bean动态地生成一个代理类(Spring AOP中强制使用CglibAopProxy代理:接口的实现类,还有一种是JdkDynamicAopProxy:子类),当这个方法被调用的时候,实际上是由代理类来调用的,代理类在调用之前就会执行切面的方法,然而,如果这个注解是同一个类中的其他方法调用的,那么该方法的调用是不使用代理类的,直接通过原来的那个bean,自然就不会执行切面中的方法。

3. ZooKeeper分布式锁

该方式以后接触到了再更新吧!

总结

分布式的CAP理论告诉我们,任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。