当前位置:Gxlcms > mysql > 通过Nginx/Lua给Redis的PIPELINING减肥

通过Nginx/Lua给Redis的PIPELINING减肥

时间:2021-07-01 10:21:17 帮助过:42人阅读

某手机应用市场项目,其中请求量最大的功能是查询升级接口,具体点来说:客户端会不定期的把手机中应用名称及其应用版本发送到服务端,服务端通过比较版本来判断客户端的应用是否需要升级,如果需要就返回若干项相关信息。通常,一台手机里会装几十个到上百

某手机应用市场项目,其中请求量最大的功能是查询升级接口,具体点来说:客户端会不定期的把手机中应用名称及其应用版本发送到服务端,服务端通过比较版本来判断客户端的应用是否需要升级,如果需要就返回若干项相关信息。通常,一台手机里会装几十个到上百个应用不等,当海量客户端一起请求时,服务端的压力可想而知。

客户端请求的数据格式如下所示,可以采用GET或POST方法:

packages=foo|1&packages=bar|2&packages=|&...

服务端选择Lua作为编程语言,同时利用了Redis的PIPELINING机制批量查询数据:

local redis = require "resty.redis"
local cjson = require "cjson"
local config = require "config"
ngx.header["Content-Type"] = "application/json; charset=utf-8"
local args = ngx.req.get_uri_args(1000)
if ngx.var.request_method == "POST" then
    ngx.req.read_body()
    for key, val in pairs(ngx.req.get_post_args(1000)) do
        args[key] = val
    end
end
if type(args["packages"]) == "string" then
    args["packages"] = {args["packages"]}
end
if type(args["packages"]) ~= "table" then
    ngx.exit(ngx.HTTP_BAD_REQUEST)
end
local cache = redis.new()
local res, err = cache:connect(config.host, config.port)
if not res then
    ngx.log(ngx.ERR, "error: ", err)
    ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE)
end
cache:init_pipeline()
local packages = {}
for _, val in ipairs(args["packages"]) do
    if type(val) == "string" then
        local name, version = string.match(val, "([^|]+)|([0-9]+)")
        if name and version then
            packages[name] = tonumber(version)
            cache:hget(name, "all")
        end
    end
end
local res, err = cache:commit_pipeline()
if not res then
    ngx.log(ngx.ERR, "error: ", err)
    ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE)
end
local data = {}
for _, val in ipairs(res) do
    if type(val) == "string" then
        val = cjson.decode(val)
        if packages[val["name"]] < val["version"] then
            data[#data + 1] = val
        end
    end
end
ngx.say(cjson.encode(data))

补充:应用的数据作为HASH保存在Redis里,不过由于HGETALL非常耗费CPU,所以做了些处理:冗余保存了一个名为「all」的字段,用来保存原始数据的JSON编码,如此一来,就把复杂度为O(N)的HGETALL操作转化成了复杂度为O(1)的HGET操作,从而提升了效率。详细介绍请参考:《记Redis那坑人的HGETALL》。

如上代码平稳运行了一段时间,但随着访问量的增加,开始暴露问题:Redis时不时出现卡住的现象,究其原因就是单线程的Redis无法承载过大的PIPELINING请求。通常我们都是采用多实例的方法来规避Redis单线程的性能瓶颈问题,但在本例中,由于PIPELING很大,不是随便冗余几个实例就能解决问题,同时系统也没有太多的内存可用。

最后我们想到的办法是利用Nginx/Lua给Redis的PIPELINING减肥,具体一点来说:当客户端查询升级接口时,虽然会把多至上百个应用的信息同时发送到服务端,但其中真正升级的应用却很少,如果我们能把那些不升级的应用过滤掉,只查询升级的应用,无疑将大大提升系统的性能,形象一点说:当一个胖子请求经过过滤后,就变成了一个瘦子请求。实际操作时,我们可以把应用的版本缓存到Nginx/Lua共享内存里,客户端请求先在Nginx/Lua共享内存过滤一次,然后再判断是否需要查询Redis。

为了使用共享内存,需要在Nginx配置文件中声明:

lua_shared_dict versions 100m;

改良后的代码如下所示,注意其中共享内存的查询和设置部分的代码:

local redis = require "resty.redis"
local cjson = require "cjson"
local config = require "config"
local versions = ngx.shared.versions;
ngx.header["Content-Type"] = "application/json; charset=utf-8"
local args = ngx.req.get_uri_args(1000)
if ngx.var.request_method == "POST" then
    ngx.req.read_body()
    for key, val in pairs(ngx.req.get_post_args(1000)) do
        args[key] = val
    end
end
if type(args["packages"]) == "string" then
    args["packages"] = {args["packages"]}
end
if type(args["packages"]) ~= "table" then
    ngx.exit(ngx.HTTP_BAD_REQUEST)
end
local cache = redis.new()
local res, err = cache:connect(config.host, config.port)
if not res then
    ngx.log(ngx.ERR, "error: ", err)
    ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE)
end
cache:init_pipeline()
local packages = {}
for _, val in ipairs(args["packages"]) do
    if type(val) == "string" then
        local name, version = string.match(val, "([^|]+)|([0-9]+)")
        if name and version then
            version = tonumber(version)
            if not versions[name] or versions[name] > version then
                packages[name] = version
                cache:hget(name, "all")
            end
        end
    end
end
local res, err = cache:commit_pipeline()
if not res then
    ngx.log(ngx.ERR, "error: ", err)
    ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE)
end
local data = {}
for _, val in ipairs(res) do
    if type(val) == "string" then
        val = cjson.decode(val)
        if packages[val["name"]] < val["version"] then
            data[#data + 1] = val
        end
        local timeout = math.random(600, 1200)
        versions:set(val["name"], val["version"], timeout)
        packages[val["name"]] = nil
    end
end
for name in pair(packages) do
    versions:set(name, -1, 1800)
end
ngx.say(cjson.encode(data))

或许会有人会质疑:本质上请求量并没有减少啊,只是从Redis转嫁到了Nginx而已,这样就能提升性能么?问题的关键在于Redis是单线程的,而Nginx通过worker_processes指令,可以充分发挥多核CPU的能力,所以性能总体是提升的。

补充:代码里在设置共享内存过期时间的时候,没有采用固定值,而是采用了一个随机数的方式,之所以这样设计,是为了避免大量数据同时过期,系统性能出现抖动。

当然,随着访问量的增加,本文的方案可能还会出现问题,到时候怎么办呢?其实类似查询升级之类的功能,就不应该设计成同步的形式,如果能改成异步的方式,那么多数问题就都不存在了,不过这个就不多言了,现在的方案刚刚好够用。

人气教程排行