当前位置:Gxlcms > PHP教程 > RabbitMQ与PHP(二)——相关服务安装及怎么用PHP作为守护模式处理消息

RabbitMQ与PHP(二)——相关服务安装及怎么用PHP作为守护模式处理消息

时间:2021-07-01 10:21:17 帮助过:13人阅读

RabbitMQ与PHP(二)—— 相关服务安装及如何用PHP作为守护模式处理消息

在上一节中,详细介绍了RabbitMQ的exchange/routingkey/queue等概念,以及示例了如何使用PHP发送和处理消息的代码。这一节,将介绍在项目中如何使用PHP多线程的进行消息实时处理,以及简要介绍一些RabbitMQ的安装相关。熟悉的可以将安装这部分跳过。

一、RabbitMQ的安装:

需要首先安装erlang

#安装jdk环境sudo apt-get install openjdk-7-jdk#安装相关类库和工具包sudo apt-get install libncurses5-dev m4 fop freeglut3-dev libwxgtk2.8-dev g++ libssl-dev xsltproc build-essential tk8.5  unixodbc unixodbc-dev libxml2-utils#下载erlang安装包并安装wget http://www.erlang.org/download/otp_src_R16B03-1.tar.gztar -xzvf otp_src_R16B03-1.tar.gzcd otp_src_R16B03-1./configure --prefix=/usr/local/erlangmakesudo make install

注意:?这里将erlang安装到了指定的目录:?/usr/local/erlang,而不是使用默认的路径。这是一个好的习惯,对于版本控制等都会有好处。但是这会导致后面?rabbitMQ报错:找不到erl?执行文件,需要多做一些处理才行。

更新环境变量:

sudo vi /etc/profile

在最后一行加上

export PATH=/usr/local/erlang/bin:$PATH

保存退出后

source /etc/profile

然后到命令行中输入erl看是否安装成功

安装RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.3/rabbitmq-server-generic-unix-3.2.3.tar.gztar -xzvf rabbitmq-server-generic-unix-3.2.3.tar.gzmv rabbitmq_server-3.2.3/ /usr/local/cd /usr/local/rabbitmq_server-3.2.3/sbin./rabbitmq-server

到这里如果出现一个报错信息:?./rabbitmq-server:?line?86:?erl:?command?not?found??这是因为erlang指定了安装路径,在系统的PATH中找不到。只要export?PATH=$PATH:/usr/local/erlang/bin?就可以了。

如果为了rc.local启动方便,可以将?export?PATH=$PATH:/usr/local/erlang/bin?这一行写入到?rabbitmq-server?文件中:

1f178a82b9014a90f9dd87cda8773912b21beedb

执行后,ps?-aux?一下,看到进程中有/usr/local/erlang/lib/erlang/erts-5.10.1/bin/epmd?-daemon?和?/usr/local/erlang/lib/erlang/erts-5.10.1/bin/beam.smp?就OK了。

sbin目录下还有一个脚本:?rabbitmqctl?也很常用,与?rabbitmq-server?一样需要指明erlang的路径才能正确工作。

常用的方法:

rabbitmqctl?start_app?启动后,执行一下这个比较保险rabbitmqctl?list_exchanges?显示当前所有的交换机rabbitmqctl?list_queue?查看当前有效队列情况

二、 PHP?extension的安装:

PHP操作rabbitmq需要AMQP扩展的支持。下载扩展:?http://pecl.php.com/package/amqp?,安装过程与一般扩展一样,

/usr/local/php/bin/phpize./configure?--with-php-config=/usr/local/php/bin/php-configsudo make?&&?make?install

在编译安装过程中如果报错信息,可参考这篇文章:Ubuntu 12.04安装RabbitMQ的PHP扩展出现的问题及解决办法。

然后编辑php.ini?插入:

[amqp]extension?=?amqp.so

重启apache或nginx,查看phpinfo其中有关于anqp的段落,就OK了。

0df431adcbef760936f25aef2fdda3cc7dd99ec1

三、如何使用PHP进行实时后端消息处理

首先,要保证PHP文件可以正确的以堵塞方式处理消息。代码可参见上一节RabbitMQ与PHP(一)。

然后,我们来借助Python实现一个多线程的守护进程,由这个守护进程来调用PHP,把PHP作为工作线程。

启动脚本:start_jobs.py

