Zookeeper分布式锁案例
案例场景:
- 商品表
- 商品库存表
- 抢购商品下单
Zookeeper概述
Zookeeper是一款开源的分布式服务协调中间件,是由雅虎团队研发而来,其设计的初衷是开发一个通用的,无单点问题的分布式协调框架,采用统一的协调管理方式更好地管理各个子系统,从而让开发者将更多的经理集中在业务逻辑处理上。最终整个分布式系统看上去就想是一个大型的动物园,而这个中间件正好用来协调分布式环境中的各个子系统,zookeeper因此而得名
Zookeeper可以做什么
官网:官方文档
- 统一配置管理:将每个子系统都需要配置的文件统一放到zookeeper中的znode节点中。
- 统一命名服务:通过给存放在znode上的资源进行统一命名,各个子系统便可以通过名字获取到节点上响应的资源。
- 分布式锁:通过创建于该共享资源相关的”顺序临时节点“与动态watcher监听机制,而从监控多线程对共享资源的并发访问。(排队取号的机制,如果不明白可以百度找一些文章,其实更加建议结合redis去理解会更好)
- 为什么一定是:顺序临时节点,因为可以保证每个线程执行的都是唯一的。
- watcher机制机制:把最小的序号删除以后,一定要把List第一个元素给删除,让其他的线程继续获取锁。
- 集群状态:通过动态地感知节点的增加,删除,从而保证集群下的相关节点数据主,副本数据的一致。
Zookeeper分布式锁流程
zookeeper实现分布式锁主要是通过创建与共享资源相关的:“顺序临时节点” 并采用其提供的Watcher监听机制,控制多线程对共享资源的并发访问,整体如下:
SpringBoot整合Zookeeper
zookeeper下载地址:下载地址
1、依赖
<!--zookeeper--><dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.3</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version></dependency><dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version></dependency>
2、配置
#zookeeper配置zk.host=127.0.0.1:2181zk.namespace=pug_middle_lock
3、配置初始化
/** * 通用化配置 **/@Configurationpublic class CuratorFrameworkConfiguration {
@Value("${zk.host}") private String host; @Value("${zk.namespace}") private String namespace;
// 自定义注入Bean-ZooKeeper高度封装过的客户端Curator实例 @Bean public CuratorFramework curatorFramework() { //创建CuratorFramework实例 //(1)创建的方式是采用工厂模式进行创建; //(2)指定了客户端连接到ZooKeeper服务端的策略:这里是采用重试的机制(5次,每次间隔1s) CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(host).namespace(namespace) .retryPolicy(new RetryNTimes(5, 1000)).build(); curatorFramework.start(); //返回CuratorFramework实例 return curatorFramework; }
}
4、用户注册实现分布式锁
/** * 处理用户注册信息提交服务Service **/@Servicepublic class UserRegService { //定义日志实例 private static final Logger log= LoggerFactory.getLogger(UserRegService.class); //定义用户注册Mapper操作接口实例 @Autowired private UserRegMapper userRegMapper;
//定义ZooKeeper客户端CuratorFramework实例 @Autowired private CuratorFramework client; //ZooKeeper分布式锁的实现原理是由ZNode节点的创建与删除跟监听机制构成的 //而ZNoe节点将对应一个具体的路径-跟Unix文件夹路径类似-需要以 / 开头 private static final String pathPrefix="/middleware/zkLock/";
/** * 处理用户提交注册的请求-加ZooKeeper分布式锁 * @param dto * @throws Exception */ public void userRegWithZKLock(UserRegDto dto) throws Exception{ //创建ZooKeeper互斥锁组件实例,需要将监控用的客户端实例、精心构造的共享资源 作为构造参数 InterProcessMutex mutex=new InterProcessMutex(client,pathPrefix+dto.getUserName()+"-lock"); try { //采用互斥锁组件尝试获取分布式锁-其中尝试的最大时间在这里设置为10s //当然,具体的情况需要根据实际的业务而定 if (mutex.acquire(10L, TimeUnit.SECONDS)){ //TODO:真正的核心处理逻辑
//根据用户名查询用户实体信息 UserReg reg=userRegMapper.selectByUserName(dto.getUserName()); //如果当前用户名还未被注册,则将当前用户信息注册入数据库中 if (reg==null){ log.info("---加了ZooKeeper分布式锁---,当前用户名为:{} ",dto.getUserName()); //创建用户注册实体信息 UserReg entity=new UserReg(); //将提交的用户注册请求实体信息中对应的字段取值 //复制到新创建的用户注册实体的相应字段中 BeanUtils.copyProperties(dto,entity); //设置注册时间 entity.setCreateTime(new Date()); //插入用户注册信息 userRegMapper.insertSelective(entity);
}else { //如果用户名已被注册,则抛出异常 throw new Exception("用户信息已经存在!"); } }else{ throw new RuntimeException("获取ZooKeeper分布式锁失败!"); } }catch (Exception e){ throw e; }finally { //TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁 mutex.release(); } }
}
5、实现商品抢购扣减库存
-- ------------------------------ Table structure for book_rob-- ----------------------------DROP TABLE IF EXISTS `book_rob`;CREATE TABLE `book_rob` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `user_id` int(11) NOT NULL COMMENT '用户id', `book_no` varchar(255) NOT NULL COMMENT '商品编号', `rob_time` datetime DEFAULT NULL COMMENT '抢购时间', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=153 DEFAULT CHARSET=utf8 COMMENT='商品抢购记录';
-- ------------------------------ Records of book_rob-- ----------------------------INSERT INTO `book_rob` VALUES ('2', '10010', 'BS20190421001', '2019-04-22 22:49:05');INSERT INTO `book_rob` VALUES ('147', '10040', 'BS20190421001', '2019-04-22 23:28:05');INSERT INTO `book_rob` VALUES ('148', '10042', 'BS20190421001', '2019-04-22 23:28:05');INSERT INTO `book_rob` VALUES ('149', '10041', 'BS20190421001', '2019-04-22 23:28:05');INSERT INTO `book_rob` VALUES ('150', '10045', 'BS20190421001', '2019-04-22 23:28:05');INSERT INTO `book_rob` VALUES ('151', '10043', 'BS20190421001', '2019-04-22 23:28:05');INSERT INTO `book_rob` VALUES ('152', '10044', 'BS20190421001', '2019-04-22 23:28:05');
-- ------------------------------ Table structure for book_stock-- ----------------------------DROP TABLE IF EXISTS `book_stock`;CREATE TABLE `book_stock` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `book_no` varchar(255) NOT NULL COMMENT '商品编号', `stock` int(255) NOT NULL COMMENT '库存', `is_active` tinyint(255) DEFAULT '1' COMMENT '是否上架(1=是;0=否)', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='商品库存表';
-- ------------------------------ Records of book_stock-- ----------------------------INSERT INTO `book_stock` VALUES ('1', 'BS20190421001', '4', '1');
1、bean
//商品抢购记录实体@Data@ToStringpublic class BookRob { private Integer id; //主键id private Integer userId;//用户id private String bookNo; //商品编号 private Date robTime; //抢购时间}
//商品库存实体@Data@ToStringpublic class BookStock { private Integer id; //主键Id private String bookNo;//商品编号 private Integer stock;//存库 private Byte isActive;//是否上架}
2、mapper
//商品抢购成功的记录实体Mapper操作接口public interface BookRobMapper { //插入抢购成功的记录信息 int insertSelective(BookRob record); //统计每个用户每本书的抢购数量 //用于判断用户是否抢购过该商品 int countByBookNoUserId(@Param("userId") Integer userId,@Param("bookNo") String bookNo);}
//商品库存实体操作接口Mapperpublic interface BookStockMapper { //根据商品编号查询 BookStock selectByBookNo(@Param("bookNo") String bookNo); //更新商品库存-不加锁 int updateStock(@Param("bookNo") String bookNo); //更新商品库存-加锁 int updateStockWithLock(@Param("bookNo") String bookNo);}
3、service
/** * 商品抢购服务 **/@Servicepublic class BookRobService { //定义日志实例 private static final Logger log= LoggerFactory.getLogger(BookRobService.class); //定义商品库存实体操作接口Mapper实例 @Autowired private BookStockMapper bookStockMapper; //定义商品抢购实体操作接口Mapper实例 @Autowired private BookRobMapper bookRobMapper;
//定义ZooKeeper客户端CuratorFramework实例 @Autowired private CuratorFramework client; //ZooKeeper分布式锁的实现原理是由ZNode节点的创建与删除跟监听机制构成的 //而ZNoe节点将对应一个具体的路径-跟Unix文件夹路径类似-需要以 / 开头 private static final String pathPrefix="/middleware/zkLock/";
/** * 处理商品抢购逻辑-不加分布式锁 * @param dto * @throws Exception */ @Transactional(rollbackFor = Exception.class) public void robWithNoLock(BookRobDto dto) throws Exception{ //根据商品编号查询记录 BookStock stock=bookStockMapper.selectByBookNo(dto.getBookNo()); //统计每个用户每本书的抢购数量 int total=bookRobMapper.countByBookNoUserId(dto.getUserId(),dto.getBookNo());
//商品记录存在、库存充足,而且用户还没抢购过本书,则代表当前用户可以抢购 if (stock!=null && stock.getStock()>0 && total<=0){ log.info("---处理商品抢购逻辑-不加分布式锁---,当前信息:{} ",dto);
//当前用户抢购到商品,库存减一 int res=bookStockMapper.updateStock(dto.getBookNo()); //更新库存成功后,需要添加抢购记录 if (res>0){ //创建商品抢购记录实体信息 BookRob entity=new BookRob(); //将提交的用户抢购请求实体信息中对应的字段取值 //复制到新创建的商品抢购记录实体的相应字段中 BeanUtils.copyProperties(dto,entity); //设置抢购时间 entity.setRobTime(new Date()); //插入用户注册信息 bookRobMapper.insertSelective(entity); } }else { //如果不满足上述的任意一个if条件,则抛出异常 throw new Exception("该商品库存不足!"); } }
/** * 处理商品抢购逻辑-加ZooKeeper分布式锁 * @param dto * @throws Exception */ @Transactional(rollbackFor = Exception.class) public void robWithZKLock(BookRobDto dto) throws Exception{ //创建ZooKeeper互斥锁组件实例,需要将CuratorFramework实例、精心构造的共享资源 作为构造参数 InterProcessMutex mutex=new InterProcessMutex(client,pathPrefix+dto.getBookNo()+dto.getUserId()+"-lock"); try { //采用互斥锁组件尝试获取分布式锁-其中尝试的最大时间在这里设置为15s //当然,具体的情况需要根据实际的业务而定 if (mutex.acquire(15L, TimeUnit.SECONDS)){ //TODO:真正的核心处理逻辑
//根据商品编号查询记录 BookStock stock=bookStockMapper.selectByBookNo(dto.getBookNo()); //统计每个用户每本书的抢购数量 int total=bookRobMapper.countByBookNoUserId(dto.getUserId(),dto.getBookNo());
//商品记录存在、库存充足,而且用户还没抢购过本书,则代表当前用户可以抢购 if (stock!=null && stock.getStock()>0 && total<=0){ log.info("---处理商品抢购逻辑-加ZooKeeper分布式锁---,当前信息:{} ",dto);
//当前用户抢购到商品,库存减一 int res=bookStockMapper.updateStock(dto.getBookNo()); //更新库存成功后,需要添加抢购记录 if (res>0){ //创建商品抢购记录实体信息 BookRob entity=new BookRob(); //将提交的用户抢购请求实体信息中对应的字段取值 //复制到新创建的商品抢购记录实体的相应字段中 entity.setUserId(dto.getUserId()); entity.setBookNo(dto.getBookNo()); //设置抢购时间 entity.setRobTime(new Date()); //插入用户注册信息 bookRobMapper.insertSelective(entity); } }else { //如果不满足上述的任意一个if条件,则抛出异常 throw new Exception("该商品库存不足!"); }
}else{ throw new RuntimeException("获取ZooKeeper分布式锁失败!"); } }catch (Exception e){ throw e; }finally { //TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁 mutex.release(); } }}
4、controller
/** * 商品抢购Controller * @Author:debug (SteadyJack) * @Date: 2019/4/21 23:31 **/@RestControllerpublic class BookRobController { //定义日志 private static final Logger log= LoggerFactory.getLogger(BookRobController.class); //定义请求前缀 private static final String prefix="book/rob"; //定义核心逻辑处理服务类 @Autowired private BookRobService bookRobService;
/** * 用户抢购商品请求 * @param dto * @return */ @RequestMapping(value = prefix+"/request",method = RequestMethod.GET) public BaseResponse takeMoney(BookRobDto dto){ if (Strings.isNullOrEmpty(dto.getBookNo()) || dto.getUserId()==null || dto.getUserId()<=0){ return new BaseResponse(StatusCode.InvalidParams); } BaseResponse response=new BaseResponse(StatusCode.Success); try { //加ZooKeeper分布式锁的情况 bookRobService.robWithZKLock(dto);
}catch (Exception e){ response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage()); } return response; }}