当前位置:Gxlcms > PHP教程 > php实现的memcached队列类

php实现的memcached队列类

时间:2021-07-01 10:21:17 帮助过:6人阅读

  /**
   * Memcache-backed queue.
   *
   * Items are stored under numbered value keys on two "sides" (A and B).
   * Writers append to the side currently on duty; when that side reaches
   * MAXNUM entries the duty side is switched, so one side can be drained
   * while the other is being filled. Every mutating operation is serialised
   * through a lock key created with memcache_add().
   *
   * @author guoyu
   * @create on 9:25 2014-9-28
   * @qq tech exchange group: 136112330
   *
   * @example:
   * $obj = new memcacheQueue('duilie');
   * $obj->add('1asdf');
   * $obj->getQueueLength();
   * $obj->read(11);
   * $obj->get(8);
   */
  class memcacheQueue
  {
      public static $client; // memcache connection, shared by all instances
      public $access;        // true while this instance holds the queue lock
      private $currentSide;  // side currently on duty: 'A' or 'B'
      private $lastSide;     // the other (previous) side: 'A' or 'B'
      private $sideAHead;    // head index of side A
      private $sideATail;    // tail index of side A
      private $sideBHead;    // head index of side B
      private $sideBTail;    // tail index of side B
      private $currentHead;  // head index of the current side
      private $currentTail;  // tail index of the current side
      private $lastHead;     // head index of the previous side
      private $lastTail;     // tail index of the previous side
      private $expire;       // key lifetime in seconds, 1..2592000 (30 days); 0 = never expire
      private $sleepTime;    // pause between lock attempts, in microseconds
      private $queueName;    // queue name, must be unique
      private $retryNum;     // maximum number of lock attempts (about 10 * expected concurrency)

      const MAXNUM = 2000;               // maximum entries per side; suggested upper bound 10K
      const HEAD_KEY = '_lkkQueueHead_'; // key suffix of a side's head index
      const TAIL_KEY = '_lkkQueueTail_'; // key suffix of a side's tail index
      const VALU_KEY = '_lkkQueueValu_'; // key infix of a stored value
      const LOCK_KEY = '_lkkQueueLock_'; // key suffix of the queue lock
      const SIDE_KEY = '_lkkQueueSide_'; // key suffix of the duty-side marker

      /**
       * Connect to memcache and load the queue state.
       *
       * If the connection fails, the object is still created, but add(),
       * get() and clear() return false because no lock can be taken.
       *
       * @param string       $queueName queue name
       * @param int|string   $expire    key lifetime in seconds; '' selects the default of 3600, 0 means never expire
       * @param array|string $config    array('host'=>..., 'port'=>...) or "host:port"; empty means localhost:11211
       */
      public function __construct($queueName = '', $expire = '', $config = '')
      {
          // Initialise the state first so the object is consistent even
          // when the connection below fails.
          $this->access    = false;
          $this->sleepTime = 1000;
          $this->retryNum  = 10000;
          $this->queueName = $queueName;
          // Only the "not supplied" default ('') maps to one hour. The old
          // test `empty($expire) && $expire != 0` gave 0 on PHP < 8 and 3600
          // on PHP >= 8 for that same default value.
          $this->expire    = ($expire === '') ? 3600 : (int)$expire;
          $this->sideAHead = 0;
          $this->sideATail = 0;
          $this->sideBHead = 0;
          $this->sideBTail = 0;

          if (empty($config)) {
              self::$client = memcache_pconnect('localhost', 11211);
          } elseif (is_array($config)) { // array('host'=>'127.0.0.1','port'=>'11211')
              $host = isset($config['host']) ? $config['host'] : '127.0.0.1';
              $port = isset($config['port']) ? (int)$config['port'] : 11211;
              self::$client = memcache_pconnect($host, $port);
          } elseif (is_string($config)) { // "127.0.0.1:11211"
              $parts = explode(':', $config);
              $host = isset($parts[0]) ? $parts[0] : '127.0.0.1';
              $port = isset($parts[1]) ? (int)$parts[1] : 11211;
              self::$client = memcache_pconnect($host, $port);
          }
          if (!self::$client) {
              return;
          }

          ignore_user_abort(true); // keep running when the client disconnects
          set_time_limit(0);       // remove the script execution time limit

          // Create the duty-side marker only if it does not exist yet.
          memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A', false, $this->expire);
          $this->getHeadNTail($queueName);
      }

      /**
       * Load the head and tail indexes of both sides from memcache.
       * A missing key is read as 0.
       *
       * @param string $queueName queue name
       */
      private function getHeadNTail($queueName)
      {
          $this->sideAHead = (int)memcache_get(self::$client, $queueName . 'A' . self::HEAD_KEY);
          $this->sideATail = (int)memcache_get(self::$client, $queueName . 'A' . self::TAIL_KEY);
          $this->sideBHead = (int)memcache_get(self::$client, $queueName . 'B' . self::HEAD_KEY);
          $this->sideBTail = (int)memcache_get(self::$client, $queueName . 'B' . self::TAIL_KEY);
      }

      /**
       * Read the duty-side marker and map the cached A/B indexes onto the
       * current/last fields. Any marker value other than 'A' is treated as 'B'.
       *
       * @return string name of the current side
       */
      public function getCurrentSide()
      {
          $marker = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
          if ($marker === 'A') {
              $this->currentSide = 'A';
              $this->lastSide    = 'B';
              $this->currentHead = $this->sideAHead;
              $this->currentTail = $this->sideATail;
              $this->lastHead    = $this->sideBHead;
              $this->lastTail    = $this->sideBTail;
          } else {
              $this->currentSide = 'B';
              $this->lastSide    = 'A';
              $this->currentHead = $this->sideBHead;
              $this->currentTail = $this->sideBTail;
              $this->lastHead    = $this->sideAHead;
              $this->lastTail    = $this->sideATail;
          }
          return $this->currentSide;
      }

      /**
       * Acquire the queue lock, retrying up to $retryNum times.
       *
       * NOTE(review): the lock key uses the same lifetime as the data keys,
       * so with expire = 0 a lock left behind by a crashed process never
       * expires on its own.
       *
       * @return boolean true when the lock was acquired; false when there is
       *                 no connection, this instance already holds the lock,
       *                 or the retries ran out
       */
      private function getLock()
      {
          if (!self::$client || $this->access !== false) {
              return false;
          }
          $lockKey  = $this->queueName . self::LOCK_KEY;
          $attempts = 0;
          // memcache_add() only succeeds when the key is absent, so the
          // process that creates the key owns the lock.
          while (!memcache_add(self::$client, $lockKey, 1, false, $this->expire)) {
              usleep($this->sleepTime);
              $attempts++;
              if ($attempts > $this->retryNum) {
                  return false;
              }
          }
          $this->access = true;
          return true;
      }

      /**
       * Release the queue lock.
       */
      private function unLock()
      {
          memcache_delete(self::$client, $this->queueName . self::LOCK_KEY);
          $this->access = false;
      }

      /**
       * Append one item to the queue.
       *
       * @param mixed $data value to store
       * @return boolean true when stored; false when the lock could not be
       *                 taken, both sides are full, or the value key could
       *                 not be created
       */
      public function add($data)
      {
          if (!$this->getLock()) {
              return false;
          }
          $this->getHeadNTail($this->queueName);
          $this->getCurrentSide();
          if ($this->isFull()) {
              $this->unLock();
              return false;
          }
          if ($this->currentTail >= self::MAXNUM) {
              // Current side is full: switch sides while still holding the
              // lock so two writers cannot both flip the marker.
              $this->changeCurrentSide();
              $this->getCurrentSide();
              if ($this->currentTail >= self::MAXNUM) {
                  $this->unLock();
                  return false;
              }
          }
          $result   = false;
          $valueKey = $this->queueName . $this->currentSide . self::VALU_KEY . $this->currentTail;
          if (memcache_add(self::$client, $valueKey, $data, false, $this->expire)) {
              $this->changeTail();
              $result = true;
          }
          $this->unLock();
          return $result;
      }

      /**
       * Remove and return up to $length items, previous side first.
       *
       * @param int $length number of items to take; 0 takes everything
       * @return array|false values keyed by their memcache key, or false when
       *                     $length is not numeric, the lock could not be
       *                     taken, or the queue is empty
       */
      public function get($length = 0)
      {
          if (!is_numeric($length)) {
              return false;
          }
          if (empty($length)) {
              $length = self::MAXNUM * 2; // default: take everything
          }
          if (!$this->getLock()) {
              return false;
          }
          // Refresh the indexes before the emptiness test; the cached values
          // may predate writes made by other processes.
          $this->getHeadNTail($this->queueName);
          if ($this->isEmpty()) {
              $this->unLock();
              return false;
          }
          $keyArray = $this->getKeyArray($length);
          $keys     = $keyArray['keys'];
          $data     = @memcache_get(self::$client, $keys);

          $this->changeHead($this->lastSide, $this->lastHead, $this->lastTail,
              $keyArray['lastCount'], $length);
          $this->changeHead($this->currentSide, $this->currentHead, $this->currentTail,
              $keyArray['currentCount'], $length - $keyArray['lastCount']);

          foreach ($keys as $key) { // delete what has been taken
              @memcache_delete(self::$client, $key, 0);
          }
          $this->unLock();
          return $data;
      }

      /**
       * Return up to $length items without removing them. No lock is taken.
       *
       * @param int $length number of items to read; 0 reads everything
       * @return array|false values keyed by their memcache key, or false when
       *                     $length is not numeric
       */
      public function read($length = 0)
      {
          if (!is_numeric($length)) {
              return false;
          }
          if (empty($length)) {
              $length = self::MAXNUM * 2; // default: read everything
          }
          $keyArray = $this->getKeyArray($length);
          return @memcache_get(self::$client, $keyArray['keys']);
      }

      /**
       * Build the list of value keys for the next $length items.
       * Refreshes the cached indexes and the current/last side mapping.
       *
       * @param int $length number of items wanted
       * @return array 'keys' => value keys (previous side first),
       *               'lastCount' => keys taken from the previous side,
       *               'currentCount' => keys taken from the current side
       */
      private function getKeyArray($length)
      {
          $result = array('keys' => array(), 'lastCount' => 0, 'currentCount' => 0);
          $this->getHeadNTail($this->queueName);
          $this->getCurrentSide();
          if (empty($length)) {
              return $result;
          }
          // Keys of the previous side first.
          for ($i = 0; $i < $length; $i++) {
              $index = $this->lastHead + $i;
              if ($index >= $this->lastTail) {
                  break;
              }
              $result['keys'][] = $this->queueName . $this->lastSide . self::VALU_KEY . $index;
          }
          $result['lastCount'] = $i;
          // Then fill the remainder from the current side.
          $remaining = $length - $i;
          for ($k = 0; $k < $remaining; $k++) {
              $index = $this->currentHead + $k;
              if ($index >= $this->currentTail) {
                  break;
              }
              $result['keys'][] = $this->queueName . $this->currentSide . self::VALU_KEY . $index;
          }
          $result['currentCount'] = $k;
          return $result;
      }

      /**
       * Increase the tail index of the current side by one.
       * Called only while the queue lock is held.
       */
      private function changeTail()
      {
          $tailKey = $this->queueName . $this->currentSide . self::TAIL_KEY;
          memcache_add(self::$client, $tailKey, 0, false, $this->expire); // create if missing
          $next = memcache_get(self::$client, $tailKey) + 1;
          memcache_set(self::$client, $tailKey, $next, false, $this->expire);
      }

      /**
       * Move a side's head past the items that were just taken.
       * When nothing unread is left, the side is reset to head = tail = 0.
       *
       * @param string $side   side to update
       * @param int    $head   head index before the items were taken
       * @param int    $tail   tail index of the side
       * @param int    $taken  number of items taken from the side
       * @param int    $wanted number of items that were requested from the side
       */
      private function changeHead($side, $head, $tail, $taken, $wanted)
      {
          if ($wanted <= 0) {
              return; // the side was not examined at all
          }
          $newHead = $head + $taken;
          if ($newHead >= $tail) {
              // Drained (or indexes inconsistent): start the side over,
              // unless it is already at 0/0.
              if ($head !== 0 || $tail !== 0) {
                  $this->resetSide($side);
              }
              return;
          }
          memcache_set(self::$client, $this->queueName . $side . self::HEAD_KEY,
              $newHead, false, $this->expire);
      }

      /**
       * Reset a side: set its head and tail indexes to 0.
       *
       * @param string $side side to reset
       */
      private function resetSide($side)
      {
          $headKey = $this->queueName . $side . self::HEAD_KEY;
          $tailKey = $this->queueName . $side . self::TAIL_KEY;
          memcache_set(self::$client, $headKey, 0, false, $this->expire);
          memcache_set(self::$client, $tailKey, 0, false, $this->expire);
      }

      /**
       * Switch the duty side (A -> B, anything else -> A).
       *
       * @return string name of the new current side
       */
      private function changeCurrentSide()
      {
          $sideKey = $this->queueName . self::SIDE_KEY;
          $next    = (memcache_get(self::$client, $sideKey) === 'A') ? 'B' : 'A';
          memcache_set(self::$client, $sideKey, $next, false, $this->expire);
          $this->currentSide = $next;
          return $this->currentSide;
      }

      /**
       * Whether both sides have reached MAXNUM entries.
       * Uses the indexes cached by the last refresh; it does not query memcache.
       *
       * @return boolean
       */
      public function isFull()
      {
          return $this->sideATail >= self::MAXNUM && $this->sideBTail >= self::MAXNUM;
      }

      /**
       * Whether both sides have a tail index of 0.
       * Uses the indexes cached by the last refresh; it does not query memcache.
       *
       * @return boolean
       */
      public function isEmpty()
      {
          return !($this->sideATail > 0 || $this->sideBTail > 0);
      }

      /**
       * Current queue length.
       * This is the theoretical length: items that have expired are still
       * counted, so the real length is less than or equal to it.
       *
       * @return int
       */
      public function getQueueLength()
      {
          $this->getHeadNTail($this->queueName);
          $this->getCurrentSide();
          $sideALength = $this->sideATail - $this->sideAHead;
          $sideBLength = $this->sideBTail - $this->sideBHead;
          return $sideALength + $sideBLength;
      }

      /**
       * Delete all stored values of this queue; only the HEAD_KEY, TAIL_KEY
       * and SIDE_KEY entries remain.
       *
       * @return boolean false when the lock could not be taken
       */
      public function clear()
      {
          if (!$this->getLock()) {
              return false;
          }
          for ($i = 0; $i < self::MAXNUM; $i++) {
              @memcache_delete(self::$client, $this->queueName . 'A' . self::VALU_KEY . $i, 0);
              @memcache_delete(self::$client, $this->queueName . 'B' . self::VALU_KEY . $i, 0);
          }
          // Reset the indexes before releasing the lock so no writer can
          // observe the old tails after the values are gone.
          $this->resetSide('A');
          $this->resetSide('B');
          $this->unLock();
          return true;
      }

      /**
       * Flush the memcache server.
       * WARNING: this removes every key on the server, not only this queue's.
       */
      public function memFlush()
      {
          memcache_flush(self::$client);
      }
  }

php, memcached

人气教程排行