# _*_ coding:utf-8 _*_'''yoka at 实现多线程处理任务的守护进程Created on 2012-4-7@author: xwarrior@update: [email protected]'''#在此引入项目需要的数据包from MyJobs import MyJobsfrom MyThread import MyThreadimport loggingimport timedef main():    logger = logging.getLogger('main()')    logger.info('server start!')    worker_threads = 2 #定义需要启动的线程数量    timeline = 2 #线程检查时间间隔,秒    thread_pool = {}    for i in range(0, worker_threads ):        param = 'some param'        job = MyJobs( param )        thread = MyThread( job, i )        thread.setDaemon = True        thread.start()        logger.info('start thread %s' %( thread.getName() ))        thread_pool[i] = thread    #干完就结束模式    #for eachKey in thread_pool.keys():    # thread_pool[eachKey].join()    #保持线程数量模式    while 1:        time.sleep(timeline)        # 检查每一个线程        for eachKey in thread_pool.keys():            if thread_pool[eachKey].isAlive():                print 'thread alive:' + str(i)            else:                print 'thread down:' + str(i)                thread_pool[eachKey].run()    logger.info('main exist!')    returnif __name__ == '__main__':    #init config format    FORMAT = '%(asctime)-15s %(name)s %(levelname)s file %(filename)s:lineno %(lineno)s - %(message)s'    logging.basicConfig(format=FORMAT,level=logging.INFO)    main()    pass

线程脚本:?MyThread.py

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''from threading import Threadclass MyThread(Thread):    '''    创建线程    '''    def __init__(self,job,thread_id):        '''        Constructor        '''        self.job = job        Thread.__init__(self, name = 'my_thread_%s' %(thread_id))    def run(self):        self.job.run()    def stop(self):        self.job.exit()

任务脚本:?MyJobs.py

# _*_ coding:utf-8 _*_'''Created on 2013-03-25@author: [email protected]'''import osimport urllib2class MyJobs(object):    def __init__(self, param ):        #do something        self.param = param    def __del__(self):        ''' destruct '''        self.exit()    def exit(self):        ''' 退出'''        self.quit = True    def run(self):        ''' 开始处理 '''        #使用shell模式        #cmd = '/usr/bin/curl "http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param) + '"'        cmd = '/usr/local/php/bin/php -c /usr/local/php/lib/nginx.ini /home/jimmy/at/DocumentRoot/try/amqp_consume.php ' + str(self.param)        re = os.system(cmd)        #使用web模式        #req = urllib2.Request('http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param))        #response = urllib2.urlopen(req)        #re = response.read()        #print re

在任务调度(start_jobs.py)中,设计了两种工作模式:

一种工作模式是一共启动N个线程去干活,适合于尽快完成一个大任务;

另一种是保持进程数量,当发现某个进程完成后,再重新将进程启动起来。显然,用户守护处理消息适合这种模式。

具体工作在MyJob.py中,提供了系统Shell调用和采用URL调用两种方式。推荐使用shell直接调用php的方式,这样可以灵活控制Php.ini,比如增加auto_prepend_file、增长max_execution_time等。

实际项目中,假定有5种类型的消息,可以启动20个线程,将thread_id当作参数传递给PHP。PHP将thread_id%5当作待处理类型,就可以得到每种类型有4个线程工作的场景了。

考虑到PHP的执行时间限制及内存泄露问题,可以将consume.php脚本进行一下改进,让PHP脚本每次处理指定数量的消息后就退出,由Python多线程框架重新启动线程,以保证运行稳定可靠。另外,将应答改为手工应答,确保消息获得正确有效处理。

/$q->consume('processMessage'); //需手动应答/*** 消费回调函数*/function processMessage($envelope, $queue) {    global $counter;    $msg = $envelope->getBody();    echo $msg."\n"; //处理消息    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答    if($counter++ > 5)return FALSE; //处理5个消息后退出}

用?两个线程,检查间隔2秒,SHELL模式?测试运行结果:

f703738da9773912b0eab410f9198618377ae2f6

由上图可见,运行开始后,检查2个线程都处于活跃状态,并对消息进行了正确处理,当处理到一定数量后,PHP程序结束,父进程检查到有进程处于完成状态,重新将其启动(第二个绿色框)。完全与预期相符。

?

http://nonfu.me/p/8838.html

人气教程排行