当前位置:Gxlcms > PHP教程 > 如何处理下面的并发问题

如何处理下面的并发问题

时间:2021-07-01 10:21:17 帮助过:3人阅读

1. 描述你的问题

工作中存在如下表:
表table_2,其中存在 unique key(device, seq)唯一索引。 其中seq字段是我们程序中维护的一个自增序列。
业务的需求就是:每次推送一条消息,会根据device获取表中最大的seq。
select seq from table_2 where device = ? order by seq desc
然后将获取的 seq+1 插入到table_2中作为当前消息的记录
insert into table_2 (device, seq) values(?, ?)

2 . 贴上相关代码。代码是我简化的结果,方便大家阅读

$device = "x-x";
$this->_db->startTrans();
//从table_2表中读取该device最大的seq,
//然后将该seq+1之后,重新将该device和seq插入的数据表中。
//现在的问题是当并发的时候,并发的select语句获取到了相同的seq,
//所以insert的时候冲突发生了。
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

3. 贴上报错信息

失败方法1:
不合理的原因是:直接加上异常处理机制。当冲突时,并发的请求因为抛出异常,直接被捕获,程序继续运行,不做任何处理。结果就是:导致客户端请求失败,客户端需要重新发起请求。

try{
    //代码放这里
} catch(Exception $e){
    if ($e->getCode() == 1062){
        //
    }
}

失败方法2
尝试重试机制.当请求第一次失败时,会多次尝试请求。当数据库唯一索引冲突的时候,catch捕获该异常,然后重试。重试的条件必须是:捕获的异常是数据库插入冲突,返回1062异常code。

for($i=0; $i < 10; $i++)
try{
    //代码放这里
} catch(DBException $e){
    if($e->code() == 1062){
        continue;
    }
    throw $e;
}

失败方法3
通过redis来控制。因为冲突的根源就是于并发导致多个请求读取到了相同的seq导致的。所以考虑使用redis‘锁’的机制来实现。第一个获取到seq的请求会设置一个key,之后的请求都会在这个key的基础上进行获取。

$device = "";
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;

//因为并发其实就是因为上面的sql语句查到了相同的seq,所以这里
//就只获取第一条执行的seq,之后的全部通过对该redis加1操作。使用setnx和incr是为了保证原子操作。
//这个处理方式仍然存在问题,比如在键值过期的时刻存在并发,这个时候$Seq就可能从1开始。
$lock = $redis->setnx($device, $Seq)
$redis->expire($device, 2);
if (!$lock){
    $Seq = $redis->incr($device);
}
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

失败方式4
另一种重试机制。上面的重试机制是靠MySQL数据库的插入冲突,现在的重试是通过Redis在Select语句层面实现。

$device = "";
for($i=0;$i<6; $i++){
    if($i!=0){
        usleep(10000);
    } else {
        $result = $this->_db->getRow(
            “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
        )
        $Seq = $result['seq'] + 1;
        $lock = $redis->setnx($device . $Seq, rand());
        if ($lock){
             $success = true;
             $redis->expire($device . $Seq, 2);
             break;
        }
    }
    if (!$success) {
            throw new Exception();
    }
    $this->_db->execute(
        "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
    );
    $this->_db->commit();
}

失败方法5
使用事务。使用MySQL机制的事务来实现。首先在Select语句中加上意向排他锁IX, 修改之后的SQL为 “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1 FOR UPDATE”, 后面的语句不变。

官方文档是这样介绍的:两个意向排他说并没有冲突,所以并发请求A和并发请求B同时会获取IX,insert操作需要一个X锁。

X IX S IS
X Conflict Conflict Conflict Conflict
IX Conflict Compatible Conflict Compatible
S Conflict Conflict Compatible Compatible
IS Conflict Compatible Compatible Compatible

我的猜测:A请求在获取X锁的时候,B请求也在获取X锁。导致A请求在他遍历的行上加锁,B请求也同时在加锁,造成了循环等待锁释放的情况,产生死锁。

//前一句使用select for update
//然后和insert包裹起来。多进程执行的话,会出现死锁。
//通过降低mysql的隔离水平确实可以实现,但是可能会产生不可重复读的问题.通俗的介绍就是:A请求在操作事务中两次读到的数据可能不一致。

失败方法6
将两条sql放在一条sql里。 INSERT INTO table_2(device, seq) VALUES(?, (select seq + 1 from table_2) where device = ? )", array($device)

//因为前一条sql是我简化版,其实他是一个分表的查询操作。
//这里的分表是一个router操作,所以不适合进行一条sql语句
//就是会依次查询我们这里分的几个表,最近的一个表查不到,查第二个

方法7
修改数据表结构,将seq保存到独立的一个其他表中,每个device对应一个最大的seq。这样在select的时候就可以保证select for update 不会产生间隙锁,而是指在一条数据行上加锁。

  1. 贴上相关截图

图片就如上所示。大部分的代码都是自己手工敲的,所以有些地方大家不用深究,了解清楚意思就可以。
希望大家可以有建设性建议

大神来指点指点吧

我实在是想不出什么好的办法了。将并发请求转化为串行,这个想法没有很好的实现。大神在哪里?

回复内容:

1. 描述你的问题

