当前位置:Gxlcms > Python > 常用python编程模板汇总

常用python编程模板汇总

时间:2021-07-01 10:21:17 帮助过:86人阅读

在我们编程时,有一些代码是固定的,例如Socket连接的代码,读取文件内容的代码,一般情况下我都是到网上搜一下然后直接粘贴下来改一改,当然如果你能自己记住所有的代码那更厉害,但是自己写毕竟不如粘贴来的快,而且自己写的代码还要测试,而一段经过测试的代码则可以多次使用,所以这里我就自己总结了一下python中常用的编程模板,如果还有哪些漏掉了请大家及时补充哈。

一、读写文件

1、读文件

(1)、一次性读取全部内容

filepath='D:/data.txt' #文件路径

with open(filepath, 'r') as f:
  print f.read()

(2)读取固定字节大小

# -*- coding: UTF-8 -*-

filepath='D:/data.txt' #文件路径

f = open(filepath, 'r')
content=""
try:
  while True:
    chunk = f.read(8)
    if not chunk:
      break
    content+=chunk
finally:
  f.close()
  print content

(3)每次读取一行

# -*- coding: UTF-8 -*-

filepath='D:/data.txt' #文件路径

f = open(filepath, "r")
content=""
try:
  while True:
    line = f.readline()
    if not line:
      break
    content+=line
finally:
  f.close()
  print content

(4)一次读取所有的行

# -*- coding: UTF-8 -*-

filepath='D:/data.txt' #文件路径

with open(filepath, "r") as f:
  txt_list = f.readlines()

for i in txt_list:
  print i,

2、写文件

# -*- coding: UTF-8 -*-

filepath='D:/data1.txt' #文件路径

with open(filepath, "w") as f: #w会覆盖原来的文件,a会在文件末尾追加
  f.write('1234')

二、连接Mysql数据库

1、连接

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import MySQLdb

DB_URL='localhost'
USER_NAME='root'
PASSWD='1234'
DB_NAME='test'

# 打开数据库连接
db = MySQLdb.connect(DB_URL,USER_NAME,PASSWD,DB_NAME)

# 使用cursor()方法获取操作游标 
cursor = db.cursor()

# 使用execute方法执行SQL语句
cursor.execute("SELECT VERSION()")

# 使用 fetchone() 方法获取一条数据库。
data = cursor.fetchone()

print "Database version : %s " % data

# 关闭数据库连接
db.close()

2、创建表

#!/usr/bin/python # -*- coding: UTF-8 -*- import MySQLdb # 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # 如果数据表已经存在使用 execute() 方法删除表。 cursor.execute("DROP TABLE IF EXISTS EMPLOYEE") # 创建数据表SQL语句 sql = """CREATE TABLE EMPLOYEE ( FIRST_NAME CHAR(20) NOT NULL, LAST_NAME CHAR(20), AGE INT, SEX CHAR(1), INCOME FLOAT )""" cursor.execute(sql) # 关闭数据库连接 db.close()

3、插入

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import MySQLdb

# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标 
cursor = db.cursor()

# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
     LAST_NAME, AGE, SEX, INCOME)
     VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
  # 执行sql语句
  cursor.execute(sql)
  # 提交到数据库执行
  db.commit()
except:
  # Rollback in case there is any error
  db.rollback()

# 关闭数据库连接
db.close()

4、查询

#!/usr/bin/python # -*- coding: UTF-8 -*- import MySQLdb # 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 查询语句 sql = "SELECT * FROM EMPLOYEE \ WHERE INCOME > '%d'" % (1000) try: # 执行SQL语句 cursor.execute(sql) # 获取所有记录列表 results = cursor.fetchall() for row in results: fname = row[0] lname = row[1] age = row[2] sex = row[3] income = row[4] # 打印结果 print "fname=%s,lname=%s,age=%d,sex=%s,income=%d" % \ (fname, lname, age, sex, income ) except: print "Error: unable to fecth data" # 关闭数据库连接 db.close()

5、更新

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import MySQLdb

# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标 
cursor = db.cursor()

# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1
             WHERE SEX = '%c'" % ('M')
try:
  # 执行SQL语句
  cursor.execute(sql)
  # 提交到数据库执行
  db.commit()
except:
  # 发生错误时回滚
  db.rollback()

# 关闭数据库连接
db.close()

三、Socket

1、服务器

from socket import *
from time import ctime

HOST = ''
PORT = 21568
BUFSIZ = 1024
ADDR = (HOST, PORT)

tcpSerSock = socket(AF_INET, SOCK_STREAM)
tcpSerSock.bind(ADDR)
tcpSerSock.listen(5)

while True:
  print 'waiting for connection...'
  tcpCliSock, addr = tcpSerSock.accept() 
  print '...connected from:', addr

  while True:
    try:
      data = tcpCliSock.recv(BUFSIZ) 
      print '<', data
      tcpCliSock.send('[%s] %s' % (ctime(), data)) 
    except:
      print 'disconnect from:', addr
      tcpCliSock.close() 
      break
tcpSerSock.close()

2、客户端

from socket import *

HOST = 'localhost'
PORT = 21568
BUFSIZ = 1024
ADDR = (HOST, PORT)

tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR) 

try:
  while True:
    data = raw_input('>')
    if data == 'close':
      break
    if not data:
      continue
    tcpCliSock.send(data) 
    data = tcpCliSock.recv(BUFSIZ) 
    print data
except:
  tcpCliSock.close() 

四、多线程

import time, threading

# 新线程执行的代码:
def loop():
  print 'thread %s is running...' % threading.current_thread().name
  n = 0
  while n < 5:
    n = n + 1
    print 'thread %s >>> %s' % (threading.current_thread().name, n)
    time.sleep(1)
  print 'thread %s ended.' % threading.current_thread().name

print 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name

还请大家积极补充!

人气教程排行