当前位置:Gxlcms > mysql > redis源代码分析23–VM(上)

redis源代码分析23–VM(上)

时间:2021-07-01 10:21:17 帮助过:20人阅读

VM是Redis2.0新增的一个功能。在没有VM之前,redis会把db中的所有数据放在内存中。随着redis的不断运行,所使用的内存会越来越大。但同时,client对某些数据的访问频度明显会比其他数据高。redis引入VM功能来试图解决这个问题。简言之,VM使得redis会把很少

VM是Redis2.0新增的一个功能。在没有VM之前,redis会把db中的所有数据放在内存中。随着redis的不断运行,所使用的内存会越来越大。但同时,client对某些数据的访问频度明显会比其他数据高。redis引入VM功能来试图解决这个问题。简言之,VM使得redis会把很少访问的value保存到磁盘中。但同时,所有value的key都放在内存中,这是为了让被换出的value的查找在启用VM前后性能差不多。

VM在redis中算是redis中最复杂的模块之一,我们分三节来介绍。这一节介绍redis的主要数据结构,下一节介绍非阻塞方式,最后一节介绍多线程方式。

我们先来看看redis中的通用对象结构redisObject :

// VM启用时, 对象所处位置
#define REDIS_VM_MEMORY 0       /* The object is on memory */
#define REDIS_VM_SWAPPED 1      /* The object is on disk */
#define REDIS_VM_SWAPPING 2     /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3      /* Redis is loading this object from disk */
/* The VM object structure */
struct redisObjectVM {
    off_t page;         /* the page at witch the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
} vm;
/* The actual Redis Object */
// 通用类型
// 对于key,需额外标志保存value的位置、类型等
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char storage;  /* If this object is a key, where is the value?
                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char vtype; /* If this object is a key, and value is swapped out,
                          * this is the type of the swapped out object. */
    int refcount;
    /* VM fields, this are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
    struct redisObjectVM vm;
} robj;

robj 中的type保存了对象的类型,如string、list、set等。storage保存了该key对象对应的value所处的位置:内存、磁盘、正在被换出到磁盘,正在加载。vtype表示该key对象所对应的value的类型。page和usedpages保存了该key对象所对应的 value,atime是value的最后一次访问时间。因此,当robj所表示的key对象的storage类型为REDIS_VM_SWAPPED 时,就表示该key的value已不在内存中,需从VM中page的位置加载该value,vaue的类型为vtype,大小为usedpages。

创建对象的时候,根据是否启用VM机制,来分配合适大小的robj对象大小。

static robj *createObject(int type, void *ptr) {
   ---
   else {
        if (server.vm_enabled) {
            pthread_mutex_unlock(&server.obj_freelist_mutex);
            o = zmalloc(sizeof(*o));
        } else {
            o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
        }
    }
    ---
    if (server.vm_enabled) {
        /* Note that this code may run in the context of an I/O thread
         * and accessing to server.unixtime in theory is an error
         * (no locks). But in practice this is safe, and even if we read
         * garbage Redis will not fail, as it's just a statistical info */
        o->vm.atime = server.unixtime;
        o->storage = REDIS_VM_MEMORY;
    }
    return o;
}

VM的所有相关结构保存在redisServer 的如下几个字段中。

 /* Global server state structure */
struct redisServer {
    ---
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page; /* Next probably empty page */
    off_t vm_near_pages; /* Number of pages allocated sequentially */
    unsigned char *vm_bitmap; /* Bitmap of free/used pages */
    time_t unixtime;    /* Unix time sampled every second. */
    /* Virtual memory I/O threads stuff */
    /* An I/O thread process an element taken from the io_jobs queue and
     * put the result of the operation in the io_done list. While the
     * job is being processed, it's put on io_processing queue. */
    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
    list *io_processing; /* List of VM I/O jobs being processed */
    list *io_processed; /* List of VM I/O jobs already processed */
    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
    pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
    pthread_attr_t io_threads_attr; /* attributes for threads creation */
    int io_active_threads; /* Number of running I/O threads */
    int vm_max_threads; /* Max number of I/O threads running at the same time */
    /* Our main thread is blocked on the event loop, locking for sockets ready
     * to be read or written, so when a threaded I/O operation is ready to be
     * processed by the main thread, the I/O thread will use a unix pipe to
     * awake the main thread. The followings are the two pipe FDs. */
    int io_ready_pipe_read;
    int io_ready_pipe_write;
    /* Virtual memory stats */
    unsigned long long vm_stats_used_pages;
    unsigned long long vm_stats_swapped_objects;
    unsigned long long vm_stats_swapouts;
    unsigned long long vm_stats_swapins;
   ---
};