工作中存在如下表:
表table_2,其中存在 unique key(device, seq)唯一索引。 其中seq字段是我们程序中维护的一个自增序列。
业务的需求就是:每次推送一条消息,会根据device获取表中最大的seq。
select seq from table_2 where device = ? order by seq desc
然后将获取的 seq+1 插入到table_2中作为当前消息的记录
insert into table_2 (device, seq) values(?, ?)

2 . 贴上相关代码。代码是我简化的结果,方便大家阅读

$device = "x-x";
$this->_db->startTrans();
//从table_2表中读取该device最大的seq,
//然后将该seq+1之后,重新将该device和seq插入的数据表中。
//现在的问题是当并发的时候,并发的select语句获取到了相同的seq,
//所以insert的时候冲突发生了。
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

3. 贴上报错信息

失败方法1:
不合理的原因是:直接加上异常处理机制。当冲突时,并发的请求因为抛出异常,直接被捕获,程序继续运行,不做任何处理。结果就是:导致客户端请求失败,客户端需要重新发起请求。

try{
    //代码放这里
} catch(Exception $e){
    if ($e->getCode() == 1062){
        //
    }
}

失败方法2
尝试重试机制.当请求第一次失败时,会多次尝试请求。当数据库唯一索引冲突的时候,catch捕获该异常,然后重试。重试的条件必须是:捕获的异常是数据库插入冲突,返回1062异常code。

for($i=0; $i < 10; $i++)
try{
    //代码放这里
} catch(DBException $e){
    if($e->code() == 1062){
        continue;
    }
    throw $e;
}

失败方法3
通过redis来控制。因为冲突的根源就是于并发导致多个请求读取到了相同的seq导致的。所以考虑使用redis‘锁’的机制来实现。第一个获取到seq的请求会设置一个key,之后的请求都会在这个key的基础上进行获取。

$device = "";
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;

//因为并发其实就是因为上面的sql语句查到了相同的seq,所以这里
//就只获取第一条执行的seq,之后的全部通过对该redis加1操作。使用setnx和incr是为了保证原子操作。
//这个处理方式仍然存在问题,比如在键值过期的时刻存在并发,这个时候$Seq就可能从1开始。
$lock = $redis->setnx($device, $Seq)
$redis->expire($device, 2);
if (!$lock){
    $Seq = $redis->incr($device);
}
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

失败方式4
另一种重试机制。上面的重试机制是靠MySQL数据库的插入冲突,现在的重试是通过Redis在Select语句层面实现。

$device = "";
for($i=0;$i<6; $i++){
    if($i!=0){
        usleep(10000);
    } else {
        $result = $this->_db->getRow(
            “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
        )
        $Seq = $result['seq'] + 1;
        $lock = $redis->setnx($device . $Seq, rand());
        if ($lock){
             $success = true;
             $redis->expire($device . $Seq, 2);
             break;
        }
    }
    if (!$success) {
            throw new Exception();
    }
    $this->_db->execute(
        "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
    );
    $this->_db->commit();
}

失败方法5
使用事务。使用MySQL机制的事务来实现。首先在Select语句中加上意向排他锁IX, 修改之后的SQL为 “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1 FOR UPDATE”, 后面的语句不变。

官方文档是这样介绍的:两个意向排他说并没有冲突,所以并发请求A和并发请求B同时会获取IX,insert操作需要一个X锁。

X IX S IS
X Conflict Conflict Conflict Conflict
IX Conflict Compatible Conflict Compatible
S Conflict Conflict Compatible Compatible
IS Conflict Compatible Compatible Compatible

我的猜测:A请求在获取X锁的时候,B请求也在获取X锁。导致A请求在他遍历的行上加锁,B请求也同时在加锁,造成了循环等待锁释放的情况,产生死锁。

//前一句使用select for update
//然后和insert包裹起来。多进程执行的话,会出现死锁。
//通过降低mysql的隔离水平确实可以实现,但是可能会产生不可重复读的问题.通俗的介绍就是:A请求在操作事务中两次读到的数据可能不一致。

失败方法6
将两条sql放在一条sql里。 INSERT INTO table_2(device, seq) VALUES(?, (select seq + 1 from table_2) where device = ? )", array($device)

//因为前一条sql是我简化版,其实他是一个分表的查询操作。
//这里的分表是一个router操作,所以不适合进行一条sql语句
//就是会依次查询我们这里分的几个表,最近的一个表查不到,查第二个

方法7
修改数据表结构,将seq保存到独立的一个其他表中,每个device对应一个最大的seq。这样在select的时候就可以保证select for update 不会产生间隙锁,而是指在一条数据行上加锁。

  1. 贴上相关截图

图片就如上所示。大部分的代码都是自己手工敲的,所以有些地方大家不用深究,了解清楚意思就可以。
希望大家可以有建设性建议

大神来指点指点吧

我实在是想不出什么好的办法了。将并发请求转化为串行,这个想法没有很好的实现。大神在哪里?

insert into table_2 select max(seq)+1,device from table_2 WHERE device = ?;

直接使用redis的incr来生成自增id,incr是原子操作,不会有并发问题。

你的问题归纳一下就是:如何实现同一个device下seq自增长问题。
解决方法:使用redis的hash保存device与seq的关系,当需要为指定device生产一个唯一seq时使用HINCRBY命令即可。

人气教程排行