当前位置:Gxlcms > 数据库问题 > 谈反应式编程在服务端中的应用,数据库操作优化,提速 Upsert

谈反应式编程在服务端中的应用,数据库操作优化,提速 Upsert

时间:2021-07-01 10:21:17 帮助过:8人阅读

class BatchUpsert : IUpsertRepository
{
private readonly IDatabase _database;
private readonly IBatchOperator<(int, int), int> _batchOperator;

public BatchUpsert(IDatabase database)
{
_database = database;
var options = new BatchOperatorOptions<(int, int), int>
{
BufferCount = 100,
BufferTime = TimeSpan.FromMilliseconds(50),
DoManyFunc = DoManyFunc
};
_batchOperator = new BatchOperator<(int, int), int>(options);
}

private Task<int> DoManyFunc(IEnumerable<(int, int)> arg)
{
return _database.UpsertMany(arg.ToDictionary(x => x.Item1, x => x.Item2));
}

public Task UpsertAsync(int key, int value)
{
return _batchOperator.CreateTask((key, value));
}
}

然后,只要实现对应数据库的 UpsertMany 方法,便可以很好地完成这项优化。

各种数据库的操作

结合 Newbe.Claptrap 现在项目的实际。目前,被支持的数据库分别有 SQLite、PostgreSQL、MySql 和 MongoDB。以下,分别对不同类型的数据库的批量 Upsert 操作进行说明。

由于在 Newbe.Claptrap 项目中的 Upsert 需求都是以主键作为对比键,因此以下也只讨论这种情况。

SQLite

根据官方文档,使用 INSERT OR REPLACE INTO 便可以实现主键冲突时替换数据的需求。

具体的语句格式形如以下:

INSERT OR REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);

因此只要直接拼接语句和参数调用即可。需要注意的是,SQLite 的可传入参数默认为 999,因此拼接的变量也不应大于该数量。

官方文档:INSERT

PostgreSQL

众所周知,PostgreSQL 在进行批量写入时,可以使用高效的 COPY 语句来完成数据的高速导入,这远远快于 INSERT 语句。但可惜的是 COPY 并不能支持 ON CONFLICT DO UPDATE 子句。因此,无法使用 COPY 来完成 upsert 需求。

因此,我们还是回归使用 INSERT 配合 ON CONFLICT DO UPDATE 子句,以及 unnest 函数来完成批量 upsert 的需求。

具体的语句格式形如以下:

INSERT INTO TestTable (id, value)
VALUES (unnest(@ids), unnest(@values))
ON CONFLICT ON CONSTRAINT TestTable_pkey
DO UPDATE SET value=excluded.value;

其中的 ids 和 values 分别为两个等长的数组对象,unnest 函数可以将数组对象转换为行数据的形式。

注意,可能会出现 ON CONFLICT DO UPDATE command cannot affect row a second time 错误。

因此如果尝试使用上述方案,需要在传入数据库之前,先在程序中去重一遍。而且,通常来说,在程序中进行一次去重可以减少向数据库中传入的数据,这本身也很有意义。

官方文档:unnest 函数
官方文档:Insert 语句

MySql

MySql 与 SQLite 类似,支持 REPLACE 语法。具体语句形式如下:

REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);

官方文档:REPLACE 语句

MongoDB

MongoDB 原生支持 bulkWrite 的批量传输模式,也支持 replace 的 upsert 语法。因此操作非常简单。

那么这里展示一下 C# 操作方法:

private async Task SaveManyCoreMany(
IDbFactory dbFactory,
IEnumerable<StateEntity> entities)
{
var array = entities as StateEntity[] ?? entities.ToArray();
var items = array
.Select(x => new MongoStateEntity
{
claptrap_id = x.ClaptrapId,
claptrap_type_code = x.ClaptrapTypeCode,
version = x.Version,
state_data = x.StateData,
updated_time = x.UpdatedTime,
})
.ToArray();

var client = dbFactory.GetConnection(_connectionName);
var db = client.GetDatabase(_databaseName);
var collection = db.GetCollection<MongoStateEntity>(_stateCollectionName);

var upsertModels = items.Select(x =>
{
var filter = new ExpressionFilterDefinition<MongoStateEntity>(entity =>
entity.claptrap_id == x.claptrap_id && entity.claptrap_type_code == x.claptrap_type_code);
return new ReplaceOneModel<MongoStateEntity>(filter, x)
{
IsUpsert = true
};
});
await collection.BulkWriteAsync(upsertModels);
}

这是从 Newbe.Claptrap 项目业务场景中给出的代码,读者可以结合自身需求进行修改。

官方文档:db.collection.bulkWrite ()

通用型解法

优化的本质是减少数据库链接的使用,尽可能在一个链接内完成更多的工作。因此如果特定的数据库不支持以上数据库类似的操作。那么还是存在一种通用型的解法:

  1. 以尽可能快地方式将数据写入一临时表
  2. 将临时表的数据已连表 update 的方式更新的目标表
  3. 删除临时表

UPDATE with a join

性能测试

以 SQLite 为例,尝试对 12345 条数据进行 2 次 upsert 操作。

单条并发:1 分 6 秒

批量处理:2.9 秒

