当前位置:Gxlcms > mysql > Avro用php请求python服务的例子

Avro用php请求python服务的例子

时间:2021-07-01 10:21:17 帮助过:58人阅读

Avro是Hadoop项目之一。主要用来支持数据密集型应用,它定义了一种数据格式并在多种编程语言中支持这种格式。我们最主要是用来操作Cassandra,其次是以RPC的方式,实现语言之间的相互调用。 架构中有很多轻量的python脚本,比如,PHP接收到一个URL,需要调用

Avro是Hadoop项目之一。主要用来支持数据密集型应用,它定义了一种数据格式并在多种编程语言中支持这种格式。我们最主要是用来操作Cassandra,其次是以RPC的方式,实现语言之间的相互调用。

架构中有很多轻量的python脚本,比如,PHP接收到一个URL,需要调用Python脚本,去取这个URL的标题,然后返回。之前是用cli从命令行直接调用,效果不是很好,经常会卡死,有时还会占用PHP的端口导致php-fpm无法启动。这种场景又不能用RabbitMQ之类的异步处理。所以用Avro实现了一个简单的调用,跑了一个多月,目前一切正常,现在分享相关代码。

原理:PHP和Python使用同一个Schema,用Python的HTTPServer做一个http服务,PHP端将数据以avro/binary的方式编码,传给Python,Python端以同样的方法解码,获得传递的数据,进行处理后返回结果给PHP端。

需要用到avro官方提供的Python模块和PHP库。用pip安装avro时,需要先安装libsnappy。

Schema

创建一个JSON文件,命名为avro-protocol.json,内容如下:

{"namespace": "com.dmyz.avro",
    "protocol": "Post",
    "types": [
        {"name": "Action", "type": "record",
            "fields": [
                {"name": "url","type": "string"},
                {"name": "charset","type": "string"}
            ]
        }
    ],
    "messages": {
        "send": {
            "request": [{"name": "message", "type": "Action"}],
            "response": "string"
        }
    }
}

以上内容定义了一个Avro的protocol,Python端根据它的定义来接受参数,生成服务;PHP端不提供服务,所以实际上只用到了定义中schema(type节点部分)。

Python

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
?
import cgi
import json
from avro import io, ipc, protocol, schema
?
PROTOCOL_FILE = 'avro-protocol.json'
PROTOCOL_JSON = json.loads(open(PROTOCOL_FILE).read())
PROTOCOL = protocol.parse(open(PROTOCOL_FILE).read())
SCHEMA = schema.parse(json.dumps(PROTOCOL_JSON['types'][0]))
?
SERVER_ADDR = ('localhost', 9090)
?
class PostRequestHandler(BaseHTTPRequestHandler):
    def do_POST(self):
?
        length       = int(self.headers.getheader('content-length'))
        post_data    = self.rfile.read(length)
?
        data_reader  = StringIO(post_data)
        decoder      = io.BinaryDecoder(data_reader)
?
        datum_reader = io.DatumReader(SCHEMA)
        data         = datum_reader.read(decoder)
        url          = data['url'])
        charset      = data['charset']
        #已经取得从PHP发过来的数据,在这里执行其他逻辑
        self.wfile.write('title') #返回取到的title
?
if __name__ == '__main__':
    server = HTTPServer(SERVER_ADDR, PostRequestHandler)
    server.allow_reuse_address = True
    server.serve_forever()

PHP

?
$avro         = json_decode(file_get_contents('avro-protocol.json'), true);
?
$avroProtocol = new AvroProtocol();
$protocol     = $avroProtocol->real_parse($avro);
$schema       = AvroSchema::real_parse($avro['types'][0]);
?
$datum_writer = new AvroIODatumWriter($schema);
$write_io     = new AvroStringIO();
$encoder      = new AvroIOBinaryEncoder($write_io);
?
$message = array('url' => 'http://dmyz.org', 'charset' => 'utf-8');
$datum_writer->write($message, $encoder);
?
$content      = $write_io->string();
?
$headers = array(
    "POST / HTTP/1.1",
    "Host: " . self::$host,
    "Content-Type: avro/binary",
    "Content-Length: " . strlen($content),
);
?
$socket = stream_socket_client('localhost:9090', $errno, $errstr, 5);
?
if (!$socket)
    throw new Exception($errstr, $errno);
?
fwrite($socket, implode("\r\n", $headers) . "\r\n\r\n");
fwrite($socket, $content);
?
$result = '';
while (!feof($socket)) {
    $result .= fgets($socket);
}
fclose($socket);
?
return $result; #得到Python端返回的数据

人气教程排行