vm_fp 和vm_fd指向磁盘上的vm文件,通过这两个指针来读写vm文件。vm_bitmap管理着vm文件中每一页的分配与释放情况(每一项为0表示该页空闲,为1表示已使用)。每一页的大小通过vm-page-size来配置,页数通过vm-pages来配置。值得一提的是,redis中的每一页最多只能放置一个对象,一个对象可以放在连续的多个页上。unixtime只是缓存时间值,这在计算value的最近使用频率时会用到。接下来的结构跟多线程方式换出/换进vlue有关。使用多线程方式时,换进/换出value被看成一个个的job,job的类型有如下几种:

/* VM threaded I/O request message */
#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
typedef struct iojob {
    int type;   /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key;  /* This I/O request is about swapping this key */
    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
    off_t page; /* Swap page where to read/write the object */
    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
    int canceled; /* True if this command was canceled by blocking side of VM */
    pthread_t thread; /* ID of the thread processing this entry */
} iojob;

类型为REDIS_IOJOB_LOAD的job用来加载某个value,类型为REDIS_IOJOB_DO_SWAP的job就用来换出某个 value,在换出value之前,需要创建类型为REDIS_IOJOB_PREPARE_SWAP的job来计算所需的交换页数。

无论是上述3种中的哪一种,新建的job都会使用queueIOJob放在io_newjobs队列中,而线程入口函数IOThreadEntryPoint 会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。

io_ready_clients保存了可以继续运行的client链表(之前因为等待value已阻塞),后面几个结构跟多线程的保护和全局的vm统计有关。

VM的初始化在vmInit中,主要做的工作就是上面介绍的几个结构的初始化。除此之外,最重要的工作就是设置管道的read事件的处理函数vmThreadedIOCompletedJob,该函数会在管道可读时运行,跟多线程的运行密切相关。

static void vmInit(void) {
    off_t totsize;
    int pipefds[2];
    size_t stacksize;
    struct flock fl;
    if (server.vm_max_threads != 0)
        zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
    redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
    /* Try to open the old swap file, otherwise create it */
    if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
        server.vm_fp = fopen(server.vm_swap_file,"w+b");
    }
    if (server.vm_fp == NULL) {
        redisLog(REDIS_WARNING,
            "Can't open the swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    }
    server.vm_fd = fileno(server.vm_fp);
    /* Lock the swap file for writing, this is useful in order to avoid
     * another instance to use the same swap file for a config error. */
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start = fl.l_len = 0;
    if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) {
        redisLog(REDIS_WARNING,
            "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno));
        exit(1);
    }
    /* Initialize */
    server.vm_next_page = 0;
    server.vm_near_pages = 0;
    server.vm_stats_used_pages = 0;
    server.vm_stats_swapped_objects = 0;
    server.vm_stats_swapouts = 0;
    server.vm_stats_swapins = 0;
    totsize = server.vm_pages*server.vm_page_size;
    redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
    if (ftruncate(server.vm_fd,totsize) == -1) {
        redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    } else {
        redisLog(REDIS_NOTICE,"Swap file allocated with success");
    }
    server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
    redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
        (long long) (server.vm_pages+7)/8, server.vm_pages);
    memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
    /* Initialize threaded I/O (used by Virtual Memory) */
    server.io_newjobs = listCreate();
    server.io_processing = listCreate();
    server.io_processed = listCreate();
    server.io_ready_clients = listCreate();
    pthread_mutex_init(&server.io_mutex,NULL);
    pthread_mutex_init(&server.obj_freelist_mutex,NULL);
    pthread_mutex_init(&server.io_swapfile_mutex,NULL);
    server.io_active_threads = 0;
    if (pipe(pipefds) == -1) {
        redisLog(REDIS_WARNING,"Unable to intialized VM: pipe(2): %s. Exiting."
            ,strerror(errno));
        exit(1);
    }
    server.io_ready_pipe_read = pipefds[0];
    server.io_ready_pipe_write = pipefds[1];
    redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
    /* LZF requires a lot of stack */
    pthread_attr_init(&server.io_threads_attr);
    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
    /* Solaris may report a stacksize of 0, let's set it to 1 otherwise 115
     * multiplying it by 2 in the while loop later will not really help   */
    if (!stacksize) stacksize = 1;
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
    /* Listen for events in the threaded I/O pipe */
    if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
        vmThreadedIOCompletedJob, NULL) == AE_ERR)
        oom("creating file event");
}

人气教程排行