可以在该链接找到测试的代码。

样例中不包含有 MySql、PostgreSQL 和 MongoDB 的样例,因为没有优化之前,在不提高连接池的情况下,一并发基本就爆炸了。所有优化的结果是直接解决了可用性的问题。

所有的示例代码均可以在代码库中找到。如果 Github Clone 存在困难,也可以点击此处从 Gitee 进行 Clone

常见问题解答

此处对一些常见的问题进行解答。

客户端是等待批量操作的结果吗?

这是一个很多网友提出的问题。答案是:是的。

假设我们公开了一个 WebApi 作为接口,由浏览器调用。如果同时有 100 个浏览器同时发出请求。

那么这 100 个请求会被合并,然后写入数据库。而在写入数据库之前,这些客户端都不会得到服务端的响应,会一直等待。

这也是该合并方案区别于普通的 “写队列,后写库” 方案的地方。

原理上讲,这种和 bulkcopy 有啥不一样?

两者是不相关,必须同时才有作用的功能。
首先,代码中的 database.InsertMany 就是你提到的 bulkcopy。

这个代码的关键不是 InsertMany ,而是如何将单次的插入请求合并。
试想一下,你可以在 webapi 上公开一个 bulkcopy 的 API。
但是,你无法将来自不同客户端的请求合并在同一个 API 里面来调用 bulkcopy。
例如,有一万个客户端都在调用你的 API,那怎么合并这些 API 请求呢?

如果如果通过上面这种方式,虽然你只是对外公开了一个单次插入的 API。你却实现了来自不同客户端请求的合并,变得可以使用 bulkcopy 了。这在高并发下很有意义。

另外,这符合开闭的原理,因为你没有修改 Repository 的 InsertOne 接口,却实现了 bulkcopy
的效果。

如果批量操作中一个操作异常失败是否会导致被合并的其他操作全部失败?

如果业务场景是合并会有影响,那当然不应该合并。

批量操作一个失败,当然是一起失败,因为底层的数据库事务肯定也是一起失败。

除非批量接口也支持对每个传入的 ID 做区别对待。典型的,比如 mongodb 的 bulkcopy 可以返回哪些成功哪些失败,那么我们就有能力设置不同的 Tcs 状态。

哪些该合并,哪些不该合并,完全取决于业务。样例给出的是如果要合并,应该怎么合并。不会要求所有都要合并。

Insert 和 Upsert 都说了,那 Delete 和 Select 呢?

笔者笼统地将该模式称为 “反应式批量处理”。要确认业务场景是否应用该模式,需要具备以下这两个基本的要求:

  • 业务下游的批量处理是否会比累积的单条处理要快,如果会,那可以用
  • 业务上游是否会出现短时间的突增频率的请求,如果会,那可以用

当然,还需要考量,比如:下游的批量操作能否却分每个请求的结果等等问题。但以上两点是一定需要考量的。

那么以 Delete 为例:

  • Delete Where In 的速度会比 Delete = 的速度快吗?试一下
  • 会有突增的 Delete 需求吗?想一下

小小工具 Zeal

笔者是一个完整存储过程都写不出来的人。能够查阅到这些数据库的文档,全靠一款名为 Zeal 的离线文档查看免费软件。推荐给您,您也值得拥有。

技术图片

Zeal 官网地址:https://zealdocs.org/

最后但是最重要!

最近作者正在构建以反应式Actor模式事件溯源为理论基础的一套服务端开发框架。希望为开发者提供能够便于开发出 “分布式”、“可水平扩展”、“可测试性高” 的应用系统 ——Newbe.Claptrap

本篇文章是该框架的一篇技术选文,属于技术构成的一部分。如果读者对该内容感兴趣,欢迎转发、评论、收藏文章以及项目。您的支持是促进项目成功的关键。

如果你对该项目感兴趣,你可以通过 github issues 提交您的看法。

如果您无法正常访问 github issue,您也可以发送邮件到 newbe-claptrap@googlegroups.com 来参与我们的讨论。

点击链接 QQ 交流【Newbe.Claptrap】:https://jq.qq.com/?_wv=1027&k=5uJGXf5。

您还可以查阅本系列的其他选文:

  • Newbe.Claptrap - 一套以 “事件溯源” 和 “Actor 模式” 作为基本理论的服务端开发框架
  • 十万同时在线用户,需要多少内存?——Newbe.Claptrap 框架水平扩展实验
  • 谈反应式编程在服务端中的应用,数据库操作优化,从 20 秒到 0.5 秒
  • 谈反应式编程在服务端中的应用,数据库操作优化,提速 Upsert
  • Newbe.Claptrap 项目周报 1 - 还没轮影,先用轮跑

GitHub 项目地址:https://github.com/newbe36524/Newbe.Claptrap

Gitee 项目地址:https://gitee.com/yks/Newbe.Claptrap

 

技术图片

  • 本文作者: newbe36524
  • 本文链接: https://www.newbe.pro/Newbe.Claptrap/Reactive-In-Server-2/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

谈反应式编程在服务端中的应用,数据库操作优化,提速 Upsert

标签:const   失败   一个   发送   扩展   can   优化   lap   出现   

人气教程排行