python 对mongodb进行压力测试
时间:2021-07-01 10:21:17
帮助过:3人阅读
from pymongo import Connection,MongoClient,MongoReplicaSetClient
import multiprocessing
import time
# Alternative connection modes, kept for reference:
# connection = MongoClient('mongodb://10.120.11.212:27017/')
# connection = Connection(['10.120.11.122', '10.120.11.221', '10.120.11.212'], 27017)

# The database is deployed with read/write splitting, so the client has to be
# opened in the matching (replica-set) mode.
# NOTE(review): this client is created at import time and is then used from
# multiprocessing pool workers; PyMongo documents its clients as not
# fork-safe -- confirm whether each worker should open its own client.
connection = MongoReplicaSetClient(
    '10.120.11.122:27017,10.120.11.221:27017,10.120.11.212:27017',
    replicaSet='rs0',
    # NOTE(review): 3 is presumably ReadPreference.SECONDARY_PREFERRED in
    # pymongo 2.x -- confirm against the installed driver version.
    read_preference=3
)
db = connection['cms']
# NOTE(review): credentials are hard-coded; consider loading them from the
# environment before using this outside a test lab.
db.authenticate('cms', 'cms')
# Timing decorator
def func_time(func):
    """Wrap *func* so that every call prints its wall-clock duration.

    The wrapper forwards all positional and keyword arguments to *func*,
    prints ``<function name> run: <elapsed seconds>`` once it has finished,
    and returns whatever *func* returned. If *func* raises, the exception
    propagates and nothing is printed.

    Args:
        func: the callable to time.

    Returns:
        A wrapper with the same name and docstring as *func*.
    """
    # Local import so this block does not depend on the file's import header.
    from functools import wraps

    @wraps(func)
    def _wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        # One pre-formatted argument prints the same text under both the
        # Python 2 print statement and the Python 3 print function.
        print("%s run: %s" % (func.__name__, time.time() - start))
        return result

    return _wrapper
# Insert benchmark
def insert(num):
    """Insert ``num`` documents into ``db.userinfo``, one ``insert`` call each.

    Document ``i`` carries ``str(i)`` as both ``_id`` and ``author`` and a
    fixed ``text`` value.
    """
    collection = db.userinfo
    for index in range(num):
        collection.insert({
            "_id": str(index),
            "author": str(index),
            "text": "My first blog post!"
        })
# Query benchmark
def query(num):
    """Run ``num`` identical ``find_one`` lookups against ``db.device``.

    Every lookup filters on the same fixed ``scanid``; the returned
    documents are discarded.
    """
    devices = db.device
    for _ in xrange(num):
        criteria = {"scanid": "010000138101010000009aaaaa"}
        devices.find_one(criteria)
@func_time
def main(process_num, num):
    """Fan the query benchmark out over a pool of worker processes.

    Args:
        process_num: number of worker processes in the pool.
        num: used both as the number of tasks submitted to the pool and as
            the number of lookups each task performs (``query(num)``), so
            the run issues ``num * num`` lookups in total.
    """
    pool = multiprocessing.Pool(processes=process_num)
    # Keep the AsyncResult handles: apply_async stores a worker's exception
    # on the handle, so discarding the handles would hide every failure.
    pending = [pool.apply_async(query, (num,)) for _ in xrange(num)]
    pool.close()
    pool.join()
    # After join() every task has finished, so successful() is safe to call.
    failed = sum(1 for task in pending if not task.successful())
    if failed:
        print("%d of %d task(s) raised an exception" % (failed, len(pending)))
    print("Sub-process(es) done.")
if __name__ == "__main__":
    # Earlier direct call, kept for reference:
    # query(500, 1)
    # NOTE(review): query() is defined with a single parameter, so the call
    # above is stale as written.
    # 800 worker processes; 500 tasks of 500 lookups each.
    main(800, 500)
原文发表于http://www.cnblogs.com/reach296/
python 对mongodb进行压力测试
标签: