csv2mysql
时间:2021-07-01 10:21:17
帮助过:5人阅读
os
import re
import sys
import csv
import time
import argparse
import collections
import MySQLdb
import warnings
# suppress annoying mysql warnings
warnings.filterwarnings(action=
‘ignore‘, category=
MySQLdb.Warning)
def get_type(s):
"""Find type for this string
"""
# try integer type
try:
v =
int(s)
except ValueError:
pass
else:
if abs(v) > 2147483647
:
return ‘bigint‘
else:
return ‘int‘
# try float type
try:
float(s)
except ValueError:
pass
else:
return ‘double‘
# check for timestamp
dt_formats =
(
(‘%Y-%m-%d %H:%M:%S‘,
‘datetime‘),
(‘%Y-%m-%d %H:%M:%S.%f‘,
‘datetime‘),
(‘%Y-%m-%d‘,
‘date‘),
(‘%H:%M:%S‘,
‘time‘),
)
for dt_format, dt_type
in dt_formats:
try:
time.strptime(s, dt_format)
except ValueError:
pass
else:
return dt_type
# doesn‘t match any other types so assume text
if len(s) > 255
:
return ‘text‘
else:
return ‘varchar(255)‘
def most_common(l, default=
‘varchar(255)‘):
"""Return most common value from list
"""
# some formats trump others
if l:
for dt_type
in (
‘text‘,
‘bigint‘):
if dt_type
in l:
return dt_type
return max(l, key=
l.count)
return default
def get_col_types(input_file, max_rows=1000
):
"""Find the type for each CSV column
"""
csv_types =
collections.defaultdict(list)
reader =
csv.reader(open(input_file))
# test the first few rows for their data types
for row_i, row
in enumerate(reader):
if row_i ==
0:
header =
row
else:
for col_i, s
in enumerate(row):
data_type =
get_type(s)
csv_types[header[col_i]].append(data_type)
if row_i ==
max_rows:
break
# take the most common data type for each row
return [most_common(csv_types[col])
for col
in header]
def get_schema(table, header, col_types):
"""Generate the schema for this table from given types and columns
"""
schema_sql =
"""CREATE TABLE IF NOT EXISTS %s (
id int NOT NULL AUTO_INCREMENT,""" %
table
for col_name, col_type
in zip(header, col_types):
schema_sql +=
‘\n%s %s,‘ %
(col_name, col_type)
schema_sql +=
"""\nPRIMARY KEY (id)
) DEFAULT CHARSET=utf8;"""
return schema_sql
def get_insert(table, header):
"""Generate the SQL for inserting rows
"""
field_names =
‘, ‘.join(header)
field_markers =
‘, ‘.join(
‘%s‘ for col
in header)
return ‘INSERT INTO %s (%s) VALUES (%s);‘ %
(table, field_names, field_markers)
def format_header(row):
"""Format column names to remove illegal characters and duplicates
"""
safe_col =
lambda s: re.sub(
‘\W+‘,
‘_‘, s.lower()).strip(
‘_‘)
header =
[]
counts =
collections.defaultdict(int)
for col
in row:
col =
safe_col(col)
counts[col] += 1
if counts[col] > 1
:
col =
‘{}{}‘.format(col, counts[col])
header.append(col)
return header
def main(input_file, user, password, host, table, database, max_inserts=10000
):
print "Importing `%s‘ into MySQL database `%s.%s‘" %
(input_file, database, table)
db = MySQLdb.connect(host=host, user=user, passwd=password, charset=
‘utf8‘)
cursor =
db.cursor()
# create database and if doesn‘t exist
cursor.execute(
‘CREATE DATABASE IF NOT EXISTS %s;‘ %
database)
db.select_db(database)
# define table
print ‘Analyzing column types ...‘
col_types =
get_col_types(input_file)
print col_types
header =
None
for i, row
in enumerate(csv.reader(open(input_file))):
if header:
while len(row) <
len(header):
row.append(‘‘)
# this row is missing columns so pad blank values
cursor.execute(insert_sql, row)
if i % max_inserts ==
0:
db.commit()
print ‘commit‘
else:
header =
format_header(row)
schema_sql =
get_schema(table, header, col_types)
print schema_sql
# create table
cursor.execute(
‘DROP TABLE IF EXISTS %s;‘ %
table)
cursor.execute(schema_sql)
# create index for more efficient access
try:
cursor.execute(‘CREATE INDEX ids ON %s (id);‘ %
table)
except MySQLdb.OperationalError:
pass # index already exists
print ‘Inserting rows ...‘
# SQL string for inserting data
insert_sql =
get_insert(table, header)
# commit rows to database
print ‘Committing rows to database ...‘
db.commit()
print ‘Done!‘
if __name__ ==
‘__main__‘:
parser = argparse.ArgumentParser(description=
‘Automatically insert CSV contents into MySQL‘)
parser.add_argument(‘--table‘, dest=
‘table‘, help=
‘Set the name of the table. If not set the CSV filename will be used‘)
parser.add_argument(‘--database‘, dest=
‘database‘, default=
‘test‘, help=
‘Set the name of the database. If not set the test database will be used‘)
parser.add_argument(‘--user‘, dest=
‘user‘, default=
‘root‘, help=
‘The MySQL login username‘)
parser.add_argument(‘--password‘, dest=
‘password‘, default=
‘‘, help=
‘The MySQL login password‘)
parser.add_argument(‘--host‘, dest=
‘host‘, default=
‘localhost‘, help=
‘The MySQL host‘)
parser.add_argument(‘input_file‘, help=
‘The input CSV file‘)
args = parser.parse_args(sys.argv[1
:])
if not args.table:
# use input file name for table
args.table =
os.path.splitext(os.path.basename(args.input_file))[0]
main(args.input_file, args.user, args.password, args.host, args.table, args.database)
具体使用例子如下
[root@server1]# python csv2mysql.py --host=172.20.197.61 --user=PdYRxGWNpVRCQfHj --password=RX5a5YsViQcDdywr --database=cf_dc61100a_92a7_43ca_81dd_2a7e3fa0808a --table=performance_history_2 performance_history_2.csv
Importing `performance_history_2.csv‘ into MySQL database `cf_dc61100a_92a7_43ca_81dd_2a7e3fa0808a.performance_history_2‘
Analyzing column types ...
[‘datetime‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘]
CREATE TABLE IF NOT EXISTS performance_history_2 (
id int NOT NULL AUTO_INCREMENT,
date_time datetime,
write_bw_mb_s varchar(255),
read_bw_mb_s varchar(255),
write_iops varchar(255),
read_iops varchar(255),
write_latency_usec varchar(255),
read_latency_usec varchar(255),
avg_latency_usec varchar(255),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
Inserting rows ...
commit
commit
commit
commit
commit
commit
commit
commit
commit
commit
commit
Committing rows to database ...
Done!
csv2mysql
标签:ons return creat operation orm zip path input schema