商城系统中,抢购和秒杀是很常见的营销场景,在一定时间内有大量的用户访问商场下单,主要需要解决的问题有两个:
1.高并发对数据库产生的压力;
2.竞争状态下如何解决商品库存超卖;
高并发对数据库产生的压力
对于第一个问题,使用缓存来处理,避免直接操作数据库,例如使用Redis。
竞争状态下如何解决商品库存超卖
对于第二个问题,需要重点说明。
常规写法:查询出对应商品的库存,判断库存数量否大于0,然后执行生成订单等操作,但是在判断库存是否大于0处,如果在高并发下就会有问题,导致库存量出现负数。
测试表 sql
把如下表数据导入到数据库中
/*
Navicat MySQL Data Transfer
Source Server : 01 本地localhost
Source Server Version : 50553
Source Host : localhost:3306
Source Database : test
Target Server Type : MYSQL
Target Server Version : 50553
File Encoding : 65001
Date: 2020-11-06 14:31:35
*/
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for products
-- ----------------------------
DROP TABLE IF EXISTS `products`;
CREATE TABLE `products` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`title` varchar(50) DEFAULT NULL COMMENT '货品名称',
`store` int(11) DEFAULT '0' COMMENT '货品库存',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='货品表';
-- ----------------------------
-- Records of products
-- ----------------------------
INSERT INTO `products` VALUES ('1', '稻花香大米', '20');
-- ----------------------------
-- Table structure for order_log
-- ----------------------------
DROP TABLE IF EXISTS `order_log`;
CREATE TABLE `order_log` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`content` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '日志内容',
`c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1;
-- ----------------------------
-- Table structure for order
-- ----------------------------
DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
`oid` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '订单号',
`product_id` int(11) DEFAULT '0' COMMENT '商品ID',
`number` int(11) DEFAULT '0' COMMENT '购买数量',
`c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`oid`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COMMENT='订单表';
下单处理代码
<?php
db();
global $con;
//step1 接收下单参数
$product_id = 1;// 商品ID
$buy_num = 1;// 购买数量
//step2 查询商品信息
$sql = "select * from products where id={$product_id}";
$result = mysqli_query($con, $sql);
$row = mysqli_fetch_assoc($result);
//step3 判断商品下单数量是否大于商品库存数量
//此处在高并发下,可能出现上一个下单后还没来得及更新库存,下一个下单判断库存数不是最新的库存
if ($row['store'] > 0) {
sleep(1);
//step4 更新商品库存数量(减去下单数量)
$sql = "update products set store=store-{$buy_num} where id={$product_id}";
if (mysqli_query($con, $sql)) {
echo "更新成功";
//step5 生成订单号创建订单
$oid = build_order_no();
create_order($oid, $product_id, $buy_num);
insertLog('库存减少成功,下单成功');
} else {
echo "更新失败";
insertLog('库存减少失败');
}
} else {
echo "没有库存";
insertLog('库存不够');
}
function db()
{
global $con;
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
}
/**
* 生成唯一订单号
*/
function build_order_no()
{
return date('Ymd') . str_pad(mt_rand(1, 99999), 5, '0', STR_PAD_LEFT);
}
function create_order($oid, $product_id, $number)
{
global $con;
$sql = "INSERT INTO `order` (oid, product_id, number) values('$oid', '$product_id', '$number')";
mysqli_query($con, $sql);
}
/**
* 记录日志
*/
function insertLog($content)
{
global $con;
$sql = "INSERT INTO `order_log` (content) values('$content')";
mysqli_query($con, $sql);
}
将库存字段字段设为unsigned
因为库存字段不能为负数,在下单后更新商品库存时,如果出现负数将返回false
<?php
db();
global $con;
//step1 接收下单参数
$product_id = 1;// 商品ID
$buy_num = 1;// 购买数量
//step2 查询商品信息
$sql = "select * from products where id={$product_id} for UPDATE";//利用for update 开启行锁
$result = mysqli_query($con, $sql);
$row = mysqli_fetch_assoc($result);
//step3 判断商品下单数量是否大于商品库存数量
if ($row['store'] > 0) {
sleep(1);
//step4 更新商品库存数量(减去下单数量)
$sql = "update products set store=store-{$buy_num} where id={$product_id}";
if (mysqli_query($con, $sql)) {
echo "更新成功";
//step5 生成订单号创建订单
$oid = build_order_no();
create_order($oid, $product_id, $buy_num);
insertLog('库存减少成功,下单成功');
} else {
// 如果出现负数将返回false
echo "更新失败";
insertLog('库存减少失败');
}
} else {
//商品已经抢购完
echo "没有库存";
insertLog('库存不够');
}
function db()
{
global $con;
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
}
/**
* 生成唯一订单号
*/
function build_order_no()
{
return date('Ymd') . str_pad(mt_rand(1, 99999), 5, '0', STR_PAD_LEFT);
}
function create_order($oid, $product_id, $number)
{
global $con;
$sql = "INSERT INTO `order` (oid, product_id, number) values('$oid', '$product_id', '$number')";
mysqli_query($con, $sql);
}
/**
* 记录日志
*/
function insertLog($content)
{
global $con;
$sql = "INSERT INTO `order_log` (content) values('$content')";
mysqli_query($con, $sql);
}
使用mysql的事务,锁住操作的行
在下单处理过程中,使用mysql的事务将正在下单商品行数据锁定
<?php
db();
global $con;
//step1 接收下单参数
$product_id = 1;// 商品ID
$buy_num = 1;// 购买数量
mysqli_query($con, "BEGIN"); //开始事务
//step2 查询商品信息
$sql = "select * from products where id={$product_id} for UPDATE";//利用for update 开启行锁
$result = mysqli_query($con, $sql);
$row = mysqli_fetch_assoc($result);
//step3 判断商品下单数量是否大于商品库存数量
if ($row['store'] > 0) {
sleep(1);
//step4 更新商品库存数量(减去下单数量)
$sql = "update products set store=store-{$buy_num} where id={$product_id}";
if (mysqli_query($con, $sql)) {
echo "更新成功";
//step5 生成订单号创建订单
$oid = build_order_no();
create_order($oid, $product_id, $buy_num);
insertLog('库存减少成功,下单成功');
mysqli_query($con, "COMMIT");//事务提交即解锁
} else {
echo "更新失败";
insertLog('库存减少失败');
mysqli_query($con, "ROLLBACK");//事务回滚即解锁
}
} else {
//商品已经抢购完
echo "没有库存";
insertLog('库存不够');
mysqli_query($con, "ROLLBACK");//事务回滚即解锁
}
function db()
{
global $con;
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
}
/**
* 生成唯一订单号
*/
function build_order_no()
{
return date('Ymd') . str_pad(mt_rand(1, 99999), 5, '0', STR_PAD_LEFT);
}
function create_order($oid, $product_id, $number)
{
global $con;
$sql = "INSERT INTO `order` (oid, product_id, number) values('$oid', '$product_id', '$number')";
mysqli_query($con, $sql);
}
/**
* 记录日志
*/
function insertLog($content)
{
global $con;
$sql = "INSERT INTO `order_log` (content) values('$content')";
mysqli_query($con, $sql);
}
使用非阻塞的文件排他锁
在处理下单请求的时候,用flock锁定一个文件,如果锁定失败说明有其他订单正在处理,此时要么等待要么直接提示用户”服务器繁忙”,计数器存储抢购的商品数量,避免查询数据库。
阻塞(等待)模式:并发时,当有第二个用户请求时,会等待第一个用户请求完成、释放锁,获得文件锁之后,程序才会继续运行下去。
<?php
db();
global $con;
//step1 接收下单参数
$product_id = 1;// 商品ID
$buy_num = 1;// 购买数量
$fp = fopen('lock.txt', 'w');
if (flock($fp, LOCK_EX)) { //文件独占锁,阻塞
//step2 查询商品信息
$sql = "select * from products where id={$product_id}";
$result = mysqli_query($con, $sql);
$row = mysqli_fetch_assoc($result);
//step3 判断商品下单数量是否大于商品库存数量
if ($row['store'] > 0) {
//处理订单
sleep(1);
//step4 更新商品库存数量(减去下单数量)
$sql = "update products set store=store-{$buy_num} where id={$product_id}";
if (mysqli_query($con, $sql)) {
echo "更新成功";
//step5 生成订单号创建订单
$oid = build_order_no();
create_order($oid, $product_id, $buy_num);
insertLog('库存减少成功,下单成功');
} else {
echo "更新失败";
insertLog('库存减少失败');
}
} else {
//商品已经抢购完
echo "没有库存";
insertLog('库存不够');
}
flock($fp, LOCK_UN); //释放锁
}
fclose($fp);
function db()
{
global $con;
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
}
/**
* 生成唯一订单号
*/
function build_order_no()
{
return date('Ymd') . str_pad(mt_rand(1, 99999), 5, '0', STR_PAD_LEFT);
}
function create_order($oid, $product_id, $number)
{
global $con;
$sql = "INSERT INTO `order` (oid, product_id, number) values('$oid', '$product_id', '$number')";
mysqli_query($con, $sql);
}
/**
* 记录日志
*/
function insertLog($content)
{
global $con;
$sql = "INSERT INTO `order_log` (content) values('$content')";
mysqli_query($con, $sql);
}
非阻塞模式:并发时,第一个用户请求,拿得文件锁之后。后面请求的用户直接返回系统繁忙,请稍后再试
<?php
db();
global $con;
//step1 接收下单参数
$product_id = 1;// 商品ID
$buy_num = 1;// 购买数量
$fp = fopen('lock.txt', 'w');
if (flock($fp, LOCK_EX|LOCK_NB)) { //文件独占锁,非阻塞
//step2 查询商品信息
$sql = "select * from products where id={$product_id}";
$result = mysqli_query($con, $sql);
$row = mysqli_fetch_assoc($result);
//step3 判断商品下单数量是否大于商品库存数量
if ($row['store'] > 0) {
//处理订单
sleep(1);
//step4 更新商品库存数量(减去下单数量)
$sql = "update products set store=store-{$buy_num} where id={$product_id}";
if (mysqli_query($con, $sql)) {
echo "更新成功";
//step5 生成订单号创建订单
$oid = build_order_no();
create_order($oid, $product_id, $buy_num);
insertLog('库存减少成功,下单成功');
} else {
echo "更新失败";
insertLog('库存减少失败');
}
} else {
//商品已经抢购完
echo "没有库存";
insertLog('库存不够');
}
flock($fp, LOCK_UN); //释放锁
} else {
//系统繁忙,请稍后再试
echo "系统繁忙,请稍后再试";
insertLog('系统繁忙,请稍后再试');
}
fclose($fp);
function db()
{
global $con;
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
}
/**
* 生成唯一订单号
*/
function build_order_no()
{
return date('Ymd') . str_pad(mt_rand(1, 99999), 5, '0', STR_PAD_LEFT);
}
function create_order($oid, $product_id, $number)
{
global $con;
$sql = "INSERT INTO `order` (oid, product_id, number) values('$oid', '$product_id', '$number')";
mysqli_query($con, $sql);
}
/**
* 记录日志
*/
function insertLog($content)
{
global $con;
$sql = "INSERT INTO `order_log` (content) values('$content')";
mysqli_query($con, $sql);
}
使用redis队列
- 因为pop操作是原子的,即使有很多用户同时到达,也是依次执行,推荐使用
- mysql事务在高并发下性能下降很厉害,文件锁的方式也是
- 先将商品库存到redis队列
<?php
db();
global $con;
// 查询商品信息
$product_id = 1;
$sql = "select * from products where id={$product_id}";
$result = mysqli_query($con, $sql);
$row = mysqli_fetch_assoc($result);
$store = $row['store'];
// 获取商品在redis缓存的库存
$redis = new Redis();
$result = $redis->connect('127.0.0.1', 6379);
$key = 'goods_store_' . $product_id;
$res = $redis->llen($key);
$count = $store - $res;
for ($i=0; $i<$count; $i++) {
$redis->lpush($key, 1);
}
echo $redis->llen($key);
function db()
{
global $con;
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
}
2.抢购、秒杀逻辑
<?php
db();
global $con;
//step1 接收下单参数
$product_id = 1;// 商品ID
$buy_num = 1;// 购买数量
//step2 下单前判断redis队列库存量
$redis = new Redis();
$result = $redis->connect('127.0.0.1',6379);
$count = $redis->lpop('goods_store_' . $product_id);
if (!$count) {
insertLog('error:no store redis');
return '秒杀结束,没有商品库存了';
}
sleep(1);
//step3 更新商品库存数量(减去下单数量)
$sql = "update products set store=store-{$buy_num} where id={$product_id}";
if (mysqli_query($con, $sql)) {
echo "更新成功";
//step4 生成订单号创建订单
$oid = build_order_no();
create_order($oid, $product_id, $buy_num);
insertLog('库存减少成功,下单成功');
} else {
echo "更新失败";
insertLog('库存减少失败');
}
function db()
{
global $con;
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
}
/**
* 生成唯一订单号
*/
function build_order_no()
{
return date('Ymd') . str_pad(mt_rand(1, 99999), 5, '0', STR_PAD_LEFT);
}
function create_order($oid, $product_id, $number)
{
global $con;
$sql = "INSERT INTO `order` (oid, product_id, number) values('$oid', '$product_id', '$number')";
mysqli_query($con, $sql);
}
/**
* 记录日志
*/
function insertLog($content)
{
global $con;
$sql = "INSERT INTO `order_log` (content) values('$content')";
mysqli_query($con, $sql);
}
redis乐观锁防止超卖
<?php
$redis =new Redis();
$redis->connect("127.0.0.1", 6379);
$redis->watch('sales');//乐观锁 监视作用 set() 初始值0
$sales = $redis->get('sales');
$n = 20;// 库存
if ($sales >= $n) {
exit('秒杀结束');
}
//redis开启事务
$redis->multi();
$redis->incr('sales'); //将 key 中储存的数字值增一 ,如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。
$res = $redis->exec(); //成功1 失败0
if ($res) {
//秒杀成功
$con = new mysqli('localhost','root','root','test');
if (!$con) {
echo "数据库连接失败";
}
$product_id = 1;// 商品ID
$buy_num = 1;// 购买数量
sleep(1);
$sql = "update products set store=store-{$buy_num} where id={$product_id}";
if (mysqli_query($con, $sql)) {
echo "秒杀完成";
}
} else {
exit('抢购失败');
}