时间:2021-07-01 10:21:17 帮助过:11人阅读
import redis
# redis 只有密码,没有用户名
# 字符串类型
r = redis.Redis(host=‘211.149.***.**‘, port=6379, password=‘******‘, db=1) # 端口默认6379
# r.set(‘qxy_session‘, ‘201801211506‘) # set数据
# print(r.get(‘qxy_session‘)) # redis取出来的数据都是bytes类型的 b‘201801211506‘
# print(r.get(‘qxy_session‘).decode()) # 所以需要用decode方法转成字符串 201801211506
# r.delete(‘qxy_session‘) # 删除一个
# r.setex(‘qxy‘, ‘hahaha‘, 20) # 可以指定key的失效时间,单位是秒
# set get delete setex 都是针对string类型的 k - v
# 这种写法是有层级的
r.set(‘qxy:test1‘, ‘没交作业‘)
r.set(‘qxy:test2‘, ‘交了作业‘)
print(r.keys()) # 获取所有的key
print(r.keys(‘qxy*‘)) # 以txz开头的key
print(r.type(‘qxy:test1‘)) # 获取key的类型
# hash类型
r.hset(‘qxy_sessions‘, ‘q1‘, ‘1‘) # 插入数据
r.hset(‘qxy_sessions‘, ‘q2‘, ‘2‘)
r.hset(‘qxy_sessions‘, ‘q3‘, ‘3‘)
print(r.hget(‘qxy_sessions‘, ‘q1‘).decode()) # 获取某条数据
print(r.hgetall(‘qxy_sessions‘)) # 获取hash类型中所有的类型
all_data = {}
for k,v in r.hgetall(‘qxy_sessions‘).items():
k = k.decode()
v = v.decode()
all_data[k] = v
print(all_data)
# hash类型没有过期时间
练习题
import redis
# 将redis中db1的数据迁移至db8中
r = redis.Redis(host=‘211.149.***.**‘, port=6379, password=‘******‘, db=1)
r_new = redis.Redis(host=‘211.149.***.**‘, port=6379, password=‘******‘, db=8)
for k in r.keys(‘‘):
if r.type(r.keys()) == b‘string‘: # 或者用decode()
v = r.get(k)
r_new.set(k, v)
print(v.decode())
elif r.type(r.keys()) == b‘hash‘:
keys = r.hgetall(k)
for kk, vv in keys.items():
r_new.hset(k, kk, vv)
1) 暂时代替第三方接口
2) 辅助测试:用来代替没有开发好的接口
3) 查看数据
2. 需先安装flask模块:pip install flask
import flask
from conf import config
import json
from lib.tools import op_mysql
# import tools # tools.op_mysql()
# 接口,后台服务
server = flask.Flask(__name__)
@server.route(‘/get_user‘, methods=[‘get‘, ‘post‘]) # 这句话表示这个函数变身为接口
def get_all_user():
sql = ‘select * from users;‘
response = op_mysql(host=config.HOST, user=config.USER, password=config.PASSWORD, db=config.DBNAME, port=config.PORT,
charset=‘utf8‘, sql=sql)
res = json.dumps(response, ensure_ascii=False, indent=4)
return res
@server.route(‘/add_user‘, methods=[‘post‘])
def add_users():
user = flask.request.values.get(‘user‘)
passwd = flask.request.values.get(‘passwd‘)
print(user, passwd)
if user and passwd:
sql = "insert into users values(‘%s‘,‘%s‘);" % (user, passwd)
op_mysql(host=config.HOST, user=config.USER, password=config.PASSWORD, db=config.DBNAME, port=config.PORT,
charset=‘utf8‘, sql=sql)
response = {‘code‘: 200, ‘msg‘: ‘操作成功‘}
else:
response = {‘code‘: 503, ‘msq‘: ‘必填参数未填‘}
return json.dumps(response, ensure_ascii=False)
# host=‘0.0.0.0‘ 代表一个局域网内的所有人都可以访问;加上debug:不需要重启服务
server.run(port=8888, host=‘0.0.0.0‘, debug=True)
在postman中访问这两个接口


第6课:datetime模块、操作数据库、__name__、redis、mock接口
标签:date 成功 into 插入数据 l数据库 文件导入 连接 时间 server