时间:2021-07-01 10:21:17 帮助过:20人阅读
VM是Redis2.0新增的一个功能。在没有VM之前,redis会把db中的所有数据放在内存中。随着redis的不断运行,所使用的内存会越来越大。但同时,client对某些数据的访问频度明显会比其他数据高。redis引入VM功能来试图解决这个问题。简言之,VM使得redis会把很少
VM是Redis2.0新增的一个功能。在没有VM之前,redis会把db中的所有数据放在内存中。随着redis的不断运行,所使用的内存会越来越大。但同时,client对某些数据的访问频度明显会比其他数据高。redis引入VM功能来试图解决这个问题。简言之,VM使得redis会把很少访问的value保存到磁盘中。但同时,所有value的key都放在内存中,这是为了让被换出的value的查找在启用VM前后性能差不多。
VM在redis中算是redis中最复杂的模块之一,我们分三节来介绍。这一节介绍redis的主要数据结构,下一节介绍非阻塞方式,最后一节介绍多线程方式。
我们先来看看redis中的通用对象结构redisObject :
// VM启用时, 对象所处位置 #define REDIS_VM_MEMORY 0 /* The object is on memory */ #define REDIS_VM_SWAPPED 1 /* The object is on disk */ #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */ #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */ /* The VM object structure */ struct redisObjectVM { off_t page; /* the page at witch the object is stored on disk */ off_t usedpages; /* number of pages used on disk */ time_t atime; /* Last access time */ } vm; /* The actual Redis Object */ // 通用类型 // 对于key,需额外标志保存value的位置、类型等 typedef struct redisObject { void *ptr; unsigned char type; unsigned char encoding; unsigned char storage; /* If this object is a key, where is the value? * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */ unsigned char vtype; /* If this object is a key, and value is swapped out, * this is the type of the swapped out object. */ int refcount; /* VM fields, this are only allocated if VM is active, otherwise the * object allocation function will just allocate * sizeof(redisObjct) minus sizeof(redisObjectVM), so using * Redis without VM active will not have any overhead. */ struct redisObjectVM vm; } robj;
robj 中的type保存了对象的类型,如string、list、set等。storage保存了该key对象对应的value所处的位置:内存、磁盘、正在被换出到磁盘,正在加载。vtype表示该key对象所对应的value的类型。page和usedpages保存了该key对象所对应的 value,atime是value的最后一次访问时间。因此,当robj所表示的key对象的storage类型为REDIS_VM_SWAPPED 时,就表示该key的value已不在内存中,需从VM中page的位置加载该value,vaue的类型为vtype,大小为usedpages。
创建对象的时候,根据是否启用VM机制,来分配合适大小的robj对象大小。
static robj *createObject(int type, void *ptr) { --- else { if (server.vm_enabled) { pthread_mutex_unlock(&server.obj_freelist_mutex); o = zmalloc(sizeof(*o)); } else { o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM)); } } --- if (server.vm_enabled) { /* Note that this code may run in the context of an I/O thread * and accessing to server.unixtime in theory is an error * (no locks). But in practice this is safe, and even if we read * garbage Redis will not fail, as it's just a statistical info */ o->vm.atime = server.unixtime; o->storage = REDIS_VM_MEMORY; } return o; }
VM的所有相关结构保存在redisServer 的如下几个字段中。
/* Global server state structure */ struct redisServer { --- /* Virtual memory state */ FILE *vm_fp; int vm_fd; off_t vm_next_page; /* Next probably empty page */ off_t vm_near_pages; /* Number of pages allocated sequentially */ unsigned char *vm_bitmap; /* Bitmap of free/used pages */ time_t unixtime; /* Unix time sampled every second. */ /* Virtual memory I/O threads stuff */ /* An I/O thread process an element taken from the io_jobs queue and * put the result of the operation in the io_done list. While the * job is being processed, it's put on io_processing queue. */ list *io_newjobs; /* List of VM I/O jobs yet to be processed */ list *io_processing; /* List of VM I/O jobs being processed */ list *io_processed; /* List of VM I/O jobs already processed */ list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */ pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */ pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ pthread_attr_t io_threads_attr; /* attributes for threads creation */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */ /* Our main thread is blocked on the event loop, locking for sockets ready * to be read or written, so when a threaded I/O operation is ready to be * processed by the main thread, the I/O thread will use a unix pipe to * awake the main thread. The followings are the two pipe FDs. */ int io_ready_pipe_read; int io_ready_pipe_write; /* Virtual memory stats */ unsigned long long vm_stats_used_pages; unsigned long long vm_stats_swapped_objects; unsigned long long vm_stats_swapouts; unsigned long long vm_stats_swapins; --- };
vm_fp 和vm_fd指向磁盘上的vm文件,通过这两个指针来读写vm文件。vm_bitmap管理着vm文件中每一页的分配与释放情况(每一项为0表示该页空闲,为1表示已使用)。每一页的大小通过vm-page-size来配置,页数通过vm-pages来配置。值得一提的是,redis中的每一页最多只能放置一个对象,一个对象可以放在连续的多个页上。unixtime只是缓存时间值,这在计算value的最近使用频率时会用到。接下来的结构跟多线程方式换出/换进vlue有关。使用多线程方式时,换进/换出value被看成一个个的job,job的类型有如下几种:
/* VM threaded I/O request message */ #define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */ #define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */ #define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */ typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ robj *key; /* This I/O request is about swapping this key */ robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */ off_t page; /* Swap page where to read/write the object */ off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */ int canceled; /* True if this command was canceled by blocking side of VM */ pthread_t thread; /* ID of the thread processing this entry */ } iojob;
类型为REDIS_IOJOB_LOAD的job用来加载某个value,类型为REDIS_IOJOB_DO_SWAP的job就用来换出某个 value,在换出value之前,需要创建类型为REDIS_IOJOB_PREPARE_SWAP的job来计算所需的交换页数。
无论是上述3种中的哪一种,新建的job都会使用queueIOJob放在io_newjobs队列中,而线程入口函数IOThreadEntryPoint 会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。
io_ready_clients保存了可以继续运行的client链表(之前因为等待value已阻塞),后面几个结构跟多线程的保护和全局的vm统计有关。
VM的初始化在vmInit中,主要做的工作就是上面介绍的几个结构的初始化。除此之外,最重要的工作就是设置管道的read事件的处理函数vmThreadedIOCompletedJob,该函数会在管道可读时运行,跟多线程的运行密切相关。
static void vmInit(void) { off_t totsize; int pipefds[2]; size_t stacksize; struct flock fl; if (server.vm_max_threads != 0) zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file); /* Try to open the old swap file, otherwise create it */ if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) { server.vm_fp = fopen(server.vm_swap_file,"w+b"); } if (server.vm_fp == NULL) { redisLog(REDIS_WARNING, "Can't open the swap file: %s. Exiting.", strerror(errno)); exit(1); } server.vm_fd = fileno(server.vm_fp); /* Lock the swap file for writing, this is useful in order to avoid * another instance to use the same swap file for a config error. */ fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; fl.l_start = fl.l_len = 0; if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) { redisLog(REDIS_WARNING, "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno)); exit(1); } /* Initialize */ server.vm_next_page = 0; server.vm_near_pages = 0; server.vm_stats_used_pages = 0; server.vm_stats_swapped_objects = 0; server.vm_stats_swapouts = 0; server.vm_stats_swapins = 0; totsize = server.vm_pages*server.vm_page_size; redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize); if (ftruncate(server.vm_fd,totsize) == -1) { redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.", strerror(errno)); exit(1); } else { redisLog(REDIS_NOTICE,"Swap file allocated with success"); } server.vm_bitmap = zmalloc((server.vm_pages+7)/8); redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages", (long long) (server.vm_pages+7)/8, server.vm_pages); memset(server.vm_bitmap,0,(server.vm_pages+7)/8); /* Initialize threaded I/O (used by Virtual Memory) */ server.io_newjobs = listCreate(); server.io_processing = listCreate(); server.io_processed = listCreate(); server.io_ready_clients = listCreate(); pthread_mutex_init(&server.io_mutex,NULL); pthread_mutex_init(&server.obj_freelist_mutex,NULL); pthread_mutex_init(&server.io_swapfile_mutex,NULL); server.io_active_threads = 0; if (pipe(pipefds) == -1) { redisLog(REDIS_WARNING,"Unable to intialized VM: pipe(2): %s. Exiting." ,strerror(errno)); exit(1); } server.io_ready_pipe_read = pipefds[0]; server.io_ready_pipe_write = pipefds[1]; redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR); /* LZF requires a lot of stack */ pthread_attr_init(&server.io_threads_attr); pthread_attr_getstacksize(&server.io_threads_attr, &stacksize); /* Solaris may report a stacksize of 0, let's set it to 1 otherwise 115 * multiplying it by 2 in the while loop later will not really help */ if (!stacksize) stacksize = 1; while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&server.io_threads_attr, stacksize); /* Listen for events in the threaded I/O pipe */ if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE, vmThreadedIOCompletedJob, NULL) == AE_ERR) oom("creating file event"); }
原文地址:redis源代码分析23–VM(上), 感谢原作者分享。